PX4-3-uORB

uORB(Micro Object Request Broker,微对象请求代理器)是PX4中非常重要且关键的一个模块,用于各个模块之间的数据交互。实际上uORB是一套跨「进程」 的IPC通讯模块。在PX4中, 所有的功能被独立以进程模块为单位进行实现并工作。而进程间的数据交互就尤为重要,必须要实时高效。

uORB实际上是多个进程通过共享内存的方式,进行数据交互和共享。进程通过命名的「总线」交换的消息称之为「topic」(topic),在PX4中,一个topic仅包含一种消息类型。每个进程可以「订阅」或者「发布」topic,可以存在多个发布者,或者一个进程可以订阅多个topic。

由于PX4代码更新很快,uORB模块在各个版本中可能有所不同,如最新发布的1.12版本就有比较大的变化,将uORB模块由modules中移动到了plorforms/common中,uORB的定位有所改变,上升到了系统基础组件,不过这里还是以1.11.3版本这个去年发布的稳定版本为例讲解。

这一篇的内容比较长,需要花一些时间阅读,建议可以先大致阅读,如果对uORB的机制感兴趣可以收藏一下,后面结合实际代码阅读。

接口

uORB的外部接口提供了C和C++两种接口,应用程序可以使用C编写也可以使用C++

C接口

C接口定义在 src/modules/uORB/uORB.h

// 向消息网络公告一个发布者,它初始化这个topic公告,在/obj目录下创建一个topic节点(如果需要)并发布初始数据
orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data, unsigned int queue_size = 1)
// 向消息网络公告一个发布者,它初始化这个topic公告,在/obj目录下创建一个topic节点(如果需要)并发布初始数据,multi可用于创建同一topic的多个独立实例(每个实例都有自己的缓冲区)。这对于发布同一topic的多个发布者非常有用。订阅人然后订阅所有实例并选择要使用的源。 
orb_advert_t orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance, ORB_PRIO priority,unsigned int queue_size = 1);
// 取消
int orb_unadvertise(orb_advert_t handle);
// 向topic发布一个新数据
int  orb_publish(const struct orb_metadata *meta, orb_advert_t handle, const void *data);
// 订阅一个topic
int  orb_subscribe(const struct orb_metadata *meta);
// 一个topic存在多个发布者时,订阅指定实例
int  orb_subscribe_multi(const struct orb_metadata *meta, unsigned instance);
// 取消订阅一个topic
int  orb_unsubscribe(int handle);
// 检测一个topic是否有数据更新
int  orb_check(int handle, bool *updated);
// 检查一个topic是否存在并且有数据发布
int  orb_exists(const struct orb_metadata *meta, int instance);
// 返回topic的优先级
int  orb_priority(int handle, enum ORB_PRIO *priority);
// 设置订阅topic更新的最小间隔,可用于防止订阅者过快的更新数据
int  orb_set_interval(int handle, unsigned interval);
// 获取订阅topic更新的最小间隔
int	orb_get_interval(int handle, unsigned *interval);

C++接口

C++的接口就简单很多,只需要new 对应的类就可以了

// 定义消息发布者
uORB::Publication<xxx_topic_s>		xxx_topic_pub{ORB_ID(xxx_topic)};
// 定义消息订阅者
uORB::Subscription		xxx_topic_sub{ORB_ID(xxx_topic)};
// 消息发布
xxx_topic_pub..publish(data);
// 消息订阅
if (xxx_topic_sub.updated()) {
	xxx_topic_sub.copy(&data);
}

添加新的消息

添加消息非常简单,在/msg目录下创建一个新的msg文件,然后在msg/CMakeLists.txt文件中添加新增加的文件即可

内部机制

自动生成代码

PX4使用了大量自动代码生成方法,减少用户的重复性代码编辑,比如Mavlink、uavcan、param的消息创建以及部分gazebo模型文件的生成。

uORB的代码生成脚本位于 msg/tools,使用python进行代码生成。

生成的文件在 build/xxx/msg/tmp目录下可以找到,包括一个头文件目录和一个cpp文件目录,阅读创建的文件内容可以大概了解主要创建了什么内容。以airspeed文件为例,airspeed.h文件中

//定义了消息的结构体
#ifdef __cplusplus
struct __EXPORT airspeed_s {
#else
struct airspeed_s {
#endif
	uint64_t timestamp;
	float indicated_airspeed_m_s;
	float true_airspeed_m_s;
	float air_temperature_celsius;
	float confidence;
#ifdef __cplusplus
#endif
};
// 申明消息的uORB元数据,用于uORB通过ORB_ID(_name)定位消息
ORB_DECLARE(airspeed);

airspeed.cpp文件中

constexpr char __orb_airspeed_fields[] = "uint64_t timestamp;float indicated_airspeed_m_s;float true_airspeed_m_s;float air_temperature_celsius;float confidence;";
// 定义消息的uORB元数据,对应头文件中的ORB_DECLARE(airspeed);
ORB_DEFINE(airspeed, struct airspeed_s, 24, __orb_airspeed_fields, static_cast<uint8_t>(ORB_ID::airspeed));

消息发布机制

消息是如何发布到uORB的呢,我们来看看消息advertise都做了哪些事情

// 消息公告的c接口
orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data)
// ->
// 调用uORB::Manager类的消息公告函数
uORB::Manager::get_instance()->orb_advertise(meta, data);
// ->
// 多消息实例公告函数的复用接口
orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance,ORB_PRIO priority, unsigned int queue_size)
// -> 
    // open消息节点
    int fd = node_open(meta, true, instance, priority);

        // ->
        // 调用消息节点管理类uORB::DeviceMaster的消息公告接口
        ret = _device_master->advertise(meta, advertiser, instance, priority);

            // ->
            // 创建消息节点,这个节点类定义了消息的主要读写方法
    		uORB::DeviceNode *node = new uORB::DeviceNode(meta, group_tries, devpath, priority);

    // 设置消息buffer的大小
    int result = px4_ioctl(fd, ORBIOCSETQUEUESIZE, (unsigned long)queue_size);

    // 发布消息以初始化消息对象
    result = orb_publish(meta, advertiser, data);

        // ->
        // 调用消息节点类的消息发布接口
        uORB::DeviceNode::publish(meta, handle, data);

            // ->
            // 调用消息写入接口
            ret = devnode->write(nullptr, (const char *)data, meta->o_size);

                // ->
                // 申请消息对象内存
    			if (nullptr == _data) {
    				_data = new uint8_t[_meta->o_size * _queue_size];
    			}
              

我们将消息公告接口的主要调用过程整理了一下,梳理了一下类的调用关系,消息公告主要完成:

  • open消息的字符设备对象 node_open
  • 实例化消息节点对象 new uORB::DeviceNode
  • 为消息申请内存 new uint8_t[_meta->o_size * _queue_size]

类的调用层级为

uORB::Manager 
    -> uORB::DeviceMaster 
        -> uORB::DeviceNode 
            -> cdev::CDev

消息发布和订阅的机制

梳理了消息公告的过程,uORB的内部机制就有一个雏形了,其他的接口也都封装在这几个类中

消息的主要操作方法主要封装在uORB::DeviceNode类中,主要就是消息的读写接口

这里不过多展开,大家可以结合源码分析几个接口函数

消息回调函数的机制

消息订阅有两种机制,异步订阅和消息回调

  • 异步订阅

      // 订阅消息的任务在循环中判断是否消息有更新,如果有更新则处理消息
      // 这是一个非阻塞的判断
      if (xxx_topic_sub.updated()) {
      	xxx_topic_sub.copy(&data);
      }
    
  • 消息回调

    可以注册回调函数,在消息发生更新时触发回调函数,进行任务数据更新

    参考src/modules/vtol_att_control/vtol_att_control_main.h中的例子

      // 定义一个消息订阅回调类
      uORB::SubscriptionCallbackWorkItem _actuator_inputs_fw{this, ORB_ID(actuator_controls_virtual_fw)};
      // 注册消息回调函数
      _actuator_inputs_fw.registerCallback()
    

我们看看这个消息回调类都做了哪些事情

类的调用层级为

SubscriptionCallbackWorkItem 
    -> SubscriptionCallback 
        -> SubscriptionInterval

在registerCallback函数中,任务类将回调函数注册到消息节点中,在消息发生更新时会调用这个callback方法

而调用的方法为

// src/modules/uORB/SubscriptionCallback.hpp 
// SubscriptionCallbackWorkItem
void call() override
{
	// schedule immediately if updated (queue depth or subscription interval)
	if ((_required_updates == 0)
	    || (_subscription.get_node()->published_message_count() >= (_subscription.get_last_generation() + _required_updates))) {
		if (updated()) {
			_work_item->ScheduleNow();
		}
	}
}

通过 _work_item->ScheduleNow() 方法进行任务的重调度

这里涉及 px4::WorkItem 类,PX4是如何通过ScheduleNow方法调度 VtolAttitudeControl::Run()这个run函数的呢

这个我们在下一个分享 PX4的任务调度 中分析。

我的微信公众号,文章同步更新,欢迎关注。

微信公众号