RT-Thread之消息隊列使用示例
消息隊列(Message Queue)是一種異步通信機制,其核心功能是:
- 解耦生產者和消費者:發送方和接收方無需同時在線
- 緩沖與流量整形:應對瞬時流量過載
- 優先級處理:支持緊急消息插隊
典型案例:
在RT-Thread中,中斷服務程序(ISR)通過rt_mq_send非阻塞發送傳感器數據,消費者線程按需從隊列讀取數據,避免ISR因等待導致實時性下降。
一、消息隊列API函數
1、消息隊列的創建
- 動態創建消息隊列
rt_mq_t rt_mq_create(const char *name,
rt_size_t msg_size,
rt_size_t max_msgs,
rt_uint8_t flag)
- 靜態創建消息隊列
rt_err_t rt_mq_init(rt_mq_t mq,
const char *name,
void *msgpool,
rt_size_t msg_size,
rt_size_t pool_size,
rt_uint8_t flag)
2、消息的發送
發送消息時,從空閑消息鏈表取一個空閑消息塊,把消息復制到該消息塊,然后把消息塊掛到消息隊列尾部。消息的發送有以下三種:
- 直接發送消息
隊列中有空閑消息塊時,才能成功發送消息,否則返回錯誤。
rt_err_t rt_mq_send(rt_mq_t mq, const void *buffer, rt_size_t size)
- 等待發送消息
如果隊列中沒有可用的空閑消息塊,會根據timeout參數等待,超時后才返回錯誤。
rt_err_t rt_mq_send_wait(rt_mq_t mq,
const void *buffer,
rt_size_t size,
rt_int32_t timeout)
- 緊急發送消息
它會把消息塊放在消息隊列的頭部,以便這個消息能被第1時間讀取。
rt_err_t rt_mq_urgent(rt_mq_t mq, const void *buffer, rt_size_t size)
3、消息的接收
當隊列有消息時,使用收消息函數,可以從隊列接收消息;如果沒有消息,根據指定的 timeout 參數等待,直到超時結束。
rt_err_t rt_mq_recv(rt_mq_t mq,
void *buffer,
rt_size_t size,
rt_int32_t timeout)
4、消息隊列的脫離/刪除
- 刪除使用 rt_mq_create()創建的隊列
rt_err_t rt_mq_delete(rt_mq_t mq)
- 脫離使用 rt_mq_init()初始化的隊列
rt_err_t rt_mq_detach(rt_mq_t mq)
二、創建消息隊列示例
1、創建消息隊列
/* 使用靜態創建(確定內存占用) */
struct rt_messagequeue mq_static;
uint8_t msgpool[96]; /* 消息數量計算:96/(RT_ALIGN(2,4)+4)=12條消息 */
rt_mq_init(
&mq_static, /* 消息隊列對象的句柄 */
"smq", /* 消息隊列的名字 */
msgpool, /* 內存池指向的地址 */
2, /* 每一個消息占2字節 */
96, /* 內存池的大小 */
RT_IPC_FLAG_FIFO); /* 選擇線程喚醒的順序方式 */
/* 使用動態創建(靈活調整) */
rt_mq_t mq_dynamic = rt_mq_create(
"dmq", /* 消息隊列名稱 */
2, /* 每一個消息大小 */
12, /* 消息數量 */
RT_IPC_FLAG_FIFO); /* 選擇線程喚醒的順序方式 */
2、完整示例
#include "thread_task.h"
#include "main.h"
#include <stdio.h>
#include "rtthread.h"
#include <rthw.h>
/******************************************** 線程 1 ******************************************************/
#define THREAD_1_PRIORITY 4 /* 進程優先級 */
#define THREAD_1_STACK_SIZE 512 /* 進程棧空間大小 */
#define THREAD_1_TIMESLICE 10 /* 進程執行時間片個數 */
static struct rt_thread *thread_1_handle; /* 進程句柄 */
/******************************************** 線程 2 ******************************************************/
#define THREAD_2_PRIORITY 5 /* 進程優先級 */
#define THREAD_2_STACK_SIZE 512 /* 進程棧空間大小 */
#define THREAD_2_TIMESLICE 10 /* 進程執行時間片個數 */
static struct rt_thread *thread_2_handle; /* 進程句柄 */
/* 消息隊列 */
rt_mq_t mq_test;
/**
* @brief 線程1入口函數
* @param 無
* @retval 無
*/
void thread_1_entry(void* param)
{
uint8_t buffer[2];
while(1)
{
rt_mq_recv(mq_test, buffer, 2, 0xffffffff);
HAL_GPIO_TogglePin(GPIOC, LED1_Pin);
rt_thread_delay(4000); /* 精準延時1000時間片 */
}
}
/**
* @brief 線程2入口函數
* @param 無
* @retval 無
*/
void thread_2_entry(void* param)
{
uint8_t text[2] = {0,1};
while(1)
{
text[0] ++;
rt_mq_send_wait(mq_test, text, 2, 0xffff);
HAL_GPIO_TogglePin(GPIOC, LED2_Pin);
rt_thread_delay(200);
}
}
/**
* @brief 動態創建線程任務并啟動
* @param 無
* @retval 無
*/
void ThreadStart(void)
{
rt_base_t level = rt_hw_interrupt_disable();
/* 動態創建線程 */
thread_1_handle = rt_thread_create(
"thread_1", /* 線程句柄名稱*/
thread_1_entry, /* 函數入口 */
RT_NULL, /* 入口函數參數 */
THREAD_1_STACK_SIZE, /* 線程棧大小 */
THREAD_1_PRIORITY, /* 線程優先級 */
THREAD_1_TIMESLICE /* 線程時間片大小 */
);
rt_thread_startup(thread_1_handle); /* 啟動線程 */
thread_2_handle = rt_thread_create(
"thread_2", /* 線程句柄名稱*/
thread_2_entry, /* 函數入口 */
RT_NULL, /* 入口函數參數 */
THREAD_2_STACK_SIZE, /* 線程棧大小 */
THREAD_2_PRIORITY, /* 線程優先級 */
THREAD_2_TIMESLICE /* 線程時間片大小 */
);
rt_thread_startup(thread_2_handle); /* 啟動線程 */
/* 動態創建一個消息隊列 */
mq_test = rt_mq_create(
"mq_test", /* 消息隊列名稱 */
2, /* 消息大小 */
12, /* 消息數量 */
RT_IPC_FLAG_FIFO /* 選擇線程喚醒的順序方式 */
);
rt_hw_interrupt_enable(level);
}
執行流程如下:
- 初始階段
- 線程1立即阻塞等待消息(隊列空)
- 線程2開始高頻發送(每200 ticks發送1次)
- 隊列填滿階段
- 線程2發送12條消息后隊列滿 →
rt_mq_send_wait()開始阻塞 - 系統進入雙阻塞狀態(線程1和2均掛起)
- 線程2發送12條消息后隊列滿 →
- 恢復同步階段
- 線程1的rt_thread_delay(4000)到期后:
- 消費1條消息 → 釋放1個隊列空間
- 喚醒線程2繼續發送
- 穩定狀態:每4000 ticks消費1條,線程2每200 ticks補充1條
- 線程1的rt_thread_delay(4000)到期后:

浙公網安備 33010602011771號