Sullivan's BLOG

琐碎,定格,在南方

zephyr 系统服务(一) ZBUS

zephyr 系统服务(一) ZBUS

前言

最近在做了一个比较复杂的项目,需要一个软件总线作为不同的模块之间的通讯方式,最后采用了Zephyr总线(ZBUS),ZBUS是ephyr中一个自带的系统服务,这个软件总线无论是名字还是文档和例程的详细度质量都远远优于其他内容,一看就是亲儿子。

由于zephyr的历史和背景原因,很多核心内容都是向linux看齐的, 对于linux内核而言,软件总线主要体现在进程间通信,像管道、信号/信号灯、共享内存、消息队列和socket这些比较基础的,又或是xBus,dbus这样重型的总线,难以使用在RTOS上,更何况zephyr内核所含的工作队列和data passing已经足够构建基础的通信机制, 因此ZBUS的诞生就是为了解决RTOS下没有方便使用的软总线的问题。

介绍

如果之前有学习过mqtt的话,会发现ZBUS 的整个模型和mqtt是非常相似的,使用者分为发布者和观察者,观察者又分为三类:

  1. 监听者(Listeners):总线发布消息后立即触发回调函数处理信息。
  2. 订阅者(Subscriber):在自定义的线程里调用函数主动去等待信息的发布,为阻塞行为。
  3. 消息订阅者(Message subscribers):总线发布消息后会将消息的拷贝传递至fifo里,在自定义的线程里调用函数主动去查询fifo内容,为异步非阻塞行为。

使用

首先我们需要定义通讯数据格式结构体,这里我定义一个电机数据结构体motor_data_msg

1
2
3
4
5
6
struct motor_data_msg
{
uint32_t motor_id;
float speed;
float position;
};

使用ZBUS_CHAN_DEFINE定义ZBUS总线

1
2
3
4
5
6
7
ZBUS_CHAN_DEFINE(motor_chan,                                                                /* 总线名字*/
motor_data_msg, /* 总线数据类型 */
motor_data_validator, /* 验证器(在回调函数里处理信息)*/
NULL, /* User data */
ZBUS_OBSERVERS(motor_data_recv), /* 观察者 */
ZBUS_MSG_INIT(.motor_can_id = motor_id::C12B, .speed = 0, .position = 0, ) /* 初始化信息 */
);

验证器函数主要用于处理发布者发布的信息,通过返回true或false过滤无效信息,这里我假设速度大于30的为无效信息,返回false后观察者不会接收到该信息。

1
2
3
4
5
6
7
8
9
bool motor_data_validator(const void *msg, size_t msg_size)
{
motor_data_msg *cm = (motor_data_msg *)msg;
if (cm->speed > 30)
{
return false;
}
return true;
}

一般来说不会使用监听者(Listeners),因为如果需要用回调函数去处理接收数据的话还得思考回调函数与各模块之间的通信问题,相当于根本就没有解决各模块之间的通信问题,这里我们采用订阅者(Subscriber)在一个新线程(模块)里去收集数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ZBUS_SUBSCRIBER_DEFINE(motor_data_recv, 4); // 定义并初始化订阅者,4为消息队列数量
int main()
{
const zbus_channel *chan;
motor_data_msg msg;
zbus_obs_attach_to_thread(&motor_data_recv); //设定该观察者的优先度

while (!zbus_sub_wait(&motor_data_recv, &chan, K_FOREVER)) // 使用zbus_sub_wait()函数等待消息发布
{
if (&motor_chan == chan)
{
zbus_chan_read(&motor_chan, &msg, K_NO_WAIT); // 当消息发布后使用zbus_chan_read()函数读取消息储存在msg中
LOG_INF("recv: id: %d", static_cast<int>(msg.motor_can_id));
LOG_INF("recv: speed: %f", msg.speed);
LOG_INF("recv: position: %f", msg.position);
}
}
}

相比订阅者,消息订阅者(Message subscribers)省去了zbus_chan_read这个函数,当消息发布时就被拷贝到了消息队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ZBUS_MSG_SUBSCRIBER_DEFINE(motor_data_recv); // 定义并初始化消息订阅者
int main()
{
const zbus_channel *chan;
motor_data_msg msg;

zbus_obs_attach_to_thread(&motor_data_recv);//设定该观察者的优先度

while (!zbus_sub_wait_msg(&motor_data_recv, &chan, &msg, K_FOREVER))
{
if (&motor_chan == chan)
{
zbus_chan_read(&motor_chan, &msg, K_NO_WAIT); //直接储存在msg里
LOG_INF("recv: speed: %f", (double)msg.speed);
LOG_INF("recv: position: %f", (double)msg.position);
}
}
}

接下来我们定义一个测试线程每隔一秒发布一次数据

1
2
3
4
5
6
7
8
9
10
static void test_thread(void *, void *, void *)
{
while (true)
{
motor_data_msg acc1 = {.motor_can_id = motor_id::C12B, .speed = 1.0, .position = 2.0};
zbus_chan_pub(&motor_chan, &acc1, K_SECONDS(1));
k_sleep(K_SECONDS(1));
}
}
K_THREAD_DEFINE(test_thread_tid, 1024, test_thread, NULL, NULL, NULL, 10, 0, 0);

不要忘记改一下kconfig

1
2
3
4
CONFIG_ZBUS=y
CONFIG_ZBUS_LOG_LEVEL_INF=y
CONFIG_CBPRINTF_FP_SUPPORT=y
CONFIG_ZBUS_MSG_SUBSCRIBER=y

接下来可以看到输出

1
2
3
4
5
6
7
8
9
10
*** Booting Zephyr OS build v3.7.0-761-g0c7e87714d92 ***
[00:00:00.208,000] <inf> main: recv: id: 0
[00:00:00.208,000] <inf> main: recv: speed: 0.000000
[00:00:00.208,000] <inf> main: recv: position: 0.007888
[00:00:01.208,000] <inf> main: recv: id: 0
[00:00:01.208,000] <inf> main: recv: speed: 0.000000
[00:00:01.208,000] <inf> main: recv: position: 0.007888
[00:00:02.208,000] <inf> main: recv: id: 0
[00:00:02.208,000] <inf> main: recv: speed: 0.000000
[00:00:02.208,000] <inf> main: recv: position: 0.007888

注意事项

使用zbus_sub_wait一定要注意超时时间,多个观察者之间的调度会因为优先级次序出现问题,如果有多个订阅者使用了zbus_obs_attach_to_thread重新定义优先级,使用K_NO_WAIT进行读取很可能会出现其他观察者霸占着ZBUS而出现超时,使用时需仔细考虑。

参考文档.