zoukankan      html  css  js  c++  java
  • zebra 的Thread机制

    http://blog.csdn.net/xuyanbo2008/article/details/7439751

    =======================


    1.thread的四种创建方法

    一个新的thread可以通过如下四种方式被创建,主要是看你需要创建的thread的类型:

    1,  thread_add_read:添加一个thread到read queue,该thread负责通过socket接受和读取从client端来的数据。 

    2,  thread_add_write:添加一个thread到write queue,该thread负责通过socket向client端填充和写数据。

    3,  thread_add_timer function calls:添加一个thread到timer queue,该thread负责定时一个event,例如update和redistribute一个route table.

    4,  thread_add_event:添加一个event thread到event queue。

    上面这四个函数的处理过程都差不多:

    1,  创建thread。首先在unuse queue查找,如果有unuse thread,就使用它,否则重新分配空间。

    2,  根据参数,对thread进行赋值。

    3,  将该thread加入到相应的queue中。

    2. thread的调用

    1,bgp daemon不断地从event queue中取出thread并且执行它。一旦该thread被执行了,将该thread的type设置为unuse。并且将该thread添加到unuse queue中。

    2,如果event queue为空时,bgp daemon 通过select函数监控读、写、异常三个描述符集。一旦有某个描述符准备就绪,则将该描述符所对应的thread加入ready queue.

    而对于timer queue中的thread,只有当select函数超时后才会进入ready queue.

    3.zebrad端的thread

    zebrad启动后,read queue中会出现两个thread,一个是等待来自local client端bgpd的连接,另一个是等待来自vty client端的连接。

    第1个thread

    zebra_init ( )-> zebra_serv_un ( )中创建一个thread,加入read queue。该thread的处理函数为zebra_accept,监听内部client的socket。

     zebra_client_create (client_sock);创建一个新的zebra client

      /* Register myself. */

      zebra_event (ZEBRA_SERV, accept_sock, NULL);继续监听server socket

      puts("<-zebra_accept");

      return 0;

    } 

    vty_accept,加入read queue。作为vty server监听internet的vty socket。

    vty_create(vty_sock, &su);

    根据vty_sock和ip地址信息su,创建一个新的vty。 

      puts("<-vty_accept");

      return 0;

    } 

    vty_flush

      vty_event (VTY_READ, vty_sock, vty);

    根据new client vty的vty_sock创建新的VTY_READ thread,加入read queue,该thread的处理函数为vty_read

      return vty;

    } 

    sockunion_bind(accept_sock, &su, port, NULL);

    将accept_socket文件描述符与一个特定的逻辑网络连系起来。服务器端使用su->sin.sin_addr.s_addr = htonl (INADDR_ANY);表示接受任何一个主机网络接口上的连接请求。

      if (ret < 0)

        {

          close (accept_sock);   /* Avoid sd leak. */

          return;

        } 

      /* Listen socket under queue 3. */

      ret = listen (accept_sock, 3);

    将accept_sock套接口设置成被动监听状态,用于接受连接,只能在服务器端使用。

      if (ret < 0)

        {

          zlog (NULL, LOG_WARNING, "can't listen socket");

          close (accept_sock);   /* Avoid sd leak. */

          return;

        } 

      /* Add vty server event. */

      vty_event(VTY_SERV, accept_sock, NULL);

    } 

    sockunion_bind操作

    /* Bind socket to specified address.
     *
     * Fills in the port of SU (and its length field where the platform has
     * one) according to SU's address family, then calls bind() on SOCK.
     *
     * sock    - socket descriptor to bind.
     * su      - address to bind to; its family must already be set.  The
     *           port, and possibly the address, are written by this function.
     * port    - port number in host byte order.
     * su_addr - only tested against NULL here: when NULL, SU's address is
     *           set to the wildcard address; when non-NULL, whatever address
     *           SU already holds is used unchanged.
     *
     * Returns the result of bind().  A failure is logged with zlog() before
     * returning.
     */
    int sockunion_bind (int sock, union sockunion *su, unsigned short port,
                  union sockunion *su_addr)
    {
      /* Stays 0 when the family matches none of the branches below, in
         which case bind() is called with a zero address length.  */
      int size = 0;
      int ret;

      if (su->sa.sa_family == AF_INET)
        {
          size = sizeof (struct sockaddr_in);
          su->sin.sin_port = htons (port);
    #ifdef HAVE_SIN_LEN
          su->sin.sin_len = size;
    #endif /* HAVE_SIN_LEN */
          /* A server normally sets sin_addr.s_addr to INADDR_ANY so that
             the socket accepts connection requests arriving on any of the
             host's network interfaces.  A client instead sets
             sin_addr.s_addr to the IP address of the server host.  */
          if (su_addr == NULL)
            su->sin.sin_addr.s_addr = htonl (INADDR_ANY);
        }
    #ifdef HAVE_IPV6
      else if (su->sa.sa_family == AF_INET6)
        {
          size = sizeof (struct sockaddr_in6);
          su->sin6.sin6_port = htons (port);
    #ifdef SIN6_LEN
          su->sin6.sin6_len = size;
    #endif /* SIN6_LEN */
          if (su_addr == NULL)
            {
              /* IPv6 wildcard address: all-zero bytes on LINUX_IPV6/NRL
                 builds, in6addr_any elsewhere.  */
    #if defined(LINUX_IPV6) || defined(NRL)
              bzero (&su->sin6.sin6_addr, sizeof (struct in6_addr));
    #else
              su->sin6.sin6_addr = in6addr_any;
    #endif /* LINUX_IPV6 */
            }
        }
    #endif /* HAVE_IPV6 */

      ret = bind (sock, (struct sockaddr *)su, size);
      if (ret < 0)
        zlog (NULL, LOG_WARNING, "can't bind socket : %s", strerror (errno));

      return ret;
    }

    vty_event操作 

    /* struct thread_master *master; */

    static void vty_event (enum event event, int sock, struct vty *vty)

    {

      struct thread *vty_serv_thread;

      switch (event)

        {

        case VTY_SERV:

          vty_serv_thread = thread_add_read (master, vty_accept, vty, sock);

          vector_set_index (Vvty_serv_thread, sock, vty_serv_thread);

          break;

        case VTY_READ:

          vty->t_read = thread_add_read (master, vty_read, vty, sock);

          /* Time out treatment. */

          if (vty->v_timeout)

           {

             if (vty->t_timeout)

               thread_cancel (vty->t_timeout);

             vty->t_timeout =

               thread_add_timer (master, vty_timeout, vty, vty->v_timeout);

           }

          break;

        case VTY_WRITE:

          if (! vty->t_write)

           vty->t_write = thread_add_write (master, vty_flush, vty, sock);

          break;

        case VTY_TIMEOUT_RESET:

          if (vty->t_timeout)

           {

             thread_cancel (vty->t_timeout);

             vty->t_timeout = NULL;

           }

          if (vty->v_timeout)

           {

             vty->t_timeout =

               thread_add_timer (master, vty_timeout, vty, vty->v_timeout);

           }

          break;

        }

    } 

    第2 个thread

    bgp_serv_sock( )-> bgp_serv_sock_family( )中创建一个thread,不是通过event的方式添加的。该thread的处理函数为bgp_accept。作为bgp_server,接受来自 internet上的bgp连接。

    bgp_serv_sock_family操作

    port = 179  family = AF_INET

    /* Make bgpd's server socket.
     *
     * Creates a stream socket for FAMILY, binds it to PORT on the wildcard
     * address, puts it into listening state and adds a read thread that
     * runs bgp_accept on it.
     *
     * port   - port to listen on, host byte order.
     * family - address family stored into the sockunion (e.g. AF_INET).
     *
     * On any failure the function returns without adding the read thread,
     * and the socket, if one was created, is closed.
     */
    void bgp_serv_sock_family (unsigned short port, int family)
    {
      int ret;
      int bgp_sock;
      union sockunion su;

      bzero (&su, sizeof (union sockunion));

      /* Specify address family. */
      su.sa.sa_family = family;

      /* Create the BGP socket.  A negative descriptor can never be used,
         so stop here instead of handing it to bind()/listen().  */
      bgp_sock = sockunion_stream_socket (&su);
      if (bgp_sock < 0)
        {
          zlog (NULL, LOG_INFO, "Can't create bgp server socket");
          return;
        }

      sockopt_reuseaddr (bgp_sock);
      sockopt_reuseport (bgp_sock);

      /* sockunion_bind logs the reason itself when it fails.  Do not go on
         to listen() on an unbound socket.  */
      ret = sockunion_bind (bgp_sock, &su, port, NULL);
      if (ret < 0)
        {
          close (bgp_sock);   /* Avoid sd leak. */
          return;
        }

      /* Listen socket under queue 3. */
      ret = listen (bgp_sock, 3);
      if (ret < 0)
        {
          /* Log first: close() may change errno.  */
          zlog (NULL, LOG_INFO, "Can't listen bgp server socket : %s",
                strerror (errno));
          close (bgp_sock);   /* Avoid sd leak. */
          return;
        }

      /* Add a read thread for the listening socket to the read list.  */
      thread_add_read (master, bgp_accept, NULL, bgp_sock);
    }

    VTY server和BGP server 在使用accept操作的方法如下:

    他们均是通过创建一个THREAD_READ 类型的thread,加到Master的readlist的队列后面,thread 的处理函数会执行accept操作。

    VTY accept:

       vty_serv_thread = thread_add_read (master, vty_accept, vty, sock);

    BGP server accept:

      thread_add_read (master, bgp_accept, NULL, bgp_sock); 

    bgpd和zebrad间通信

    bgp和zebra是通过zebra message进行通信,格式如下:

    报文头3字节 (前两字节length,后1字节为command type)

    报文体长度不定。

     

    /* Zebra message types. */

    /* Values 1-6: interface and interface-address messages. */
    #define ZEBRA_INTERFACE_ADD                1

    #define ZEBRA_INTERFACE_DELETE             2

    #define ZEBRA_INTERFACE_ADDRESS_ADD        3

    #define ZEBRA_INTERFACE_ADDRESS_DELETE     4

    #define ZEBRA_INTERFACE_UP                 5

    #define ZEBRA_INTERFACE_DOWN               6

    /* Values 7-10: IPv4 / IPv6 route add and delete. */
    #define ZEBRA_IPV4_ROUTE_ADD               7

    #define ZEBRA_IPV4_ROUTE_DELETE            8

    #define ZEBRA_IPV6_ROUTE_ADD               9

    #define ZEBRA_IPV6_ROUTE_DELETE           10

    /* Values 11-14: redistribute and redistribute-default add/delete. */
    #define ZEBRA_REDISTRIBUTE_ADD            11

    #define ZEBRA_REDISTRIBUTE_DELETE         12

    #define ZEBRA_REDISTRIBUTE_DEFAULT_ADD    13

    #define ZEBRA_REDISTRIBUTE_DEFAULT_DELETE 14

    /* Values 15-16: IPv4 / IPv6 nexthop lookup. */
    #define ZEBRA_IPV4_NEXTHOP_LOOKUP         15

    #define ZEBRA_IPV6_NEXTHOP_LOOKUP         16

     

    bgpd和zebrad之间的api接口

    bgp端接受到message后,会执行相应的bgp action:

     

    bgp action func                                         message type

    int (*interface_add) (int, struct zclient *, zebra_size_t);        ZEBRA_INTERFACE_ADD

    int (*interface_delete) (int, struct zclient *, zebra_size_t);                       ZEBRA_INTERFACE_DELETE

    int (*interface_up) (int, struct zclient *, zebra_size_t);                           ZEBRA_INTERFACE_UP

    int (*interface_down) (int, struct zclient *, zebra_size_t);                       ZEBRA_INTERFACE_DOWN

    int (*interface_address_add) (int, struct zclient *, zebra_size_t);          ZEBRA_INTERFACE_ADDRESS_ADD

    int (*interface_address_delete) (int, struct zclient *, zebra_size_t);  ZEBRA_INTERFACE_ADDRESS_DELETE

    int (*ipv4_route_add) (int, struct zclient *, zebra_size_t);                   ZEBRA_IPV4_ROUTE_ADD

    int (*ipv4_route_delete) (int, struct zclient *, zebra_size_t);                  ZEBRA_IPV4_ROUTE_DELETE

    int (*ipv6_route_add) (int, struct zclient *, zebra_size_t);                    ZEBRA_IPV6_ROUTE_ADD

    int (*ipv6_route_delete) (int, struct zclient *, zebra_size_t);                 ZEBRA_IPV6_ROUTE_DELETE

     

    zebra 端接受到message后,会执行相应的zebra action:

     

    zebra action func                                                              message type

    void zread_interface_add (struct zserv *client, u_short length)                         ZEBRA_INTERFACE_ADD

    void zread_interface_delete (struct zserv *client, u_short length)                     ZEBRA_INTERFACE_DELETE

    void zread_ipv4_add (struct zserv *client, u_short length)                                    ZEBRA_IPV4_ROUTE_ADD

    void zread_ipv4_delete (struct zserv *client, u_short length)                                   ZEBRA_IPV4_ROUTE_DELETE

    void zread_ipv6_add (struct zserv *client, u_short length)                              ZEBRA_IPV6_ROUTE_ADD

    void zread_ipv6_delete (struct zserv *client, u_short length)                           ZEBRA_IPV6_ROUTE_DELETE

    void zebra_redistribute_add (int command, struct zserv *client, int length)              ZEBRA_REDISTRIBUTE_ADD

    void zebra_redistribute_delete (int command, struct zserv *client, int length)           ZEBRA_REDISTRIBUTE_DELETE

    void zebra_redistribute_default_add (int command,

    struct zserv *client, int length)                                  ZEBRA_REDISTRIBUTE_DEFAULT_ADD

    void zebra_redistribute_default_delete (int command,

    struct zserv *client, int length)                               ZEBRA_REDISTRIBUTE_DEFAULT_DELETE

    void zread_ipv4_nexthop_lookup (struct zserv *client, u_short length)     ZEBRA_IPV4_NEXTHOP_LOOKUP

    void zread_ipv6_nexthop_lookup (struct zserv *client, u_short length)        ZEBRA_IPV6_NEXTHOP_LOOKUP

     

    bgp action:将local_client_socket中数据,写入bgp数据库。

    zebra action:将zebra数据库中的信息写入local_server_subsocket,让local client端进行读取。

    bgp peer间通信

    bgp_accept操作

    /* Accept bgp connection. */

    int bgp_accept (struct thread *thread)

    {

      int bgp_sock;

      int accept_sock;

      union sockunion su;

      struct peer *peer;

      struct peer *peer1;

      char buf[SU_ADDRSTRLEN];

     

      /* Regiser accept thread. */

      accept_sock = THREAD_FD (thread);

        printf("->bgp_accept [%d]\n",accept_sock);

      thread_add_read (master, bgp_accept, NULL, accept_sock);

     

      /* Accept client connection. */

      bgp_sock = sockunion_accept (accept_sock, &su);

      if (bgp_sock < 0)

        {

          zlog_err ("[Error] BGP socket accept failed (%s)", strerror (errno));

             printf("[Error] BGP socket accept failed (%s)", strerror (errno));

             puts("<-bgp_accept 2");

          return -1;

        }

     

      if (BGP_DEBUG (events, EVENTS))

        zlog_info ("[Event] BGP connection from host %s", inet_sutop (&su, buf));

      printf("[Event] BGP connection from host %s", inet_sutop (&su, buf));

      /* Check remote IP address */

      peer1 = peer_lookup_by_su (&su);

      if (! peer1 || peer1->status == Idle)

        {

          if (BGP_DEBUG (events, EVENTS))

           {

             if (! peer1)

               zlog_info ("[Event] BGP connection IP address %s is not configured",

                         inet_sutop (&su, buf));

             else

               zlog_info ("[Event] BGP connection IP address %s is Idle state",

                         inet_sutop (&su, buf));

           }

          close (bgp_sock);

           puts("<-bgp_accept 2");

          return -1;

        }

     

      /* Make dummy peer until read Open packet. */

      if (BGP_DEBUG (events, EVENTS))

        zlog_info ("[Event] Make dummy peer structure until read Open packet");

     printf("[Event] Make dummy peer structure until read Open packet\n");

      {

        char buf[SU_ADDRSTRLEN + 1];

     

        peer = peer_create_accept ();

        SET_FLAG (peer->sflags, PEER_STATUS_ACCEPT_PEER);

        peer->su = su;

        peer->fd = bgp_sock;

        peer->status = Active;

     

        /* Make peer's address string. */

        sockunion2str (&su, buf, SU_ADDRSTRLEN);

        peer->host = strdup (buf);

      }

     

      BGP_EVENT_ADD (peer, TCP_connection_open); 创建一个event thread执行bgp_event函数

      puts("<-bgp_accept 0");

      return 0;

    }

     

    bgp_event操作

    /* Execute event process. */

    int bgp_event (struct thread *thread)

    {

      int ret;

      int event;

      int next;

      struct peer *peer;

     

      peer = THREAD_ARG (thread); // get peer

      event = THREAD_VAL (thread); // get FSM event  eg.TCP_connection_open

      puts("->bgp_event");

      /* Logging this event. */

      next = FSM [peer->status -1][event - 1].next_state;  next为下一个状态

     

      if (BGP_DEBUG (fsm, FSM))

        plog_info (peer->log, "%s [FSM] %s (%s->%s)", peer->host,

                  bgp_event_str[event],

                  LOOKUP (bgp_status_msg, peer->status),

                  LOOKUP (bgp_status_msg, next));

      printf("%s [FSM] %s (%s->%s)", peer->host,

                  bgp_event_str[event],

                  LOOKUP (bgp_status_msg, peer->status),

                  LOOKUP (bgp_status_msg, next));

     

      /* Call function. */

      ret = (*(FSM [peer->status - 1][event - 1].func))(peer); 执行本状态处理函数

     

      /* When function do not want proceed next job return -1. */

      if (ret < 0)

          {

          puts("<-bgp_event 1");

        return ret;

          }

      /* If status is changed. */

      if (next != peer->status)               判断状态是否需要转变

        fsm_change_status (peer, next);

     

      /* Make sure timer is set. */

      bgp_timer_set (peer);

      puts("<-bgp_event 0");

      return 0;

    }


    Meet so Meet. C plusplus I-PLUS....
  • 相关阅读:
    Windows2012 cannot access netapp CIFS share
    Import SHA2 SSL cert to Windows IIS7
    IE11登陆交行网银崩溃
    Understanding and Managing SMTP Virtual Servers
    IIS SMTP Queue stuck
    C#夯实基础之多线程三:线程的优先级
    C#夯实基础之多线程二:主线程、前台线程与后台线程
    ORA-00257: archiver error. Connect internal only, until freed.
    C#夯实基础之多线程一:初识多线程
    在Oracle中恢复被DROP掉的表
  • 原文地址:https://www.cnblogs.com/iplus/p/4467311.html
Copyright © 2011-2022 走看看