zoukankan      html  css  js  c++  java
  • Erlang下与其他程序和语言的通信机制(3)

      这部分主要聊C Nodes。首先我们需要了解下Erlang的分布式系统:

    • 分布式Erlang:

      分布式Erlang是由一组相互通信的Erlang运行时组成,其中每一个运行时称为一个Node。不同Node上的进程通过消息通信(TCP/IP),并且Node对于进程来说是透明的。比如Pid ! Msg,调用进程不需要知道Pid是在哪个Node上。但是当使用进程的注册名进行消息投递时,如果投递的进程不在同一个Node上,这时我们需要带上Node的名称才行,因为进程注册名只是进程所属Node上的本地信息。

      启动Node】

      一个Node通过添加命令行-name 或者 -sname启动运行时来创建,创建好的Node名称为name@host格式。其中name是Node的名称标示,类似于端口号,host为Node所存在的主机名。当使用-name启动表示使用长名,也就是说host部分为完整的主机名字。当使用-sname启动表示使用短名,host部分只是主机名的第一部分。长名一般用于配有DNS的网络环境,短名则适用于DNS不可用的环境,只需要所有Node处于同一个子网。长名Node与短名Node之间是不能通信的。

    % erl -name dilbert
    (dilbert@uab.ericsson.se)1> node().
    'dilbert@uab.ericsson.se'

    %%在ubuntu13.04下调用会失败。需要手动添加主机名。
    % erl -name dilbert@host % erl -sname dilbert (dilbert@uab)1> node(). dilbert@uab

      【Node连接】

      当Node第一次调用函数与其他Node通信时,这个Node将会与这个Node建立连接,比如调用spawn(Node,M,F,A),或者net_adm:ping(Node)。默认下,连接是具有传递性的,比如说Node A连接到Node B,而Node B之前已经连接到了Node C,那么Node A也会试图连接Node C。如果一个Node关闭,所有与这个Node相关的连接会相应关闭。这个特性可以通过启动时添加-connect_all false来关闭,其实这个特性是由global模块提供的,关闭了这个特性后,global模块也将不提供对Pid注册全局名称的服务。

      【global模块】

      global模块由global_name_server进程构成,这个进程存在于每个Node上,在Node启动时,自动启动(kernel app下)。它主要提供以下3中服务:

    1. 为Pid提供全局名称注册,类似于本地注册,注册后可以调用global:send为远程Node的Pid发送消息。如果注册的Pid终止了,注册的名字将自动消除。Erlang在实现这个服务时,并没有提供中心的服务进程来保存全局的注册名与Pid对应表,而是在每个Node上各自保存一份数据,因此Name与Pid的转换在本地计算,速度很快。但一旦全局表里的数据发生了变化,那么所有连接的Node都需要更新数据。当然这个过程是由Erlang帮你完成的。
    2. 提供全局锁服务。Pid可以对访问的资源进行加锁(set_lock),加锁后的资源只能被这个Pid访问,其它Pid访问将被拒绝。如果加锁的Pid终止,或者其上的Node终止,该资源的锁会自动解除。
    3. 维护全连接服务。上面说到Node的互相连接就是由这个服务提供的。

      【epmd】

      epmd全称Erlang Port Mapper Daemon,主要用于Node名称与机器地址的关联,当一个Node启动时,epmd将自动启动。它在分布式Erlang中扮演一个域名服务器的角色。当一个Node启动时将从OS Kernel中获得地址信息,然后Node将Node和名称与地址一起发送给本机上运行的epmd守护进程。在TCP/IP下,地址信息包含IP和端口。而Node名称则为一个原子(Name@Node)。这样一来,epmd就知道Node在哪个地址和端口上监听了。

      【安全性】

      Node之间连接的验证通过自己的magic cookie来进行判断。当一个Node试图连接另外一个Node时,首先需要对比magic cookies。如果相等,那么验证通过,反之则不然。只有通过验证的Node才能够相互连接。

      当Erlang network authentication server(auth)启动时,首先会去读取$Home/.erlang.cookie文件,如果文件不存在的话,则创建一个。这个文件的访问权限是400,里面就是保存的该Host上Node的cookie。也可以通过erlang:set_cookie在代码里设置Node的cookie。

      分布式Erlang相关的BIFs

     

      分布式Erlang相关的Cmd Line标记

     

      【分布式Erlang相关的模块】

     

    • C Node

      在开始C Node之前,先了解下Hidden Node。

      【Hidden Nodes】

       在有一些情况下,我们不希望Node A连接一个Node B后,Node A继续对Node B已知的其它Node进行连接。这时可以使用-hidden将Node标示为Hidden Node,并且也不会显示在Nodes调用的列表中。这部分主要在global_group中实现。大概实现是这样,global_group模块会根据启动的Node是否为Hidden,在net_kernel里注册该Node可见的范围。比如说这个Node为hidden,no_group,那么这个Node对任何Node不可见。如果这个Node属于一个group组时,那么这个Node在hidden时只对组内可见。同样group在配置时,也可以配置为hidden,这时不管组内Node是normal还是hidden,组内的Node都只对组内可见,可以参看pushlish_on_nodes那四个函数。Node的group可以通过Kernel的config文件配置,config文件通过-config引用,建立global_group的原因主要是当Node太多时,降低不必要的Node维护开销

    publish_on_nodes(normal, no_group) ->
        all;
    publish_on_nodes(hidden, no_group) ->
        [];
    publish_on_nodes(normal, {normal, _}) ->
        all;
    publish_on_nodes(hidden, {_, Nodes}) ->
        Nodes;
    publish_on_nodes(_, {hidden, Nodes}) ->
        Nodes.
    
    %%%====================================================================================
    %%% Update net_kernels publication list
    %%%====================================================================================
    update_publish_nodes(PubArg) ->
        update_publish_nodes(PubArg, no_group).
    update_publish_nodes(PubArg, MyGroup) ->
        net_kernel:update_publish_nodes(publish_on_nodes(PubArg, MyGroup)).

      【C Node】

      C Node在分布式Erlang中也是作为隐藏Node来处理的。在Erlang这边对他的访问和访问一个普通Node一样,也是直接通过{RegName,Node} ! Msg访问。如果Node使用短名,则Node按cN(N为整数)命名。如果是长名,则没有限制。比如说短命叫c1@dril,长名叫cnode@idril.ericsson.se。访问时RegName为一个原子any,它将被C忽略。

    -module(complex3).
    -export([foo/1, bar/1]).
    
    foo(X) ->
        call_cnode({foo, X}).
    bar(Y) ->
        call_cnode({bar, Y}).
    
    call_cnode(Msg) ->
        {any, c1@idril} ! {call, self(), Msg},
        receive
        {cnode, Result} ->
            Result
        end.

      在C这边,还是先使用erl_init函数初始化。短名通过erl_connect_init,长名通过erl_connect_xinit初始化Node。如果C Node作为客户端的话,就使用erl_connect连接其他Node,如果是服务器,就需要创建监听Socket,并通过erl_publish将Node的名称与地址端口对应暴露给epmd。最后是具体的消息处理部分类似于普通端口,可以参看第一篇  

    /* cnode_s.c */
    
    #include <stdio.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    
    #include "erl_interface.h"
    #include "ei.h"
    
    #define BUFSIZE 1000
    
    int main(int argc, char **argv) {
      int port;                                /* Listen port number */
      int listen;                              /* Listen socket */
      int fd;                                  /* fd to Erlang node */
      ErlConnect conn;                         /* Connection data */
    
      int loop = 1;                            /* Loop flag */
      int got;                                 /* Result of receive */
      unsigned char buf[BUFSIZE];              /* Buffer for incoming message */
      ErlMessage emsg;                         /* Incoming message */
    
      ETERM *fromp, *tuplep, *fnp, *argp, *resp;
      int res;
    
      port = atoi(argv[1]);
    
      erl_init(NULL, 0);
    
      if (erl_connect_init(1, "secretcookie", 0) == -1)
        erl_err_quit("erl_connect_init");
    
      /* Make a listen socket */
      if ((listen = my_listen(port)) <= 0)
        erl_err_quit("my_listen");
    
      if (erl_publish(port) == -1)
        erl_err_quit("erl_publish");
    
      if ((fd = erl_accept(listen, &conn)) == ERL_ERROR)
        erl_err_quit("erl_accept");
      fprintf(stderr, "Connected to %s
    
    ", conn.nodename);
    
      while (loop) {
    
        got = erl_receive_msg(fd, buf, BUFSIZE, &emsg);
        if (got == ERL_TICK) {
          /* ignore */
        } else if (got == ERL_ERROR) {
          loop = 0;
        } else {
    
          if (emsg.type == ERL_REG_SEND) {
        fromp = erl_element(2, emsg.msg);
        tuplep = erl_element(3, emsg.msg);
        fnp = erl_element(1, tuplep);
        argp = erl_element(2, tuplep);
    
        if (strncmp(ERL_ATOM_PTR(fnp), "foo", 3) == 0) {
          res = foo(ERL_INT_VALUE(argp));
        } else if (strncmp(ERL_ATOM_PTR(fnp), "bar", 3) == 0) {
          res = bar(ERL_INT_VALUE(argp));
        }
    
        resp = erl_format("{cnode, ~i}", res);
        erl_send(fd, fromp, resp);
    
        erl_free_term(emsg.from); erl_free_term(emsg.msg);
        erl_free_term(fromp); erl_free_term(tuplep);
        erl_free_term(fnp); erl_free_term(argp);
        erl_free_term(resp);
          }
        }
      } /* while */
    }
    
      
    int my_listen(int port) {
      int listen_fd;
      struct sockaddr_in addr;
      int on = 1;
    
      if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
        return (-1);
    
      setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    
      memset((void*) &addr, 0, (size_t) sizeof(addr));
      addr.sin_family = AF_INET;
      addr.sin_port = htons(port);
      addr.sin_addr.s_addr = htonl(INADDR_ANY);
    
      if (bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr)) < 0)
        return (-1);
    
      listen(listen_fd, 5);
      return listen_fd;
    }

      恩,这个议题终于写完了,当然要深入下去还有很多可以写。不过也写得有点疲了,先告一段落吧。

      

      

  • 相关阅读:
    from collections import defaultdict ; from collections import namedtuple
    向日葵,teamviewer
    Seach in google: "tensorflow:Error encountered when serializing"
    lsof, fuser 命令杀进程。target is busy (In some cases useful info about processes that use the device is found by lsof(8) or fuser(1).)
    个税专项 http://www.sohu.com/a/284804458_260616?_f=index_news_0
    Insert video into ppt
    Is the phrase "anything and everything" redundant?
    税率等级
    Gstreamer overview
    Deep Learning for Generic Object Detection: A Survey
  • 原文地址:https://www.cnblogs.com/pbblog/p/3420028.html
Copyright © 2011-2022 走看看