RT-Thread之消息队列_rtthread消息队列-程序员宅基地

技术标签: RTOS  RT-Thread  

消息队列的应用场景

消息队列是常用的线程间通信方式,是一种异步的通信方式。消息队列可以应用于多种场合:线程间的消息交换、在中断服务函数中给线程发送消息(中断服务例程不可能接收消息)、使用串口接收不定长数据等。

消息队列的基本概念

队列又称消息队列,是一种常用于线程间通信的数据结构。队列可以在线程与线程间、中断与线程间传送消息,实现了线程接收来自其它线程或中断的不固定长度的消息,并根据不同的接口选择传递的消息是否存放在自己的空间。线程能够从队列中读取消息,当队列中的消息为空时,挂起读取线程。用户还可以指定线程挂起的超时时间timeout;当队列中有新消息时,挂起的读取线程被唤醒以接收并处理新消息。

通过消息队列服务,线程或中断服务例程可以将一条或多条消息放入消息队列中。同样,一个或多个线程可以从消息队列中获得消息。当有多个消息发送到消息队列时,通常将先进入消息队列的消息传给线程,也就是说,线程先得到的是最先进入消息队列的消息,即先进先出(FIFO)原则。同时,RT-Thread中的消息队列支持优先级,所有等待消息的线程中优先级最高的会先获得消息。

用户在处理业务时,消息队列提供了异步处理机制,允许将一个消息放入队列,但并不立即处理它,同时队列还能起到缓冲消息的作用。

RT-Thread中使用队列数据结构实现线程异步通信工作,具有如下特性:

  • 消息支持先进先出排队方式与优先级排队方式,支持异步读写工作方式
  • 读队列支持超时机制
  • 支持发送紧急消息
  • 可以允许不同长度(不超过队列节点最大值)的任意类型消息
  • 一个线程能够从任意一个消息队列接收和发送消息
  • 多个线程能够从同一个消息队列接收和发送消息
  • 当队列使用结束后,需要通过删除队列操作来释放内存

消息队列的工作机制

创建消息队列时先创建一个消息队列对象控制块。然后给消息队列分配一块内存空间,组织成空闲消息链表,这块内存的大小等于[(消息大小+消息头)*消息队列容量](对应后文中struct rt_messagequeue中的msg_size以及max_msgs)。接着再初始化消息队列,此时消息队列为空。

RT-Thread操作系统的消息队列对象由多个元素组成,当消息队列被创建时,它就被分配了消息队列控制块:消息队列名称、内存缓冲区消息大小以及队列长度等。同时,每个消息队列对象中包含多个消息框,每个消息框可以存放一条消息。消息队列中的第一个和最后一个消息框分别被称为消息链表头和消息链表尾,对应于消息队列控制块中的msg_queue_head和msg_queue_tail。有些消息框可能是空的,它们通过msg_queue_free形成一个空闲消息框链表。所有消息队列中的消息框总数就是消息队列的长度,这个长度可在消息队列创建时被指定。

线程或中断服务程序都可以给消息队列发送消息。当发送消息时,

(1)消息队列对象先从空闲消息链表上取出下一个空闲消息快;

(2)把线程或中断服务例程发送的消息内容复制到消息块上;

(3)然后把该消息块挂到消息队列的尾部。

当且仅当空闲消息链表上有可用的空闲消息块,发送者才能成功发送消息;当空闲消息链表上无可用消息块时,说明消息队列已满,此时,发送消息的线程或者中断服务程序会收到一个错误码(-RT_EFULL)。

发送紧急消息的过程与发送消息的过程几乎一样,唯一不同的是,当发送紧急消息时,从空闲消息链表上取下来的消息快不是挂到消息队列的队尾,而是挂到消息队列的队首,这样,接受者就能够优先接收到紧急消息,从而及时进行消息处理。

读取消息时,根据消息链表头(msg_queue_head)找到最先进入队列的消息节点进行读取。根据消息队列控制块中的entry判断队列是否有消息读取,对于全部空闲的队列(entry为0)进行读消息操作会引起线程挂起。

当消息队列不再被使用时,应该删除它以释放系统资源。一旦删除操作完成,消息队列将被永久性地删除。

消息队列的阻塞机制

消息队列一般不是属于某个线程的队列,在很多时候,创建的队列是每个线程都可以对其进行读写操作的,但是为了保护被每个线程对其进行读写操作的过程,必须有阻塞机制。在某个线程对消息队列进行读写操作时,必须保证该线程能正常完成读写操作,而不受后来的线程干扰。

那么,如何实现这个机制呢?

很简单,因为RT-Thread已经提供了这种机制,直接使用即可。每个对消息队列读写的函数都有这种机制,称其为阻塞机制。

举个例子:假设有一个线程A,在其对某个队列进行读操作(出队)时发现此队列上没有消息,那么此时线程A有三种选择:第一种选择,既然队列没有消息,那么不再等待,去进行其它操作,这样线程A不会进入阻塞态;第二种选择,线程A继续等待,此时线程A会进入阻塞状态,等待消息的到来。而线程A的等待时间可以自行定义,比如设置1000个tick的超时时间,在这段时间之内,如果没有消息,线程A都是处于阻塞态;若阻塞的这段时间内,线程A收到了队列的消息,则线程A就会由阻塞态变为就绪态。如果此时线程A的优先级比当前运行的线程的优先级高,则线程A就会得到消息并且运行。加入1000个tick过去了,队列还是没有消息,那么线程A就不再等待了,而是从阻塞态中(超时)唤醒,并返回一个没有等到消息的错误代码,然后继续执行线程A的其它代码;第三种选择,线程A一直等待,直到收到消息,这样线程A就会进入阻塞态,直到完成读取队列的消息。

与接收消息机制不同,发送消息操作并不带有阻塞机制,因为发送消息的环境可能是在中断中,不允许出现阻塞的情况。在消息队列已满的情况下,将返回一个错误码(-RT_EFULL)。

消息队列控制块

在RT-Thread中,消息队列控制块是操作系统用于管理消息队列的一个数据结构,由结构体struct rt_messagequeue表示,另一种C表达方式为rt_mq_t,表示的是消息队列的句柄,在C语言中的实现是消息队列控制块的指针。消息队列控制块结构体的详细定义在include\rtdef.h中,代码如下所示:

#ifdef RT_USING_MESSAGEQUEUE
/**
 * message queue structure
 */
struct rt_messagequeue
{
    struct rt_ipc_object parent;                        /**< inherit from ipc_object */

    void                *msg_pool;                      /**< start address of message queue */

    rt_uint16_t          msg_size;                      /**< message size of each message */
    rt_uint16_t          max_msgs;                      /**< max number of messages */

    rt_uint16_t          entry;                         /**< index of messages in the queue */

    void                *msg_queue_head;                /**< list head */
    void                *msg_queue_tail;                /**< list tail */
    void                *msg_queue_free;                /**< pointer indicated the free node of queue */

    rt_list_t            suspend_sender_thread;         /**< sender thread suspended on this message queue */
};
typedef struct rt_messagequeue *rt_mq_t;
#endif

消息队列控制块是一个结构体,其中含有消息队列相关的重要参数,在消息队列的功能实现中起着重要的作用。消息队列控制块包含了每个消息队列的信息,如消息队列名称、存放消息的消息池起始地址、每条消息的长度以及队列中已有的消息个数和能够容纳的最大消息数量等。rt_messagequeue对象从rt_ipc_object中派生,由IPC容器所管理。

消息队列相关接口函数

对一个消息队列的操作包括:创建/初始化消息队列、发送消息、接收消息、删除/脱离消息队列。

使用消息队列模块的典型流程如下:

(1)消息队列创建/初始化函数rt_mq_create()/rt_mq_init()

(2)消息队列发送消息函数rt_mq_send()

(3)消息队列接收函数rt_mq_recv()

(4)消息队列删除;/脱离函数rt_mq_delete()/rt_mq_detach()

  • 创建消息队列

消息队列在使用前,应该被创建出来,或对已有的静态消息队列对象进行初始化,创建消息队列的函数接口在include\rt_thread.h中定义,代码如下:

#ifdef RT_USING_HEAP
rt_mq_t rt_mq_create(const char *name,
                     rt_size_t   msg_size,
                     rt_size_t   max_msgs,
                     rt_uint8_t  flag);
rt_err_t rt_mq_delete(rt_mq_t mq);
#endif

rt_mq_create函数的参数和返回值如下表所示:

参数 描述
name 消息队列的名称
msg_size 消息队列中一条消息的最大长度,单位为字节
max_msgs 消息队列的最大个数
flag

消息队列采用的灯带方式,可以取如下数值:

RT_IPC_FLAG_FIFO或RT_IPC_FLAG_PRIO

返回值 描述
消息队列对象的句柄 成功
RT_NULL 失败

创建消息队列时先从对象管理器中分配一个消息队列对象,然后给消息队列对象分配一块内存空间,组织成空闲消息链表,这块内存的大小如前所述,为:[消息大小+消息头(用于链表连接)的大小]*消息队列最大个数。接着再初始化消息队列,此时消息队列为空。具体代码实现在src\ipc.c中,如下所示:

rt_mq_t rt_mq_create(const char *name,
                     rt_size_t   msg_size,
                     rt_size_t   max_msgs,
                     rt_uint8_t  flag)
{
    struct rt_messagequeue *mq;
    struct rt_mq_message *head;
    register rt_base_t temp;

    RT_ASSERT((flag == RT_IPC_FLAG_FIFO) || (flag == RT_IPC_FLAG_PRIO));

    RT_DEBUG_NOT_IN_INTERRUPT;

    /* allocate object */
    mq = (rt_mq_t)rt_object_allocate(RT_Object_Class_MessageQueue, name);
    if (mq == RT_NULL)
        return mq;

    /* set parent */
    mq->parent.parent.flag = flag;

    /* initialize ipc object */
    _ipc_object_init(&(mq->parent));

    /* initialize message queue */

    /* get correct message size */
    mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE);
    mq->max_msgs = max_msgs;

    /* allocate message pool */
    mq->msg_pool = RT_KERNEL_MALLOC((mq->msg_size + sizeof(struct rt_mq_message)) * mq->max_msgs);
    if (mq->msg_pool == RT_NULL)
    {
        rt_object_delete(&(mq->parent.parent));

        return RT_NULL;
    }

    /* initialize message list */
    mq->msg_queue_head = RT_NULL;
    mq->msg_queue_tail = RT_NULL;

    /* initialize message empty list */
    mq->msg_queue_free = RT_NULL;
    for (temp = 0; temp < mq->max_msgs; temp ++)
    {
        head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool +
                                        temp * (mq->msg_size + sizeof(struct rt_mq_message)));
        head->next = (struct rt_mq_message *)mq->msg_queue_free;
        mq->msg_queue_free = head;
    }

    /* the initial entry is zero */
    mq->entry = 0;

    /* initialize an additional list of sender suspend thread */
    rt_list_init(&(mq->suspend_sender_thread));

    return mq;
}
RTM_EXPORT(rt_mq_create);

上边为函数概述,函数详细步骤描述如下:

(1)分配消息队列对象。调用rt_object_allocate函数将从对象系统为创建的消息队列分配一个对象,并且设置对象名称。在系统中,对象的名称必须是唯一的。

(2)设置消息队列的阻塞唤醒模式,创建的消息队列针对于指定的flag有不同的意义。使用RT_IPC_FLAG_PRIO创建的IPC对象,在多个线程等待队列资源时,优先级高的线程将优先获得资源;而使用RT_IPC_FLAG_FIFO创建的IPC对象,在多个线程等待队列资源时,将按照先到先得的顺序获得资源。RT_IPC_FLAG_PRIO和RT_IPC_FLAG_FIFO均在rtdef.h中定义,代码如下所示:

#define RT_IPC_FLAG_FIFO                0x00            /**< FIFOed IPC. @ref IPC. */
#define RT_IPC_FLAG_PRIO                0x01            /**< PRIOed IPC. @ref IPC. */

(3)初始化消息队列内核对象。此处会初始化一个链表,用于记录访问此队列而阻塞的线程,通过这个链表,可以找到对应的阻塞线程的控制块,从而能恢复线程。

(4)设置消息队列的节点大小与消息队列的最大容量。节点大小要按照RT_ALIGN_SIZE字节对齐,消息队列的容量由用户自己定义。

(5)给此消息队列分配内存。这块内存的大小为[消息大小+消息头(用于链表连接)的大小]*消息队列容量。每个消息节点中都有一个消息头,用于链表链接,指向下一个消息节点,作为消息的排序。

(6)初始化消息队列头尾链表。

(7)将所有的消息队列的节点连接起来,形成空闲链表。

(8)消息队列的个数置零。

注意:在创建消息队列时,是需要用户自己定义消息队列的句柄的,但定义了消息队列的句柄并不等于创建了队列,创建队列必须通过调用rt_mq_create函数来完成。否则,以后根据队列句柄使用消息队列的其它函数时会发生错误。在创建消息队列时会返回创建的结果,如果创建成功,则返回消息队列句柄;如果返回RT_NULL,则表示创建失败。

关于rt_mq_create函数,必须提到一个容易被忽略的关键点:struct rt_mq_message。前文已经多次提到一个概念:消息头,但是一直没有明确说明消息头具体是指什么,是什么结构,在这里详细讲解一下。上面代码中的struct rt_mq_message即指的是消息头,此结构定义于src\ipc.c中,代码如下:

/**
 * @addtogroup messagequeue
 */

/**@{*/

struct rt_mq_message
{
    struct rt_mq_message *next;
};

由代码可以看出,就是一个最基本的单链表结构。虽然很简单,但明确一下是非常有必要的。

  • 初始化消息队列

初始化消息队列的函数接口同样在include\rt_thread.h中声明,代码如下:

rt_err_t rt_mq_init(rt_mq_t     mq,
                    const char *name,
                    void       *msgpool,
                    rt_size_t   msg_size,
                    rt_size_t   pool_size,
                    rt_uint8_t  flag);

rt_mq_init函数的参数和返回值如下表所示:

参数 描述
mq 消息队列对象的句柄
name 消息队列的名称
msgpool 指向存放消息的缓冲区的指针
msg_size 消息队列中一条消息的最大长度,单位为字节
pool_size 存放消息的缓冲区大小
flag 消息队列采用的等待方式,可以取如下数值:RT_IPC_FLAG_FIFO或RT_IPC_FLAG_PRIO
返回值 描述
RT_EOK 成功

初始化静态消息队列对象跟创建消息队列对象类似,只是静态消息队列对象的内存是在系统编译时由编译器分配的,一般放于读数据段或未初始化数据段中。在使用这类静态消息队列对象前,需要进行初始化。初始化消息队列时,该接口需要用户已经申请获得的消息队列对象的句柄(即指向消息队列对象控制块的指针)、消息队列名、消息缓冲区指针、消息大小以及消息队列缓冲区大小。消息队列初始化后,所有消息都挂在空闲消息链表上,消息队列为空。具体代码实现在src\ipc.c中,如下所示:


/**
 * @brief    Initialize a static messagequeue object.
 *
 * @note     For the static messagequeue object, its memory space is allocated by the compiler during compiling,
 *           and shall placed on the read-write data segment or on the uninitialized data segment.
 *           By contrast, the rt_mq_create() function will allocate memory space automatically
 *           and initialize the messagequeue.
 *
 * @see      rt_mq_create()
 *
 * @param    mq is a pointer to the messagequeue to initialize. It is assumed that storage for
 *           the messagequeue will be allocated in your application.
 *
 * @param    name is a pointer to the name that given to the messagequeue.
 *
 * @param    msgpool is a pointer to the starting address of the memory space you allocated for
 *           the messagequeue in advance.
 *           In other words, msgpool is a pointer to the messagequeue buffer of the starting address.
 *
 * @param    msg_size is the maximum length of a message in the messagequeue (Unit: Byte).
 *
 * @param    pool_size is the size of the memory space allocated for the messagequeue in advance.
 *
 * @param    flag is the messagequeue flag, which determines the queuing way of how multiple threads wait
 *           when the messagequeue is not available.
 *           The messagequeue flag can be ONE of the following values:
 *
 *               RT_IPC_FLAG_PRIO          The pending threads will queue in order of priority.
 *
 *               RT_IPC_FLAG_FIFO          The pending threads will queue in the first-in-first-out method
 *                                         (also known as first-come-first-served (FCFS) scheduling strategy).
 *
 *               NOTE: RT_IPC_FLAG_FIFO is a non-real-time scheduling mode. It is strongly recommended to
 *               use RT_IPC_FLAG_PRIO to ensure the thread is real-time UNLESS your applications concern about
 *               the first-in-first-out principle, and you clearly understand that all threads involved in
 *               this messagequeue will become non-real-time threads.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the initialization is successful.
 *           If the return value is any other values, it represents the initialization failed.
 *
 * @warning  This function can ONLY be called from threads.
 */
rt_err_t rt_mq_init(rt_mq_t     mq,
                    const char *name,
                    void       *msgpool,
                    rt_size_t   msg_size,
                    rt_size_t   pool_size,
                    rt_uint8_t  flag)
{
    struct rt_mq_message *head;
    register rt_base_t temp;

    /* parameter check */
    RT_ASSERT(mq != RT_NULL);
    RT_ASSERT((flag == RT_IPC_FLAG_FIFO) || (flag == RT_IPC_FLAG_PRIO));

    /* initialize object */
    rt_object_init(&(mq->parent.parent), RT_Object_Class_MessageQueue, name);

    /* set parent flag */
    mq->parent.parent.flag = flag;

    /* initialize ipc object */
    _ipc_object_init(&(mq->parent));

    /* set message pool */
    mq->msg_pool = msgpool;

    /* get correct message size */
    mq->msg_size = RT_ALIGN(msg_size, RT_ALIGN_SIZE);
    mq->max_msgs = pool_size / (mq->msg_size + sizeof(struct rt_mq_message));

    /* initialize message list */
    mq->msg_queue_head = RT_NULL;
    mq->msg_queue_tail = RT_NULL;

    /* initialize message empty list */
    mq->msg_queue_free = RT_NULL;
    for (temp = 0; temp < mq->max_msgs; temp ++)
    {
        head = (struct rt_mq_message *)((rt_uint8_t *)mq->msg_pool +
                                        temp * (mq->msg_size + sizeof(struct rt_mq_message)));
        head->next = (struct rt_mq_message *)mq->msg_queue_free;
        mq->msg_queue_free = head;
    }

    /* the initial entry is zero */
    mq->entry = 0;

    /* initialize an additional list of sender suspend thread */
    rt_list_init(&(mq->suspend_sender_thread));

    return RT_EOK;
}
RTM_EXPORT(rt_mq_init);

  • 删除消息队列

当消息队列不再被使用时,应该删除它以释放系统资源。一旦操作完成,消息队列将被永久性地删除。删除消息队列的函数接口同样在include\rt_thread.h中声明,代码如下:

#ifdef RT_USING_HEAP
rt_err_t rt_mq_delete(rt_mq_t mq);
#endif

 rt_mq_delete函数的参数和返回值如下表所示:

参数 描述
mq 消息队列对象的句柄
返回值 描述
RT_EOK 成功

删除消息队列时,如果有线程被挂起在该消息队列等待队列上,则内核先唤醒挂起在该消息等待队列上的所有线程(线程返回值是-RT_ERROR),然后再释放消息队列使用的内存,最后删除消息队列对象。具体代码实现在src\ipc.c中,如下所示:

/**
 * @brief    This function will delete a messagequeue object and release the memory.
 *
 * @note     This function is used to delete a messagequeue object which is created by the rt_mq_create() function.
 *           By contrast, the rt_mq_detach() function will detach a static messagequeue object.
 *           When the messagequeue is successfully deleted, it will resume all suspended threads in the messagequeue list.
 *
 * @see      rt_mq_detach()
 *
 * @param    mq is a pointer to a messagequeue object to be deleted.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
 *           If the return value is any other values, it means that the messagequeue detach failed.
 *
 * @warning  This function can ONLY delete a messagequeue initialized by the rt_mq_create() function.
 *           If the messagequeue is initialized by the rt_mq_init() function, you MUST NOT USE this function to delete it,
 *           ONLY USE the rt_mq_detach() function to complete the detachment.
 *           for example,the rt_mq_create() function, it cannot be called in interrupt context.
 */
rt_err_t rt_mq_delete(rt_mq_t mq)
{
    /* parameter check */
    RT_ASSERT(mq != RT_NULL);
    RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
    RT_ASSERT(rt_object_is_systemobject(&mq->parent.parent) == RT_FALSE);

    RT_DEBUG_NOT_IN_INTERRUPT;

    /* resume all suspended thread */
    _ipc_list_resume_all(&(mq->parent.suspend_thread));
    /* also resume all message queue private suspended thread */
    _ipc_list_resume_all(&(mq->suspend_sender_thread));

    /* free message queue pool */
    RT_KERNEL_FREE(mq->msg_pool);

    /* delete message queue object */
    rt_object_delete(&(mq->parent.parent));

    return RT_EOK;
}
RTM_EXPORT(rt_mq_delete);

消息队列删除函数是根据消息队列句柄直接进行删除的,删除之后这个消息队列的所有信息都会被清空,而且 不能再次使用这个消息队列。需要注意的是,如果某个消息队列没有被创建,那当然也是无法被删除的。删除消息队列时会把所有由于访问此消息队列而进入阻塞态的线程从阻塞链表中删除,mq是rt_mq_delete()传入的参数,是消息队列句柄,表示的是想要删除哪个队列。

(1)检测消息队列是否被创建了,只有已经被创建了,才可以进行删除操作。

(2)调用_ipc_list_resume_all函数将所有由于访问此队列而阻塞的线程从阻塞状态中恢复过来,线程获得队列返回的错误代码。在实际情况中一般不这样使用,在删除时,应先确认所有的线程都无须再次访问此队列,并且此时没有线程被此队列阻塞,才进行删除操作。

(3)删除了消息队列,那肯定要把消息队列的内存释放出来。

(4)删除消息队列对象并且释放消息队列内核对象的内存,释放内核对象内存在rt_object_delete函数中实现。

消息队列删除函数rt_mq_delete的使用也很简单,只需传入要删除的消息队列的句柄即可。调用rt_mq_delete函数时,系统将删除这个消息队列。如果删除该消息队列时有线程正在等待消息,那么删除操作会先唤醒等待在消息队列上的线程(等待线程的返回值是-RT_ERROR)。

  • 脱离消息队列

脱离消息队列将使消息队列对象从内核对象管理器中脱离。脱离消息队列的函数接口同样在include\rt_thread.h中声明,代码如下:

rt_err_t rt_mq_detach(rt_mq_t mq);

rt_mq_detach函数的参数和返回值如下表所示:

参数 描述
mq 消息队列对象的句柄
返回值 描述
RT_EOK 成功

使用该接口后,内核先唤醒所有挂在该消息等待队列对象上的线程(线程返回值是-RT_ERROR),然后将该消息队列对象从内核对象管理器中脱离。具体代码实现在src\ipc.c中,如下所示:

/**
 * @brief    This function will detach a static messagequeue object.
 *
 * @note     This function is used to detach a static messagequeue object which is initialized by rt_mq_init() function.
 *           By contrast, the rt_mq_delete() function will delete a messagequeue object.
 *           When the messagequeue is successfully detached, it will resume all suspended threads in the messagequeue list.
 *
 * @see      rt_mq_delete()
 *
 * @param    mq is a pointer to a messagequeue object to be detached.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the initialization is successful.
 *           If the return value is any other values, it means that the messagequeue detach failed.
 *
 * @warning  This function can ONLY detach a static messagequeue initialized by the rt_mq_init() function.
 *           If the messagequeue is created by the rt_mq_create() function, you MUST NOT USE this function to detach it,
 *           and ONLY USE the rt_mq_delete() function to complete the deletion.
 */
rt_err_t rt_mq_detach(rt_mq_t mq)
{
    /* parameter check */
    RT_ASSERT(mq != RT_NULL);
    RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
    RT_ASSERT(rt_object_is_systemobject(&mq->parent.parent));

    /* resume all suspended thread */
    _ipc_list_resume_all(&mq->parent.suspend_thread);
    /* also resume all message queue private suspended thread */
    _ipc_list_resume_all(&(mq->suspend_sender_thread));

    /* detach message queue object */
    rt_object_detach(&(mq->parent.parent));

    return RT_EOK;
}
RTM_EXPORT(rt_mq_detach);

  • 消息队列发送消息

线程或者中断服务程序都可以给消息队列发送消息。当发送消息时,消息队列对象先从空闲消息链表上取下一个空闲消息块,把线程或者中断服务程序发送的消息内容复制到消息块上,然后把该消息块挂到消息队列的尾部。当且仅当空闲消息链表上有可用的空闲消息块时,发送者才能成功发送消息;当空闲消息链表上无可用消息块时,说明消息队列已满,此时,发送消息的线程或者中断服务程序会收到一个错误码(-RT_EFULL)。发送消息的函数接口在include\rt_thread.h中定义,代码如下:

rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size);
rt_err_t rt_mq_send_wait(rt_mq_t     mq,
                         const void *buffer,
                         rt_size_t   size,
                         rt_int32_t  timeout);

rt_mq_send函数的参数和返回值如下表所示:

参数 描述
mq 消息队列对象的句柄
buffer 消息内容
size 消息大小
返回 描述
RT_EOK 成功
-RT_EFULL 消息队列已满
-RT_ERROR 失败,表示发送的消息长度大于消息队列中消息的最大长度

发送消息时,发送者需指定发送的消息队列的对象句柄(即指向消息队列控制块的指针),并且指定发送的消息内容以及消息大小。在发送一条普通消息之后,空闲消息链表上的队首消息被转移到了消息队列尾。发送消息的函数实现在src\ipc.c中,代码如下:

/**
 * @brief    This function will send a message to the messagequeue object.
 *           If there is a thread suspended on the messagequeue, the thread will be resumed.
 *
 * @note     When using this function to send a message, if the messagequeue is fully used,
 *           the current thread will wait for a timeout.
 *           By contrast, when the messagequeue is fully used, the rt_mq_send_wait() function will
 *           return an error code immediately without waiting.
 *
 * @see      rt_mq_send_wait()
 *
 * @param    mq is a pointer to the messagequeue object to be sent.
 *
 * @param    buffer is the content of the message.
 *
 * @param    size is the length of the message(Unit: Byte).
 *
 * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
 *           If the return value is any other values, it means that the messagequeue detach failed.
 *
 * @warning  This function can be called in interrupt context and thread context.
 */
rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)
{
    return rt_mq_send_wait(mq, buffer, size, 0);
}
RTM_EXPORT(rt_mq_send);

该函数实际上是rt_mq_send_wait函数的封装,rt_mq_send_wait函数也在同文件中实现,就在上边,代码如下:

/**
 * @brief    This function will send a message to the messagequeue object. If
 *           there is a thread suspended on the messagequeue, the thread will be
 *           resumed.
 *
 * @note     When using this function to send a message, if the messagequeue is
 *           fully used, the current thread will wait for a timeout. If reaching
 *           the timeout and there is still no space available, the sending
 *           thread will be resumed and an error code will be returned. By
 *           contrast, the rt_mq_send() function will return an error code
 *           immediately without waiting when the messagequeue if fully used.
 *
 * @see      rt_mq_send()
 *
 * @param    mq is a pointer to the messagequeue object to be sent.
 *
 * @param    buffer is the content of the message.
 *
 * @param    size is the length of the message(Unit: Byte).
 *
 * @param    timeout is a timeout period (unit: an OS tick).
 *
 * @return   Return the operation status. When the return value is RT_EOK, the
 *           operation is successful. If the return value is any other values,
 *           it means that the messagequeue detach failed.
 *
 * @warning  This function can be called in interrupt context and thread
 * context.
 */
rt_err_t rt_mq_send_wait(rt_mq_t     mq,
                         const void *buffer,
                         rt_size_t   size,
                         rt_int32_t  timeout)
{
    register rt_ubase_t temp;
    struct rt_mq_message *msg;
    rt_uint32_t tick_delta;
    struct rt_thread *thread;

    /* parameter check */
    RT_ASSERT(mq != RT_NULL);
    RT_ASSERT(rt_object_get_type(&mq->parent.parent) == RT_Object_Class_MessageQueue);
    RT_ASSERT(buffer != RT_NULL);
    RT_ASSERT(size != 0);

    /* current context checking */
    RT_DEBUG_SCHEDULER_AVAILABLE(timeout != 0);

    /* greater than one message size */
    if (size > mq->msg_size)
        return -RT_ERROR;

    /* initialize delta tick */
    tick_delta = 0;
    /* get current thread */
    thread = rt_thread_self();

    RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mq->parent.parent)));

    /* disable interrupt */
    temp = rt_hw_interrupt_disable();

    /* get a free list, there must be an empty item */
    msg = (struct rt_mq_message *)mq->msg_queue_free;
    /* for non-blocking call */
    if (msg == RT_NULL && timeout == 0)
    {
        /* enable interrupt */
        rt_hw_interrupt_enable(temp);

        return -RT_EFULL;
    }

    /* message queue is full */
    while ((msg = (struct rt_mq_message *)mq->msg_queue_free) == RT_NULL)
    {
        /* reset error number in thread */
        thread->error = RT_EOK;

        /* no waiting, return timeout */
        if (timeout == 0)
        {
            /* enable interrupt */
            rt_hw_interrupt_enable(temp);

            return -RT_EFULL;
        }

        /* suspend current thread */
        _ipc_list_suspend(&(mq->suspend_sender_thread),
                            thread,
                            mq->parent.parent.flag);

        /* has waiting time, start thread timer */
        if (timeout > 0)
        {
            /* get the start tick of timer */
            tick_delta = rt_tick_get();

            RT_DEBUG_LOG(RT_DEBUG_IPC, ("mq_send_wait: start timer of thread:%s\n",
                                        thread->name));

            /* reset the timeout of thread timer and start it */
            rt_timer_control(&(thread->thread_timer),
                             RT_TIMER_CTRL_SET_TIME,
                             &timeout);
            rt_timer_start(&(thread->thread_timer));
        }

        /* enable interrupt */
        rt_hw_interrupt_enable(temp);

        /* re-schedule */
        rt_schedule();

        /* resume from suspend state */
        if (thread->error != RT_EOK)
        {
            /* return error */
            return thread->error;
        }

        /* disable interrupt */
        temp = rt_hw_interrupt_disable();

        /* if it's not waiting forever and then re-calculate timeout tick */
        if (timeout > 0)
        {
            tick_delta = rt_tick_get() - tick_delta;
            timeout -= tick_delta;
            if (timeout < 0)
                timeout = 0;
        }
    }

    /* move free list pointer */
    mq->msg_queue_free = msg->next;

    /* enable interrupt */
    rt_hw_interrupt_enable(temp);

    /* the msg is the new tailer of list, the next shall be NULL */
    msg->next = RT_NULL;
    /* copy buffer */
    rt_memcpy(msg + 1, buffer, size);

    /* disable interrupt */
    temp = rt_hw_interrupt_disable();
    /* link msg to message queue */
    if (mq->msg_queue_tail != RT_NULL)
    {
        /* if the tail exists, */
        ((struct rt_mq_message *)mq->msg_queue_tail)->next = msg;
    }

    /* set new tail */
    mq->msg_queue_tail = msg;
    /* if the head is empty, set head */
    if (mq->msg_queue_head == RT_NULL)
        mq->msg_queue_head = msg;

    if(mq->entry < RT_MQ_ENTRY_MAX)
    {
        /* increase message entry */
        mq->entry ++;
    }
    else
    {
        rt_hw_interrupt_enable(temp); /* enable interrupt */
        return -RT_EFULL; /* value overflowed */
    }

    /* resume suspended thread */
    if (!rt_list_isempty(&mq->parent.suspend_thread))
    {
        _ipc_list_resume(&(mq->parent.suspend_thread));

        /* enable interrupt */
        rt_hw_interrupt_enable(temp);

        rt_schedule();

        return RT_EOK;
    }

    /* enable interrupt */
    rt_hw_interrupt_enable(temp);

    return RT_EOK;
}
RTM_EXPORT(rt_mq_send_wait)

实际上较早一些的版本如3.0.3版本中并没有单独拎出一个rt_mq_send_wait函数,只有rt_mq_send函数,所有的实现都在其中。 

(1)在发送消息时需要传递一些参数:rt_mq_t mq是已经创建的队列句柄;void *buffer是即将发送消息的存储地址;rt_size_t size是即将发送的消息的大小。

(2)检测传递进来的参数,即使这些参数中有一个是无效的,就无法发送消息。

(3)判断消息的大小,其大小不能超过创建时设置的消息队列的大小mq->msg_size。用户可以自定义大小,如果mq->msg_size不够,可以在创建时设置得大一些。

(4)获取一个空闲链表的指针。必须有一个空闲链表节点用于存放要发送的消息,如果消息队列已经满了,则无法发送消息。

(5)移动空闲链表指针。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/phmatthaus/article/details/123401823

智能推荐

python opencv resize函数_python opencv 等比例调整(缩放)图片分辨率大小代码 cv2.resize()...-程序员宅基地

文章浏览阅读1.3k次。# -*- coding: utf-8 -*-"""@File : 200113_等比例调整图像分辨率大小.py@Time : 2020/1/13 13:38@Author : Dontla@Email : [email protected]@Software: PyCharm"""import cv2def img_resize(image):height, width = image...._opencv小图等比例缩放

【OFDM、OOK、PPM、QAM的BER仿真】绘制不同调制方案的误码率曲线研究(Matlab代码实现)-程序员宅基地

文章浏览阅读42次。对于这些调制技术的误码率(BER)研究是非常重要的,因为它们可以帮助我们了解在不同信道条件下系统的性能表现。通过以上步骤,您可以进行OFDM、OOK、PPM和QAM的误码率仿真研究,并绘制它们的误码率曲线,以便更好地了解它们在不同信道条件下的性能特点。针对这些调制技术的BER研究是非常重要的,可以帮助我们更好地了解这些技术在不同信道条件下的性能表现,从而指导系统设计和优化。6. 分析结果:根据误码率曲线的比较,分析每种调制方案在不同信噪比条件下的性能,包括其容忍的信道条件和适用的应用场景。_ber仿真

【已解决】Vue的Element框架,日期组件(el-date-picker)的@change事件,不会触发。_el-date-picker @change不触发-程序员宅基地

文章浏览阅读2.5w次,点赞3次,收藏3次。1、场景照抄官方的实例,绑定了 myData.Age 这个值。实际选择某个日期后,从 vuetool(开发工具)看,值已经更新了,但视图未更新。2、尝试绑定另一个值: myData,可以正常的触发 @change 方法。可能是:值绑定到子对象时,组件没有侦测到。3、解决使用 @blur 代替 @change 方法。再判断下 “值有没有更新” 即可。如有更好的方法,欢迎评论!..._el-date-picker @change不触发

PCL学习:滤波—Projectlnliers投影滤波_projectinliers-程序员宅基地

文章浏览阅读1.5k次,点赞2次,收藏8次。Projectlnliersclass pcl: : Projectlnliers< PointT >类 Projectlnliers 使用一个模型和一组的内点的索引,将内点投影到模型形成新的一个独立点云。关键成员函数 void setModelType(int model) 通过用户给定的参数设置使用的模型类型 ,参数 Model 为模型类型(见 mo..._projectinliers

未处理System.BadImageFormatException”类型的未经处理的异常在 xxxxxxx.exe 中发生_“system.badimageformatexception”类型的未经处理的异常在 未知模块。 -程序员宅基地

文章浏览阅读2.4k次。“System.BadImageFormatException”类型的未经处理的异常在 xxxx.exe 中发生其他信息: 未能加载文件或程序集“xxxxxxx, Version=xxxxxx,xxxxxxx”或它的某一个依赖项。试图加载格式不正确的程序。此原因是由于 ” 目标程序的目标平台与 依赖项的目标编译平台不一致导致,把所有的项目都修改到同一目标平台下(X86、X64或AnyCPU)进行编译,一般即可解决问题“。若果以上方式不能解决,可采用如下方式:右键选择配置管理器,在这里修改平台。_“system.badimageformatexception”类型的未经处理的异常在 未知模块。 中发生

PC移植安卓---2018/04/26_电脑软件移植安卓-程序员宅基地

文章浏览阅读2.4k次。记录一下碰到的问题:1.Assetbundle加载问题: 原PC打包后的AssetBundle导入安卓工程后,加载会出问题。同时工程打包APK时,StreamingAssets中不能有中文。解决方案: (1).加入PinYinConvert类,用于将中文转换为拼音(多音字可能会出错,例如空调转换为KongDiao||阿拉伯数字不支持,如Ⅰ、Ⅱ、Ⅲ、Ⅳ(IIII)、Ⅴ、Ⅵ、Ⅶ、Ⅷ、Ⅸ、Ⅹ..._电脑软件移植安卓

随便推点

聊聊线程之run方法_start 是同步还是异步-程序员宅基地

文章浏览阅读2.4k次。话不多说参考书籍 汪文君补充知识:start是异步,run是同步,start的执行会经过JNI方法然后被任务执行调度器告知给系统内核分配时间片进行创建线程并执行,而直接调用run不经过本地方法就是普通对象执行实例方法。什么是线程?1.现在几乎百分之百的操作系统都支持多任务的执行,对计算机来说每一个人物就是一个进程(Process),在每一个进程内部至少要有一个线程实在运行中,有时线..._start 是同步还是异步

制作非缘勿扰页面特效----JQuery_单击标题“非缘勿扰”,<dd>元素中有id属性的<span>的文本(主演、导演、标签、剧情-程序员宅基地

文章浏览阅读5.3k次,点赞9次,收藏34次。我主要用了层次选择器和属性选择器可以随意选择,方便简单为主大体CSS格式 大家自行构造网页主体<body> <div class='main' > <div class='left'> <img src="images/pic.gif" /> <br/><br/> <img src="images/col.gif" alt="收藏本片"/&_单击标题“非缘勿扰”,元素中有id属性的的文本(主演、导演、标签、剧情

有了这6款浏览器插件,浏览器居然“活了”?!媳妇儿直呼“大开眼界”_浏览器插件助手-程序员宅基地

文章浏览阅读901次,点赞20次,收藏23次。浏览器是每台电脑的必装软件,去浏览器搜索资源和信息已经成为我们的日常,我媳妇儿原本也以为浏览器就是上网冲浪而已,哪有那么强大,但经过我的演示之后她惊呆了,直接给我竖起大拇指道:“原来浏览器还能这么用?大开眼界!今天来给大家介绍几款实用的浏览器插件,学会之后让你的浏览器“活过来”!_浏览器插件助手

NumPy科学数学库_数学中常用的环境有numpy-程序员宅基地

文章浏览阅读101次。NumPy是Python中最常用的科学数学计算库之一,它提供了高效的多维数组对象以及对这些数组进行操作的函数NumPy的核心是ndarray(N-dimensional array)对象,它是一个用于存储同类型数据的多维数组Numpy通常与SciPy(Scientific Python)和 Matplotlib(绘图库)一起使用,用于替代MatLabSciPy是一个开源的Python算法库和数学工具包;Matplotlib是Python语言及其Numpy的可视化操作界面'''_数学中常用的环境有numpy

dind(docker in docker)学习-程序员宅基地

文章浏览阅读1.1w次。docker in docker说白了,就是在docker容器内启动一个docker daemon,对外提供服务。优点在于:镜像和容器都在一个隔离的环境,保持操作者的干净环境。想到了再补充 :)一:低版本启动及访问启动1.12.6-dinddocker run --privileged -d --name mydocker docker:1.12.6-dind在其他容器访问d..._dind

推荐文章

热门文章

相关标签