  • OpenvSwitch 2.4.0 Source Code Walkthrough

    Originally published on my blog. Please credit the source when reposting.

    I. Preface

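    Open vSwitch (OVS) is an important open-source virtual switch in cloud computing and SDN. Anyone who wants to study the data plane of either in depth needs to be able to read the OVS source. The existing material on OVS covers the 2.3.* releases; Ubuntu 14.04 has been out for a long time, yet I could not find a source analysis for the OVS 2.4.0+ releases it supports. Drawing on a large amount of reference material, this article takes a beginner's point of view (with an emphasis on how the OpenFlow protocol is implemented) and gives a simple analysis of the OVS 2.4.0 source, following the data flow.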

    II. Overview

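    For an overview of OVS, see my other blog post.
    For reading the code, I recommend Source Insight or Sublime Text 3.
    Common places to modify:
    In practice, kernel code is usually changed in one of three places. The first is the ovs_dp_process_received_packet(struct vport *p, struct sk_buff *skb) function in datapath.c: every packet passes through it, so code added there runs for each packet. The second is designing your own flow table. The third is related to the second: designing your own actions on top of the flow table to implement the behavior you need.
    Where to modify for OpenFlow:
    Focus on the files under ofproto, such as ofproto.c and connmgr.c. The handle_openflow function in ofproto.c is the main place to change for SDN-related work.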
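
    The three kernel-side modification points can be pictured with a small userspace toy. This is only a sketch under stated assumptions: struct toy_packet, toy_hook(), toy_table and toy_process_packet() are hypothetical names invented here, not OVS identifiers, and the real datapath matches on a full flow key rather than a single port number.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified packet: only the fields this sketch uses. */
struct toy_packet {
    uint16_t in_port;
    uint16_t out_port;   /* Filled in by an action; 0 means "no output". */
};

/* An action is just a function applied to a matching packet. */
typedef void toy_action_fn(struct toy_packet *);

struct toy_flow {
    uint16_t match_in_port;
    toy_action_fn *action;
};

static unsigned long packets_seen;   /* Updated by the per-packet hook. */

/* Modification point 1: code that runs for every received packet. */
static void toy_hook(const struct toy_packet *pkt)
{
    (void) pkt;
    packets_seen++;
}

/* Modification point 3: a custom action. */
static void toy_output_to_port_2(struct toy_packet *pkt)
{
    pkt->out_port = 2;
}

/* Modification point 2: a custom flow table (linear search for brevity). */
static const struct toy_flow toy_table[] = {
    { 1, toy_output_to_port_2 },
};

/* Returns 1 if a flow matched and its action ran, 0 on a table miss. */
static int toy_process_packet(struct toy_packet *pkt)
{
    size_t i;

    toy_hook(pkt);
    for (i = 0; i < sizeof toy_table / sizeof toy_table[0]; i++) {
        if (toy_table[i].match_in_port == pkt->in_port) {
            toy_table[i].action(pkt);
            return 1;
        }
    }
    return 0;
}
```

    Calling toy_process_packet() on a packet with in_port 1 runs the hook and then the matching action; a packet from any other port still passes through the hook but misses the table.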

    III. Source Code Analysis

    • Start from the main function
    int
    main(int argc, char *argv[])
    {
        char *unixctl_path = NULL;
        struct unixctl_server *unixctl;
        char *remote;
        bool exiting;
        int retval;
    
        set_program_name(argv[0]);         //set the program name, version, build date, and related information
        retval = dpdk_init(argc,argv);
        if (retval < 0) {
            return retval;
        }
    
        argc -= retval;
        argv += retval;
    
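        ovs_cmdl_proctitle_init(argc, argv);              //copy the argument list into new storage and point argv at it, in preparation for proctitle_set() (called from daemonize_start()->monitor_daemon()), which may overwrite the original argv storage
        service_start(&argc, &argv);
        remote = parse_options(argc, argv, &unixctl_path);    //parse the arguments: unixctl_path holds the name of the unixctl socket used to receive external control commands; remote holds the connection information for ovsdb, i.e. the socket name of the configuration database
        fatal_ignore_sigpipe();                   //ignore SIGPIPE
        ovsrec_init();                            //initialize the database table structures (13 tables)
    
        daemonize_start();                        //turn the process into a daemon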
    
        if (want_mlockall) {
    #ifdef HAVE_MLOCKALL
            if (mlockall(MCL_CURRENT | MCL_FUTURE)) {
                VLOG_ERR("mlockall failed: %s", ovs_strerror(errno));
            }
    #else
            VLOG_ERR("mlockall not supported on this system");
    #endif
        }
    
        retval = unixctl_server_create(unixctl_path, &unixctl);     //create a unixctl server (stored in unixctl) listening on the punix path given by unixctl_path
        if (retval) {
            exit(EXIT_FAILURE);
        }
        unixctl_command_register("exit", "", 0, 0, ovs_vswitchd_exit, &exiting);   //register a unixctl command
    
        bridge_init(remote);                           //read the database and do some initialization
        free(remote);
    
        exiting = false;
        while (!exiting) {
            memory_run();
            if (memory_should_report()) {
                struct simap usage;
    
                simap_init(&usage);
                bridge_get_memory_usage(&usage);
                memory_report(&usage);
                simap_destroy(&usage);
            }
            bridge_run();
            unixctl_server_run(unixctl);      //read data from the server identified by unixctl and execute the corresponding configuration commands
            netdev_run();                     //for each netdev_class registered in netdev_classes, call its run()
    
            memory_wait();
            bridge_wait();
            unixctl_server_wait(unixctl);
            netdev_wait();
            if (exiting) {
                poll_immediate_wake();
            }
            poll_block();                  //block until an event registered earlier with poll_fd_wait() occurs or the timeout expires
            if (should_service_stop()) {
                exiting = true;
            }
        }
        bridge_exit();
        unixctl_server_destroy(unixctl);
        service_stop();
    
        return 0;
    }
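
    The "exit" command registered in main() is a callback plus an opaque pointer: when the command arrives, the callback sets the exiting flag and the loop ends. The sketch below shows that pattern in isolation. It is a minimal model, not the unixctl API: toy_command_register(), toy_command_dispatch() and toy_exit_cb() are hypothetical names, and the real callback also receives the connection and the command arguments.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical callback type: receives the opaque pointer that was
 * supplied at registration time. */
typedef void toy_cmd_cb(void *aux);

struct toy_command {
    const char *name;
    toy_cmd_cb *cb;
    void *aux;
};

#define TOY_MAX_COMMANDS 8
static struct toy_command toy_commands[TOY_MAX_COMMANDS];
static size_t toy_n_commands;

/* Registers 'name'; returns false if the table is full. */
static bool toy_command_register(const char *name, toy_cmd_cb *cb, void *aux)
{
    if (toy_n_commands >= TOY_MAX_COMMANDS) {
        return false;
    }
    toy_commands[toy_n_commands].name = name;
    toy_commands[toy_n_commands].cb = cb;
    toy_commands[toy_n_commands].aux = aux;
    toy_n_commands++;
    return true;
}

/* Looks up 'name' and runs its callback; returns false if unknown. */
static bool toy_command_dispatch(const char *name)
{
    size_t i;

    for (i = 0; i < toy_n_commands; i++) {
        if (!strcmp(toy_commands[i].name, name)) {
            toy_commands[i].cb(toy_commands[i].aux);
            return true;
        }
    }
    return false;
}

/* Plays the role of ovs_vswitchd_exit(): flips the caller's flag. */
static void toy_exit_cb(void *aux)
{
    bool *exiting = aux;

    *exiting = true;
}
```

    Registering "exit" with a pointer to a local bool and then dispatching "exit" sets that bool, which is all the main loop needs to test in its while condition.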
    
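    • Enter bridge_run(), which lives in bridge.c. The available ofproto_class implementations are kept in the ofproto_classes[] array. That array is initialized by ofproto_init() in ofproto.c, which in turn calls ofproto_class_register(); after initialization it holds a single entry, ofproto_dpif_class. That class is defined in ofproto-dpif.c, which declares its member variables and operation functions.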
    void
    bridge_run(void)
    {
        static struct ovsrec_open_vswitch null_cfg;
        const struct ovsrec_open_vswitch *cfg;
    
        bool vlan_splinters_changed;
    
        ovsrec_open_vswitch_init(&null_cfg);
    
        ovsdb_idl_run(idl);
    
        if (ovsdb_idl_is_lock_contended(idl)) {
            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
            struct bridge *br, *next_br;
    
            VLOG_ERR_RL(&rl, "another ovs-vswitchd process is running, "
                        "disabling this process (pid %ld) until it goes away",
                        (long int) getpid());
    
            HMAP_FOR_EACH_SAFE (br, next_br, node, &all_bridges) {
                bridge_destroy(br);
            }
            /* Since we will not be running system_stats_run() in this process
             * with the current situation of multiple ovs-vswitchd daemons,
             * disable system stats collection. */
            system_stats_enable(false);
            return;
        } else if (!ovsdb_idl_has_lock(idl)
                   || !ovsdb_idl_has_ever_connected(idl)) {
            /* Returns if not holding the lock or not done retrieving db
             * contents. */
            return;
        }
        cfg = ovsrec_open_vswitch_first(idl);
    
        /* Initialize the ofproto library.  This only needs to run once, but
         * it must be done after the configuration is set.  If the
         * initialization has already occurred, bridge_init_ofproto()
         * returns immediately. */
        bridge_init_ofproto(cfg);
    
        /* Once the value of flow-restore-wait is false, we no longer should
         * check its value from the database. */
        if (cfg && ofproto_get_flow_restore_wait()) {
            ofproto_set_flow_restore_wait(smap_get_bool(&cfg->other_config,
                                            "flow-restore-wait", false));
        }
    
        bridge_run__();
    
        /* Re-configure SSL.  We do this on every trip through the main loop,
         * instead of just when the database changes, because the contents of the
         * key and certificate files can change without the database changing.
         *
         * We do this before bridge_reconfigure() because that function might
         * initiate SSL connections and thus requires SSL to be configured. */
        if (cfg && cfg->ssl) {
            const struct ovsrec_ssl *ssl = cfg->ssl;
    
            stream_ssl_set_key_and_cert(ssl->private_key, ssl->certificate);
            stream_ssl_set_ca_cert_file(ssl->ca_cert, ssl->bootstrap_ca_cert);
        }
    
        /* If VLAN splinters are in use, then we need to reconfigure if VLAN
         * usage has changed. */
        vlan_splinters_changed = false;
        if (vlan_splinters_enabled_anywhere) {
            struct bridge *br;
    
            HMAP_FOR_EACH (br, node, &all_bridges) {
                if (ofproto_has_vlan_usage_changed(br->ofproto)) {
                    vlan_splinters_changed = true;
                    break;
                }
            }
        }
    
        if (ovsdb_idl_get_seqno(idl) != idl_seqno || vlan_splinters_changed) {
            struct ovsdb_idl_txn *txn;
    
            idl_seqno = ovsdb_idl_get_seqno(idl);
            txn = ovsdb_idl_txn_create(idl);
            bridge_reconfigure(cfg ? cfg : &null_cfg);
    
            if (cfg) {
                ovsrec_open_vswitch_set_cur_cfg(cfg, cfg->next_cfg);
                discover_types(cfg);
            }
    
            /* If we are completing our initial configuration for this run
             * of ovs-vswitchd, then keep the transaction around to monitor
             * it for completion. */
            if (initial_config_done) {
                /* Always sets the 'status_txn_try_again' to check again,
                 * in case that this transaction fails. */
                status_txn_try_again = true;
                ovsdb_idl_txn_commit(txn);
                ovsdb_idl_txn_destroy(txn);
            } else {
                initial_config_done = true;
                daemonize_txn = txn;
            }
        }
    
        if (daemonize_txn) {
            enum ovsdb_idl_txn_status status = ovsdb_idl_txn_commit(daemonize_txn);
            if (status != TXN_INCOMPLETE) {
                ovsdb_idl_txn_destroy(daemonize_txn);
                daemonize_txn = NULL;
    
                /* ovs-vswitchd has completed initialization, so allow the
                 * process that forked us to exit successfully. */
                daemonize_complete();
    
                vlog_enable_async();
    
                VLOG_INFO_ONCE("%s (Open vSwitch) %s", program_name, VERSION);
            }
        }
    
        run_stats_update();
        run_status_update();
        run_system_stats();
    }
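
    bridge_run() is called on every pass of the main loop, but it only reconfigures when the IDL sequence number has moved since the last pass. A minimal sketch of that change-detection idea follows; struct toy_idl, toy_db_changed() and toy_bridge_run() are hypothetical stand-ins, and the VLAN-splinter condition from the real code is left out.

```c
#include <assert.h>

/* Hypothetical stand-in for the IDL: it only tracks a change counter. */
struct toy_idl {
    unsigned int seqno;   /* Incremented whenever the database changes. */
};

static unsigned int toy_last_seqno;   /* Plays the role of idl_seqno. */
static int toy_reconfigure_count;

/* Simulates the database delivering an update. */
static void toy_db_changed(struct toy_idl *idl)
{
    idl->seqno++;
}

/* One pass of the main loop.  Returns 1 if a reconfiguration happened,
 * 0 if the database was unchanged since the previous pass. */
static int toy_bridge_run(const struct toy_idl *idl)
{
    if (idl->seqno != toy_last_seqno) {
        toy_last_seqno = idl->seqno;
        toy_reconfigure_count++;   /* The real code reconfigures here. */
        return 1;
    }
    return 0;
}
```

    Comparing a counter is cheap enough to do on every loop iteration, which is why the expensive reconfiguration can sit behind it without slowing the common case.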
    
    • Next comes bridge_run__(). It first calls ofproto_type_run(type) and then ofproto_run(br->ofproto); we look at them one at a time
    static void
    bridge_run__(void)
    {
        struct bridge *br;
        struct sset types;
        const char *type;
    
        /* Let each datapath type do the work that it needs to do. */
        sset_init(&types);
        ofproto_enumerate_types(&types);
        SSET_FOR_EACH (type, &types) {
            ofproto_type_run(type);
        }
        sset_destroy(&types);
    
        /* Let each bridge do the work that it needs to do. */
        HMAP_FOR_EACH (br, node, &all_bridges) {
            ofproto_run(br->ofproto);                                  //process every bridge in all_bridges
        }
    }
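
    The first loop of bridge_run__() works because every provider is registered as a table of function pointers, so the caller can run each one without knowing its implementation. The following sketch shows that registry pattern under simplifying assumptions: struct toy_class and the toy_* functions are hypothetical, and each toy class has exactly one type name, whereas a real provider enumerates its types through a callback.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, much-reduced stand-in for struct ofproto_class. */
struct toy_class {
    const char *type;                     /* Datapath type name. */
    int (*type_run)(const char *type);    /* Per-type periodic work. */
};

#define TOY_MAX_CLASSES 4
static const struct toy_class *toy_classes[TOY_MAX_CLASSES];
static size_t toy_n_classes;

/* Returns 0 on success, -1 if 'cls' is already registered or the table
 * is full. */
static int toy_class_register(const struct toy_class *cls)
{
    size_t i;

    for (i = 0; i < toy_n_classes; i++) {
        if (toy_classes[i] == cls) {
            return -1;
        }
    }
    if (toy_n_classes >= TOY_MAX_CLASSES) {
        return -1;
    }
    toy_classes[toy_n_classes++] = cls;
    return 0;
}

static int toy_runs;   /* Counts calls to the provider's type_run. */

static int toy_dpif_type_run(const char *type)
{
    (void) type;
    toy_runs++;
    return 0;
}

/* The single provider, analogous in role to ofproto_dpif_class. */
static const struct toy_class toy_dpif_class = {
    "system", toy_dpif_type_run
};

/* Mirrors the first loop of bridge_run__(): run every registered type. */
static void toy_run_all_types(void)
{
    size_t i;

    for (i = 0; i < toy_n_classes; i++) {
        toy_classes[i]->type_run(toy_classes[i]->type);
    }
}
```

    After toy_class_register(&toy_dpif_class), each call to toy_run_all_types() invokes the provider once; registering the same class twice is rejected.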
    
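    • First, ofproto_type_run(type). It calls type_run(), which here is the type_run() function in ofproto-dpif.c. In that function, if the upper layer has agreed to receive packets, udpif_set_threads(backer->dpif, n_handlers, n_revalidators) is called to tell udpif how many threads it needs for handling upcalls. That leads to udpif_start_threads(udpif, n_handlers, n_revalidators), which starts the handler threads running udpif_upcall_handler(). A handler thread reads upcalls from the dpif (datapath interface), processes them, and installs the corresponding flows. To do so it calls recv_upcalls(handler), which in turn calls process_upcall() for each upcall.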
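
    The effect of the upcall path can be shown with a single-threaded toy: the first packet of a flow misses the datapath cache and is handed to userspace, which installs a flow so that later packets hit. This is a sketch only. The names toy_receive(), toy_process_upcall() and toy_cache are hypothetical, the flow key is reduced to one integer, and the real handlers run in separate threads.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_MAX_FLOWS 16

/* Hypothetical datapath flow cache keyed on a single integer. */
static uint32_t toy_cache[TOY_MAX_FLOWS];
static size_t toy_n_cached;
static int toy_upcalls;      /* Number of misses handed to userspace. */

/* Returns 1 if 'key' is already installed in the cache, else 0. */
static int toy_cache_lookup(uint32_t key)
{
    size_t i;

    for (i = 0; i < toy_n_cached; i++) {
        if (toy_cache[i] == key) {
            return 1;
        }
    }
    return 0;
}

/* Plays the role of the handler: account for the miss, then install a
 * flow so that later packets with the same key hit the cache. */
static void toy_process_upcall(uint32_t key)
{
    toy_upcalls++;
    if (toy_n_cached < TOY_MAX_FLOWS) {
        toy_cache[toy_n_cached++] = key;
    }
}

/* Datapath receive path: returns 1 on a cache hit, 0 if an upcall was
 * needed. */
static int toy_receive(uint32_t key)
{
    if (toy_cache_lookup(key)) {
        return 1;
    }
    toy_process_upcall(key);
    return 0;
}
```

    Sending the same key twice produces one upcall followed by one hit, which is the behavior the handler threads exist to provide.
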
    • ofproto_run() is in ofproto.c. It calls ofproto_class->run(p), which, per the analysis above, is the run member of ofproto_dpif_class in ofproto-dpif.c. It also calls connmgr_run(p->connmgr, handle_openflow) to process OpenFlow messages from the controller:
    int
    ofproto_run(struct ofproto *p)
    {
        int error;
        uint64_t new_seq;
    
        error = p->ofproto_class->run(p);
        if (error && error != EAGAIN) {
            VLOG_ERR_RL(&rl, "%s: run failed (%s)", p->name, ovs_strerror(error));
        }
    
        run_rule_executes(p);
    
        /* Restore the eviction group heap invariant occasionally. */
        if (p->eviction_group_timer < time_msec()) {
            size_t i;
    
            p->eviction_group_timer = time_msec() + 1000;
    
            for (i = 0; i < p->n_tables; i++) {
                struct oftable *table = &p->tables[i];
                struct eviction_group *evg;
                struct rule *rule;
    
                if (!table->eviction_fields) {
                    continue;
                }
    
                if (table->n_flows > 100000) {
                    static struct vlog_rate_limit count_rl =
                        VLOG_RATE_LIMIT_INIT(1, 1);
                    VLOG_WARN_RL(&count_rl, "Table %"PRIuSIZE" has an excessive"
                                 " number of rules: %d", i, table->n_flows);
                }
    
                ovs_mutex_lock(&ofproto_mutex);
                CLS_FOR_EACH (rule, cr, &table->cls) {
                    if (rule->idle_timeout || rule->hard_timeout) {
                        if (!rule->eviction_group) {
                            eviction_group_add_rule(rule);
                        } else {
                            heap_raw_change(&rule->evg_node,
                                            rule_eviction_priority(p, rule));
                        }
                    }
                }
    
                HEAP_FOR_EACH (evg, size_node, &table->eviction_groups_by_size) {
                    heap_rebuild(&evg->rules);
                }
                ovs_mutex_unlock(&ofproto_mutex);
            }
        }
    
        if (p->ofproto_class->port_poll) {
            char *devname;
    
            while ((error = p->ofproto_class->port_poll(p, &devname)) != EAGAIN) {
                process_port_change(p, error, devname);
            }
        }
    
        new_seq = seq_read(connectivity_seq_get());
        if (new_seq != p->change_seq) {
            struct sset devnames;
            const char *devname;
            struct ofport *ofport;
    
            /* Update OpenFlow port status for any port whose netdev has changed.
             *
             * Refreshing a given 'ofport' can cause an arbitrary ofport to be
             * destroyed, so it's not safe to update ports directly from the
             * HMAP_FOR_EACH loop, or even to use HMAP_FOR_EACH_SAFE.  Instead, we
             * need this two-phase approach. */
            sset_init(&devnames);
            HMAP_FOR_EACH (ofport, hmap_node, &p->ports) {
                uint64_t port_change_seq;
    
                port_change_seq = netdev_get_change_seq(ofport->netdev);
                if (ofport->change_seq != port_change_seq) {
                    ofport->change_seq = port_change_seq;
                    sset_add(&devnames, netdev_get_name(ofport->netdev));
                }
            }
            SSET_FOR_EACH (devname, &devnames) {
                update_port(p, devname);
            }
            sset_destroy(&devnames);
    
            p->change_seq = new_seq;
        }
    
        connmgr_run(p->connmgr, handle_openflow);
    
        return error;
    }
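
    The comment in ofproto_run() about the "two-phase approach" deserves a closer look: refreshing one port may destroy another, so the code first collects the names of changed ports and only then updates them by name. The sketch below models that idea. struct toy_port, its kills field and the toy_* functions are hypothetical devices made up for the illustration; the real code copies the names into an sset.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_MAX_PORTS 8

struct toy_port {
    const char *name;       /* NULL marks a destroyed port. */
    unsigned int dev_seq;   /* Change counter of the underlying device. */
    unsigned int seen_seq;  /* Last counter value this port processed. */
    const char *kills;      /* Hypothetical: port destroyed as a side
                             * effect of refreshing this one, or NULL. */
};

static struct toy_port *toy_find(struct toy_port *ports, size_t n,
                                 const char *name)
{
    size_t i;

    for (i = 0; i < n; i++) {
        if (ports[i].name && !strcmp(ports[i].name, name)) {
            return &ports[i];
        }
    }
    return NULL;
}

/* Refreshes one port by name.  Returns 1 if the port still existed. */
static int toy_update_port(struct toy_port *ports, size_t n,
                           const char *name)
{
    struct toy_port *port = toy_find(ports, n, name);

    if (!port) {
        return 0;   /* Destroyed by an earlier update in this pass. */
    }
    if (port->kills) {
        struct toy_port *victim = toy_find(ports, n, port->kills);

        if (victim) {
            victim->name = NULL;
        }
    }
    return 1;
}

/* Returns the number of ports that were actually refreshed. */
static int toy_refresh_changed(struct toy_port *ports, size_t n)
{
    const char *names[TOY_MAX_PORTS];
    size_t n_names = 0;
    size_t i;
    int updated = 0;

    assert(n <= TOY_MAX_PORTS);

    /* Phase 1: walk the table without modifying its membership. */
    for (i = 0; i < n; i++) {
        if (ports[i].name && ports[i].seen_seq != ports[i].dev_seq) {
            ports[i].seen_seq = ports[i].dev_seq;
            names[n_names++] = ports[i].name;
        }
    }

    /* Phase 2: update by name, looking each port up again. */
    for (i = 0; i < n_names; i++) {
        updated += toy_update_port(ports, n, names[i]);
    }
    return updated;
}
```

    Because phase 2 looks every port up again by name, a port that disappeared during an earlier update is simply skipped instead of being dereferenced after destruction.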
    
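    • The function above calls run() in ofproto-dpif.c
      run() calls connmgr_send_packet_in() to send an OFPT_PACKET_IN message to each controller; that function calls schedule_packet_in() to schedule the transmission.
      It optionally calls netflow_run() and sflow_run() to support NetFlow and sFlow.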

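    • After that, ofproto_run() calls connmgr_run(), which calls ofconn_run(). Inside ofconn_run(), rconn_run() maintains the connection to the controller, rconn_recv() receives data from the controller, and handle_openflow() (in ofproto.c) processes the messages received from the controller.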
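
    Conceptually, handling a controller message starts with the fixed 8-byte OpenFlow header (version, type, length, xid) and then branches on the type. The sketch below shows only that first step. The toy_* names and the outcome enum are hypothetical; the numeric type values are the ones the OpenFlow specification assigns, and the real handle_openflow() decodes messages through OVS's own helpers rather than by hand.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Message type values as assigned by the OpenFlow specification. */
enum {
    TOY_OFPT_HELLO = 0,
    TOY_OFPT_ECHO_REQUEST = 2,
    TOY_OFPT_PACKET_OUT = 13,
    TOY_OFPT_FLOW_MOD = 14
};

/* Hypothetical outcomes used only by this sketch. */
enum toy_outcome {
    TOY_ERROR,
    TOY_GREETED,
    TOY_REPLIED_ECHO,
    TOY_PACKET_SENT,
    TOY_FLOW_TABLE_CHANGED,
    TOY_IGNORED
};

struct toy_ofp_header {
    uint8_t version;
    uint8_t type;
    uint16_t length;   /* Total message length, header included. */
    uint32_t xid;      /* Transaction id echoed in replies. */
};

/* Parses the header from 'buf' (network byte order).  Returns 0 on
 * success, -1 if the buffer is too short or the length is invalid. */
static int toy_parse_header(const uint8_t *buf, size_t len,
                            struct toy_ofp_header *h)
{
    if (len < 8) {
        return -1;
    }
    h->version = buf[0];
    h->type = buf[1];
    h->length = (uint16_t) ((buf[2] << 8) | buf[3]);
    h->xid = ((uint32_t) buf[4] << 24) | ((uint32_t) buf[5] << 16)
             | ((uint32_t) buf[6] << 8) | buf[7];
    if (h->length < 8 || h->length > len) {
        return -1;
    }
    return 0;
}

/* Branches on the message type, as a dispatcher would. */
static enum toy_outcome toy_handle_openflow(const uint8_t *buf, size_t len)
{
    struct toy_ofp_header h;

    if (toy_parse_header(buf, len, &h)) {
        return TOY_ERROR;
    }
    switch (h.type) {
    case TOY_OFPT_HELLO:
        return TOY_GREETED;
    case TOY_OFPT_ECHO_REQUEST:
        return TOY_REPLIED_ECHO;
    case TOY_OFPT_PACKET_OUT:
        return TOY_PACKET_SENT;
    case TOY_OFPT_FLOW_MOD:
        return TOY_FLOW_TABLE_CHANGED;
    default:
        return TOY_IGNORED;
    }
}
```

    Adding support for a new message in this toy means adding one case to the switch, which is the same shape of change the Overview suggests making in handle_openflow().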

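    • Finally, back in ovs-vswitchd.c
      unixctl_server_run(unixctl): reads data from the server identified by unixctl and executes the corresponding configuration commands.
      netdev_run(): for each netdev_class registered in netdev_classes, calls its run().
      The loop then waits for events: memory, bridge, unixctl_server, and netdev each register what they are waiting for through poll_fd_wait().
      poll_block(): blocks until an event registered with poll_fd_wait() occurs, or until the shortest timeout registered with poll_timer_wait() expires.
      On exit: shut down the bridges, close the unixctl connection, and cancel the signal handling.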
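
    The run / wait / block rhythm of the main loop can be reproduced with plain POSIX poll(). The sketch below is an approximation under stated assumptions: struct toy_module, toy_run(), toy_wait() and toy_main_loop() are hypothetical, there is a single module and a single file descriptor, and the timeout is passed in directly instead of being registered through a timer call.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical module with the run()/wait() pair used by the loop. */
struct toy_module {
    int fd;               /* Descriptor the module reads events from. */
    int events_handled;
};

/* run(): do whatever work is possible right now without blocking. */
static void toy_run(struct toy_module *m, const struct pollfd *pfd)
{
    char byte;

    if ((pfd->revents & POLLIN) && read(m->fd, &byte, 1) == 1) {
        m->events_handled++;
    }
}

/* wait(): declare what should wake the loop up next. */
static void toy_wait(const struct toy_module *m, struct pollfd *pfd)
{
    pfd->fd = m->fd;
    pfd->events = POLLIN;
    pfd->revents = 0;
}

/* Runs 'iterations' passes of run -> wait -> block.  Each block lasts
 * at most 'timeout_ms' milliseconds. */
static void toy_main_loop(struct toy_module *m, int iterations,
                          int timeout_ms)
{
    struct pollfd pfd = { .fd = m->fd, .events = POLLIN, .revents = 0 };
    int i;

    for (i = 0; i < iterations; i++) {
        toy_run(m, &pfd);
        toy_wait(m, &pfd);
        poll(&pfd, 1, timeout_ms);   /* Sleep until event or timeout. */
    }
}
```

    With one byte already written to a pipe, the first pass finds nothing to do and blocks until the descriptor is readable; the second pass consumes the byte. Keeping run() non-blocking and leaving all sleeping to one place is what lets several modules share a single thread.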

    IV. Summary

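    The sections above walked through the OVS 2.4.0 source from a beginner's point of view, following the path a packet takes. For anyone working on SDN, the ofproto module is especially important and worth reading in more detail.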

  • Original article: https://www.cnblogs.com/cotyb/p/5103035.html