zoukankan      html  css  js  c++  java
  • ubus

    openwrt提供了一个系统总线ubus,类似linux桌面操作系统的d-bus,目标是提供系统级的进程间通信(IPC)功能。

    为了提供各种后台进程和应用程序之间的通信机制,ubus被开发出来,由3部分组成:精灵进程,接口库和实用工具。

    工程的核心是ubusd精灵进程,它提供了一个总线层,在系统启动时运行,负责进程间的消息路由和传递。其他进程注册到ubusd进程进行消息的发送和接收。这个接口是用linux文件socket和TLV手法消息实现的。每一个进程在指定命名空间下注册自己的路径。每一个路径都可以提供带有各种参数的多个函数处理过程。函数处理过程程序可以在完成处理后返回消息。

    接口库名称为libubus.so,其他进程可以通过该动态链接库来简化对ubus总线的访问。

    实用工具ubus是提供命令行的接口调用工具。可以基于该工具来进行脚本编程,也可以使用ubus来诊断问题。

    ubus是为进程间发送消息而设计的,不适合传输大量数据(进程间传输大量数据应采用共享内存)。

    网址:http://git.openwrt.org/project/ubus.git

    $ ls ubus-2015-05-25
    cli.c           libubus.h           libubus-req.c  ubus_common.h  ubusd_id.c   ubusd_proto.c
    CMakeLists.txt  libubus-internal.h  libubus-sub.c  ubusd.c        ubusd_id.h   ubusmsg.h
    examples        libubus-io.c        lua            ubusd_event.c  ubusd_obj.c
    libubus.c       libubus-obj.c       systemd        ubusd.h        ubusd_obj.h
    ubusd: ubusd.c ubusd_id.c ubusd_obj.c ubusd_proto.c ubusd_event.c
    libubus: libubus.c libubus-io.c libubus-obj.c libubus-sub.c libubus-req.c
    cli/ubus: cli.c

     1. ubusd

    /etc/init.d/ubus中提供ubusd进程的启动,在系统进程启动完成后立即启动。他是在网络进程netifd之前启动的,该进程提供一个文件套接字接口和其他应用程序通信。

    ubusd采用SOCK_STREAM的Unix域套接字实现与服务进程间通讯。

    提供的功能:

    1)提供注册对象和方法供其他实体调用。

    2)调用其他应用程序提供的注册对象的控制接口。

    3)在特定对象上注册监听事件。

    4)向特定对象发送事件消息。

    ubus将消息处理抽象为对象object和方法method的概念。一个对象中包含多个方法。对象和方法都有自己的名字,发送请求方在消息中指定要调用的对象和方法名字即可。

    ubus的另外一个概念就是订阅(subscriber)。客户端需要向服务器注册收到特定消息时的处理方法。这样当服务器在状态发生改变时会通过ubus总线来通知客户端。

    ubus可用于两个进程之间通信,进程之间以TLV格式传递消息。ubus能够以json格式和用户进行数据交换。常见场景(应用详见ubus实现进程间通信举例):

    1)客户端/服务器模式。

    2)订阅通知模式(struct ubus_subscriber)。

    3)事件模式(ubus_event_handler)。

    ubusd是一种总线消息服务器,任何消息均通过ubusd进程传递,因此多个进程在相互通信时,均通过ubus收发消息。

    编译安装

    1)去除lua支持

    #ADD_SUBDIRECTORY(lua)

    2)编译

    mkdir build; cd build; cmake ..; make ;make install
    [ 30%] Built target ubus
    [ 60%] Built target ubusd
    [ 70%] Built target cli
    [ 85%] Built target server
    [100%] Built target client
    Install the project...
    -- Install configuration: ""
    -- Installing: /usr/local/lib/libubus.so
    -- Installing: /usr/local/bin/ubus
    -- Set runtime path of "/usr/local/bin/ubus" to ""
    -- Installing: /usr/local/sbin/ubusd
    -- Up-to-date: /usr/local/include/ubusmsg.h
    -- Up-to-date: /usr/local/include/ubus_common.h
    -- Up-to-date: /usr/local/include/libubus.h
    -- Installing: /usr/lib/systemd/system/ubus.socket
    -- Installing: /usr/lib/systemd/system/ubus.service

    2. libubus

    1) 数据结构

    struct ubus_event_handler {
        struct ubus_object obj;
        ubus_event_handler_t cb;
    };
    
    struct ubus_context {
        struct list_head requests;
        struct avl_tree objects;    /** client端object链表头 */
        struct list_head pending;
    
        struct uloop_fd sock;       /** client端sock对象 */
    
        uint32_t local_id;          /** ubusd分配的client id */
        uint16_t request_seq;
        int stack_depth;
    
    /** 断开连接回调函数 */
        void (*connection_lost)(struct ubus_context *ctx);
    
        struct {
            struct ubus_msghdr hdr;
            char data[UBUS_MAX_MSGLEN];
        } msgbuf;                   /** 通信报文 */
    };
    
    struct ubus_object_data {
        uint32_t id;
        uint32_t type_id;
        const char *path;
        struct blob_attr *signature;
    };
    
    struct ubus_request_data {
        uint32_t object;
        uint32_t peer;
        uint16_t seq;
    
        /* internal use */
        bool deferred;
        int fd;
    };
    
    struct ubus_request {
        struct list_head list;
    
        struct list_head pending;
        int status_code;
        bool status_msg;
        bool blocked;
        bool cancelled;
        bool notify;
    
        uint32_t peer;
        uint16_t seq;
    
        ubus_data_handler_t raw_data_cb;
        ubus_data_handler_t data_cb;
        ubus_fd_handler_t fd_cb;
        ubus_complete_handler_t complete_cb;
    
        struct ubus_context *ctx;
        void *priv;
    };
    
    struct ubus_notify_request {
        struct ubus_request req;
    
        ubus_notify_complete_handler_t status_cb;
        ubus_notify_complete_handler_t complete_cb;
    
        uint32_t pending;
        uint32_t id[UBUS_MAX_NOTIFY_PEERS + 1];
    };
    
    struct ubus_auto_conn {
        struct ubus_context ctx;
        struct uloop_timeout timer;
        const char *path;
        ubus_connect_handler_t cb;
    };

    2)接口函数

    * 初始化client端context结构,并连接ubusd
    struct ubus_context *ubus_connect(const char *path)
     * 与ubus_connect()函数基本功能相同,但此函数在连接断开后会自动进行重连
    void ubus_auto_connect(struct ubus_auto_conn *conn)
     * 注册新事件
    int ubus_register_event_handler(struct ubus_context *ctx, struct ubus_event_handler *ev, const char *pattern)
     * 发出事件消息
    int ubus_send_event(struct ubus_context *ctx, const char *id, struct blob_attr *data)
    
     * 向ubusd查询指定UBUS_ATTR_OBJPATH对应对象信息内容
     * 内容通过输入回调函数ubus_lookup_handler_t由调用者自行处理
    int ubus_lookup(struct ubus_context *ctx, const char *path, ubus_lookup_handler_t cb, void *priv)
    
     * 向ubusd查询指定UBUS_ATTR_OBJPATH对应的ID号
    int ubus_lookup_id(struct ubus_context *ctx, const char *path, uint32_t *id)

    * 向uloop注册fd
    static inline void ubus_add_uloop(struct ubus_context *ctx)
    {
        uloop_fd_add(&ctx->sock, ULOOP_BLOCKING | ULOOP_READ);
    }

    3. libubus-obj

    数据结构

    struct ubus_object {
        struct avl_node avl;  /** 关系到struct ubus_context的objects */
    
        const char *name;     /** UBUS_ATTR_OBJPATH */
        uint32_t id;          /** 由ubusd server分配的obj id */
    
        const char *path;
        struct ubus_object_type *type;
    
    /** 第1次被订阅或最后1次补退订时被调用 */
        ubus_state_handler_t subscribe_cb;
        bool has_subscribers;    /** 此对象是否被订阅 */
    
        const struct ubus_method *methods;  /** 方法数组 */
        int n_methods;                      /** 方法数组个数 */
    };
    
    struct ubus_object_type {
        const char *name;
        uint32_t id;            /** 由ubusd server分配的obj type id */
    
        const struct ubus_method *methods;  /** 方法数组 */
        int n_methods;                      /** 方法数组个数 */
    };
    
    struct ubus_method {
        const char *name;         /** 方法名称 */
        ubus_handler_t handler;   /** 方法处理回调函数 */
    
        unsigned long mask;                   /** 参数过滤掩码 */
        const struct blobmsg_policy *policy;  /** 参数过滤列表 */
        int n_policy;                         /** 参数过滤列表个数 */
    };
    #define UBUS_OBJECT_TYPE(_name, _methods)       
        {                       
            .name = _name,              
            .id = 0,                
            .n_methods = ARRAY_SIZE(_methods),  
            .methods = _methods         
        }
    
    #define __UBUS_METHOD_NOARG(_name, _handler)        
        .name = _name,                  
        .handler = _handler
    
    #define __UBUS_METHOD(_name, _handler, _policy)     
        __UBUS_METHOD_NOARG(_name, _handler),       
        .policy = _policy,              
        .n_policy = ARRAY_SIZE(_policy)
    
    #define UBUS_METHOD(_name, _handler, _policy)       
        { __UBUS_METHOD(_name, _handler, _policy) }
    
    #define UBUS_METHOD_MASK(_name, _handler, _policy, _mask) 
        {                       
            __UBUS_METHOD(_name, _handler, _policy),
            .mask = _mask               
        }
    
    #define UBUS_METHOD_NOARG(_name, _handler)      
        { __UBUS_METHOD_NOARG(_name, _handler) }

    接口函数

    * client端向ubusd server请求增加一个新object
    int ubus_add_object(struct ubus_context *ctx, struct ubus_object *obj)
    
     * client端向ubusd server请求删除一个object
    int ubus_remove_object(struct ubus_context *ctx, struct ubus_object *obj)
    
     * 处理收到与object相关报文
    void __hidden ubus_process_obj_msg(struct ubus_context *ctx, struct ubus_msghdr *hdr)
    
     * 处理UBUS_MSG_INVOKE报文
    static void
    ubus_process_invoke(struct ubus_context *ctx, struct ubus_msghdr *hdr,
                struct ubus_object *obj, struct blob_attr **attrbuf)
    
     * 处理UBUS_MSG_UNSUBSCRIBE报文
    static void
    ubus_process_unsubscribe(struct ubus_context *ctx, struct ubus_msghdr *hdr,
                 struct ubus_object *obj, struct blob_attr **attrbuf)
    
     * 处理UBUS_MSG_NOTIFY报文
    static void
    ubus_process_notify(struct ubus_context *ctx, struct ubus_msghdr *hdr,
                struct ubus_object *obj, struct blob_attr **attrbuf)

    4. 示例

    1) 下面看下logd中ubus操作,ubos的logd注册了一个log对象,两个方法read和write。

    static const struct ubus_method log_methods[] = { 
        { .name = "read", .handler = read_log, .policy = &read_policy, .n_policy = 1 },
        { .name = "write", .handler = write_log, .policy = &write_policy, .n_policy = 1 },
    };
    
    static struct ubus_object_type log_object_type =
        UBUS_OBJECT_TYPE("log", log_methods);
    
    static struct ubus_object log_object = { 
        .name = "log",
        .type = &log_object_type,
        .methods = log_methods,
        .n_methods = ARRAY_SIZE(log_methods),
    };
    static const struct blobmsg_policy read_policy =
        { .name = "lines", .type = BLOBMSG_TYPE_INT32 };
    
    static const struct blobmsg_policy write_policy =
        { .name = "event", .type = BLOBMSG_TYPE_STRING };

    看下两个方法的处理函数(根据policy解析blobmsg):

    static int
    read_log(struct ubus_context *ctx, struct ubus_object *obj,
            struct ubus_request_data *req, const char *method,
            struct blob_attr *msg)
    {
        struct client *cl;
        struct blob_attr *tb;
        struct log_head *l;
        int count = 0;
        int fds[2];
        int ret;
    
        if (msg) {   // 解析blobmsg
            blobmsg_parse(&read_policy, 1, &tb, blob_data(msg), blob_len(msg));
            if (tb)
                count = blobmsg_get_u32(tb);
        }
    // 具体业务处理
        if (pipe(fds) == -1) {
            fprintf(stderr, "logd: failed to create pipe: %s
    ", strerror(errno));
            return -1;
        }
        ubus_request_set_fd(ctx, req, fds[0]);
        cl = calloc(1, sizeof(*cl));
        cl->s.stream.notify_state = client_notify_state;
        cl->fd = fds[1];
        ustream_fd_init(&cl->s, cl->fd);
        list_add(&cl->list, &clients);
        l = log_list(count, NULL);
        while ((!tb || count) && l) {
            blob_buf_init(&b, 0);
            blobmsg_add_string(&b, "msg", l->data);
            blobmsg_add_u32(&b, "id", l->id);
            blobmsg_add_u32(&b, "priority", l->priority);
            blobmsg_add_u32(&b, "source", l->source);
            blobmsg_add_u64(&b, "time", l->ts.tv_sec * 1000LL);
            l = log_list(count, l);
            ret = ustream_write(&cl->s.stream, (void *) b.head, blob_len(b.head) + sizeof(struct blob_attr), false);
            blob_buf_free(&b);
            if (ret < 0)
                break;
        }
        return 0;
    }
    static int
    write_log(struct ubus_context *ctx, struct ubus_object *obj,
            struct ubus_request_data *req, const char *method,
            struct blob_attr *msg)
    {
        struct blob_attr *tb;
        char *event;
    
        if (msg) {  //解析blobmsg
            blobmsg_parse(&write_policy, 1, &tb, blob_data(msg), blob_len(msg));
            if (tb) {
                event = blobmsg_get_string(tb);
                log_add(event, strlen(event) + 1, SOURCE_SYSLOG);
            }
        }
    
        return 0;
    }

    上层调度

    static struct ubus_auto_conn conn;
    
    uloop_init(); conn.cb
    = ubus_connect_handler; ubus_auto_connect(&conn);
    uloop_run();
    static void ubus_connect_handler(struct ubus_context *ctx) { int ret; ret = ubus_add_object(ctx, &log_object); if (ret) { fprintf(stderr, "Failed to add object: %s ", ubus_strerror(ret)); exit(1); } fprintf(stderr, "log: connected to ubus "); }

    logd是守护进程,永不退出,所以没必要remove object。

    2) 客户端/服务器模式

    /*
    ubus_invoke()的声明如下:
    int ubus_invoke(struct ubus_context *ctx, uint32_t obj, const char *method,
                    struct blob_attr *msg, ubus_data_handler_t cb, void *priv, int timeout);
                    
    其中cb参数是消息回调函数,其函数类型定义为:
    typedef void (*ubus_data_handler_t)(struct ubus_request *req,
                        int type, struct blob_attr *msg);
    参数type是请求消息的类型,如UBUS_MSG_INVOKE,UBUS_MSG_DATA等。
    参数msg存放从服务端得到的回复消息,struct blob_attr类型,同样用blobmsg_parse()来解析。
    参数req保存了请求方的消息属性,其中req->priv即ubus_invoke()中的priv参数,
    用这个参数可以零活的传递一些额外的数据。
    */
    // ser.c
    #include <stdio.h>
    #include <unistd.h>
    #include <libubox/uloop.h>
    #include <libubox/ustream.h>
    #include <libubox/utils.h>
    #include <libubox/blobmsg_json.h>
    #include <libubus.h>
    
    static int stu_no = 0;
    static char *ubus_socket = NULL;
    struct ubus_context *ctx = NULL;
    static struct blob_buf b;
    
    enum {
        STU_NO,
        __STU_MAX
    };
    
    static const struct blobmsg_policy stu_policy[__STU_MAX] ={
     [STU_NO] = {.name = "no", .type = BLOBMSG_TYPE_INT32 }
    };
    
    static int stu_add(struct ubus_context *ctx, struct ubus_object *obj,
                    struct ubus_request_data *req, const char *method,
                    struct blob_attr *msg)
    {
        struct blob_attr *tb[__STU_MAX];
    
        blobmsg_parse(stu_policy, ARRAY_SIZE(stu_policy), tb, blob_data(msg), blob_len(msg));
    
        if(tb[STU_NO])
            stu_no += blobmsg_get_u32(tb[STU_NO]);
    
        blob_buf_init(&b, 0);
        blobmsg_add_u32(&b, "no", stu_no);
        ubus_send_reply(ctx, req, b.head);
    
        return 0;
    }
    
    static int stu_sub(struct ubus_context *ctx, struct ubus_object *obj,
                    struct ubus_request_data *req, const char *method,
                    struct blob_attr *msg)
    {
        struct blob_attr *tb[__STU_MAX];
    
        blobmsg_parse(stu_policy, ARRAY_SIZE(stu_policy), tb, blob_data(msg), blob_len(msg));
    
        if(tb[STU_NO])
            stu_no -= blobmsg_get_u32(tb[STU_NO]);
    
        blob_buf_init(&b, 0);
        blobmsg_add_u32(&b, "no", stu_no);
        ubus_send_reply(ctx, req, b.head);
    
        return 0;
    }
    static const struct ubus_method stu_methods[] = {
    /*
        { .name = "add", .handler = stu_add, .policy = stu_policy, .n_policy = 1 },
        { .name = "sub", .handler = stu_sub, .policy = stu_policy, .n_policy = 1 }
    */
        UBUS_METHOD("add", stu_add, stu_policy),
        UBUS_METHOD("sub", stu_sub, stu_policy),
    };
    
    static struct ubus_object_type stu_object_type =
        UBUS_OBJECT_TYPE("stu", stu_methods);
    
    static struct ubus_object stu_object = {
        .name = "stu",
        .type = &stu_object_type,
        .methods = stu_methods,
        .n_methods = ARRAY_SIZE(stu_methods)
    };
    
    static void ubus_add_fd(void)
    {
        ubus_add_uloop(ctx);
    
    #ifdef FD_CLOEXEC
        fcntl(ctx->sock.fd, F_SETFD,
            fcntl(ctx->sock.fd, F_GETFD) | FD_CLOEXEC);
    #endif
    }
    
    static void ubus_reconn_timer(struct uloop_timeout *timeout)
    {
        static struct uloop_timeout retry =
        { .cb = ubus_reconn_timer,};
        int t = 2;
    
        if (ubus_reconnect(ctx, ubus_socket) != 0) {
            uloop_timeout_set(&retry, t * 1000);
            return;
        }
    
        ubus_add_fd();
    }
    
    static void ubus_connection_lost(struct ubus_context *ctx)
    {
        ubus_reconn_timer(NULL);
    }
    int main(int argc, char **argv)
    {
        char ch;
        int ret = 0;
    
        while((ch = getopt(argc, argv, "s:"))!= -1){
            switch(ch){
                case 's':
                    ubus_socket = optarg;
                    break;
                default:  /* ? */
                    break;
            }
        }
    
        uloop_init();
    
        ctx = ubus_connect(ubus_socket);
        if(!ctx){
            printf("ubus connect error.
    ");
            return -1;
        }
        ctx->connection_lost = ubus_connection_lost;
    
        ret = ubus_add_object(ctx, &stu_object);
        if(ret){
            printf("Failed to add object to ubus:%s
    ", ubus_strerror(ret));
            return 0;
        }
    
        //ubus_add_uloop(ctx);
        ubus_add_fd();
        uloop_run();
    
        if(ctx)
            ubus_free(ctx);
    
        uloop_done();
        return 0;
    }
    #include <stdio.h>
    #include <unistd.h>
    #include <libubox/uloop.h>
    #include <libubox/ustream.h>
    #include <libubox/utils.h>
    #include <libubox/blobmsg_json.h>
    #include <libubus.h>
    
    static struct blob_buf b;
    
    static void receive_call_result_data(struct ubus_request *req, int type, struct blob_attr *msg)
    {
        char *str;
        if(!msg)
            return;
        
        str = blobmsg_format_json_indent(msg, true, 0); 
        printf("%s
    ", str);
    
        free(str);
    }
    
    static int client_main(struct ubus_context *ctx, int argc, char**argv)
    {
        uint32_t id; 
        int ret;
    
        printf("argc =%d, argv[0] =%s
    ", argc, argv[0]);
        if(argc != 3)
            return -2; 
    
        blob_buf_init(&b, 0); 
        if(!blobmsg_add_json_from_string(&b, argv[2])){
            printf("Failed to parse message data
    ");
            return -1; 
        }   
    
        ret = ubus_lookup_id(ctx, argv[0], &id);
        if(ret)
            return ret;
    
        return ubus_invoke(ctx, id, argv[1], b.head, receive_call_result_data, NULL, 30*1000);
    }
    
    int main(int argc, char **argv)
    {
        struct ubus_context *ctx = NULL;
        char ch;
        char *ubus_socket = NULL;
        int ret = 0;
    
        while((ch = getopt(argc, argv, "s:"))!= -1){
            switch(ch){
                case 's':
                    ubus_socket = optarg;
                    break;
                default:  /* ? */
                    break;
            }
        }
    
        argc -= optind;
        argv += optind;
    
        ctx = ubus_connect(ubus_socket);
        if(!ctx){
            printf("ubus connect error.
    ");
            return -1;
        }
    
        ret = client_main(ctx, argc, argv);
        if(ret < 0)
            return ret;
    
        if(ctx)
            ubus_free(ctx);
    
        return 0;
    }

     编译:

    gcc cli.c -Wall -lubox -lubus -lblobmsg_json -o cli
    gcc ser.c -Wall -lubox -lubus -lblobmsg_json -o ser

    执行:

    ubuntu:~/test/ubus/test$ sudo ./cli stu add '{"no":20}'
    argc =3, argv[0] =stu
    {
        "no": 20
    }
    ubuntu:~/test/ubus/test$ sudo ./cli stu add '{"no":20}'
    argc =3, argv[0] =stu
    {
        "no": 40
    }
    ubuntu:~/test/ubus/test$ sudo ./cli stu add '{"no":20}'
    argc =3, argv[0] =stu
    {
        "no": 60
    }

    参考文档:

    openwrt中使用ubus实现进程通信

    ubus实现进程间通信举例

    openwrt ubus简介以及libubus开发说明

    ubus [1] - ubusd

    ubus [2] - libubus

    ubus [3] - cli

    在 Linux 上实现基于 Socket 的多进程实时通信 IBM

  • 相关阅读:
    JMeter基础篇--录制web脚本(四)
    jmeter的基础使用(二)
    jmeter安装教程(一)
    delete用法(删除表内容)
    update用法(修改表内容)
    IPy对网段的处理
    netmiko
    读写conf文件
    读写json文件
    excel及数据处理
  • 原文地址:https://www.cnblogs.com/embedded-linux/p/6791560.html
Copyright © 2011-2022 走看看