zoukankan      html  css  js  c++  java
  • Erlang下与其他程序和语言的通信机制(3)

      这部分主要聊C Nodes。首先我们需要了解下Erlang的分布式系统:

    • 分布式Erlang:

      分布式Erlang是由一组相互通信的Erlang运行时组成,其中每一个运行时称为一个Node。不同Node上的进程通过消息通信(TCP/IP),并且Node对于进程来说是透明的。比如Pid ! Msg,调用进程不需要知道Pid是在哪个Node上。但是当使用进程的注册名进行消息投递时,如果投递的进程不在同一个Node上,这时我们需要带上Node的名称才行,因为进程注册名只是进程所属Node上的本地信息。

      启动Node】

      一个Node通过添加命令行-name 或者 -sname启动运行时来创建,创建好的Node名称为name@host格式。其中name是Node的名称标示,类似于端口号,host为Node所存在的主机名。当使用-name启动表示使用长名,也就是说host部分为完整的主机名字。当使用-sname启动表示使用短名,host部分只是主机名的第一部分。长名一般用于配有DNS的网络环境,短名则适用于DNS不可用的环境,只需要所有Node处于同一个子网。长名Node与短名Node之间是不能通信的。

    % erl -name dilbert
    (dilbert@uab.ericsson.se)1> node().
    'dilbert@uab.ericsson.se'

    %%在ubuntu13.04下调用会失败。需要手动添加主机名。
    % erl -name dilbert@host % erl -sname dilbert (dilbert@uab)1> node(). dilbert@uab

      【Node连接】

      当Node第一次调用函数与其他Node通信时,这个Node将会与这个Node建立连接,比如调用spawn(Node,M,F,A),或者net_adm:ping(Node)。默认下,连接是具有传递性的,比如说Node A连接到Node B,而Node B之前已经连接到了Node C,那么Node A也会试图连接Node C。如果一个Node关闭,所有与这个Node相关的连接会相应关闭。这个特性可以通过启动时添加-connect_all false来关闭,其实这个特性是由global模块提供的,关闭了这个特性后,global模块也将不提供对Pid注册全局名称的服务。

      【global模块】

      global模块由global_name_server进程构成,这个进程存在于每个Node上,在Node启动时,自动启动(kernel app下)。它主要提供以下3中服务:

    1. 为Pid提供全局名称注册,类似于本地注册,注册后可以调用global:send为远程Node的Pid发送消息。如果注册的Pid终止了,注册的名字将自动消除。Erlang在实现这个服务时,并没有提供中心的服务进程来保存全局的注册名与Pid对应表,而是在每个Node上各自保存一份数据,因此Name与Pid的转换在本地计算,速度很快。但一旦全局表里的数据发生了变化,那么所有连接的Node都需要更新数据。当然这个过程是由Erlang帮你完成的。
    2. 提供全局锁服务。Pid可以对访问的资源进行加锁(set_lock),加锁后的资源只能被这个Pid访问,其它Pid访问将被拒绝。如果加锁的Pid终止,或者其上的Node终止,该资源的锁会自动解除。
    3. 维护全连接服务。上面说到Node的互相连接就是由这个服务提供的。

      【epmd】

      epmd全称Erlang Port Mapper Daemon,主要用于Node名称与机器地址的关联,当一个Node启动时,epmd将自动启动。它在分布式Erlang中扮演一个域名服务器的角色。当一个Node启动时将从OS Kernel中获得地址信息,然后Node将Node和名称与地址一起发送给本机上运行的epmd守护进程。在TCP/IP下,地址信息包含IP和端口。而Node名称则为一个原子(Name@Node)。这样一来,epmd就知道Node在哪个地址和端口上监听了。

      【安全性】

      Node之间连接的验证通过自己的magic cookie来进行判断。当一个Node试图连接另外一个Node时,首先需要对比magic cookies。如果相等,那么验证通过,反之则不然。只有通过验证的Node才能够相互连接。

      当Erlang network authentication server(auth)启动时,首先会去读取$Home/.erlang.cookie文件,如果文件不存在的话,则创建一个。这个文件的访问权限是400,里面就是保存的该Host上Node的cookie。也可以通过erlang:set_cookie在代码里设置Node的cookie。

      分布式Erlang相关的BIFs

     

      分布式Erlang相关的Cmd Line标记

     

      【分布式Erlang相关的模块】

     

    • C Node

      在开始C Node之前,先了解下Hidden Node。

      【Hidden Nodes】

       在有一些情况下,我们不希望Node A连接一个Node B后,Node A继续对Node B已知的其它Node进行连接。这时可以使用-hidden将Node标示为Hidden Node,并且也不会显示在Nodes调用的列表中。这部分主要在global_group中实现。大概实现是这样,global_group模块会根据启动的Node是否为Hidden,在net_kernel里注册该Node可见的范围。比如说这个Node为hidden,no_group,那么这个Node对任何Node不可见。如果这个Node属于一个group组时,那么这个Node在hidden时只对组内可见。同样group在配置时,也可以配置为hidden,这时不管组内Node是normal还是hidden,组内的Node都只对组内可见,可以参看pushlish_on_nodes那四个函数。Node的group可以通过Kernel的config文件配置,config文件通过-config引用,建立global_group的原因主要是当Node太多时,降低不必要的Node维护开销

    publish_on_nodes(normal, no_group) ->
        all;
    publish_on_nodes(hidden, no_group) ->
        [];
    publish_on_nodes(normal, {normal, _}) ->
        all;
    publish_on_nodes(hidden, {_, Nodes}) ->
        Nodes;
    publish_on_nodes(_, {hidden, Nodes}) ->
        Nodes.
    
    %%%====================================================================================
    %%% Update net_kernels publication list
    %%%====================================================================================
    update_publish_nodes(PubArg) ->
        update_publish_nodes(PubArg, no_group).
    update_publish_nodes(PubArg, MyGroup) ->
        net_kernel:update_publish_nodes(publish_on_nodes(PubArg, MyGroup)).

      【C Node】

      C Node在分布式Erlang中也是作为隐藏Node来处理的。在Erlang这边对他的访问和访问一个普通Node一样,也是直接通过{RegName,Node} ! Msg访问。如果Node使用短名,则Node按cN(N为整数)命名。如果是长名,则没有限制。比如说短命叫c1@dril,长名叫cnode@idril.ericsson.se。访问时RegName为一个原子any,它将被C忽略。

    -module(complex3).
    -export([foo/1, bar/1]).
    
    foo(X) ->
        call_cnode({foo, X}).
    bar(Y) ->
        call_cnode({bar, Y}).
    
    call_cnode(Msg) ->
        {any, c1@idril} ! {call, self(), Msg},
        receive
        {cnode, Result} ->
            Result
        end.

      在C这边,还是先使用erl_init函数初始化。短名通过erl_connect_init,长名通过erl_connect_xinit初始化Node。如果C Node作为客户端的话,就使用erl_connect连接其他Node,如果是服务器,就需要创建监听Socket,并通过erl_publish将Node的名称与地址端口对应暴露给epmd。最后是具体的消息处理部分类似于普通端口,可以参看第一篇  

    /* cnode_s.c */
    
    #include <stdio.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    
    #include "erl_interface.h"
    #include "ei.h"
    
    #define BUFSIZE 1000
    
    int main(int argc, char **argv) {
      int port;                                /* Listen port number */
      int listen;                              /* Listen socket */
      int fd;                                  /* fd to Erlang node */
      ErlConnect conn;                         /* Connection data */
    
      int loop = 1;                            /* Loop flag */
      int got;                                 /* Result of receive */
      unsigned char buf[BUFSIZE];              /* Buffer for incoming message */
      ErlMessage emsg;                         /* Incoming message */
    
      ETERM *fromp, *tuplep, *fnp, *argp, *resp;
      int res;
    
      port = atoi(argv[1]);
    
      erl_init(NULL, 0);
    
      if (erl_connect_init(1, "secretcookie", 0) == -1)
        erl_err_quit("erl_connect_init");
    
      /* Make a listen socket */
      if ((listen = my_listen(port)) <= 0)
        erl_err_quit("my_listen");
    
      if (erl_publish(port) == -1)
        erl_err_quit("erl_publish");
    
      if ((fd = erl_accept(listen, &conn)) == ERL_ERROR)
        erl_err_quit("erl_accept");
      fprintf(stderr, "Connected to %s
    
    ", conn.nodename);
    
      while (loop) {
    
        got = erl_receive_msg(fd, buf, BUFSIZE, &emsg);
        if (got == ERL_TICK) {
          /* ignore */
        } else if (got == ERL_ERROR) {
          loop = 0;
        } else {
    
          if (emsg.type == ERL_REG_SEND) {
        fromp = erl_element(2, emsg.msg);
        tuplep = erl_element(3, emsg.msg);
        fnp = erl_element(1, tuplep);
        argp = erl_element(2, tuplep);
    
        if (strncmp(ERL_ATOM_PTR(fnp), "foo", 3) == 0) {
          res = foo(ERL_INT_VALUE(argp));
        } else if (strncmp(ERL_ATOM_PTR(fnp), "bar", 3) == 0) {
          res = bar(ERL_INT_VALUE(argp));
        }
    
        resp = erl_format("{cnode, ~i}", res);
        erl_send(fd, fromp, resp);
    
        erl_free_term(emsg.from); erl_free_term(emsg.msg);
        erl_free_term(fromp); erl_free_term(tuplep);
        erl_free_term(fnp); erl_free_term(argp);
        erl_free_term(resp);
          }
        }
      } /* while */
    }
    
      
    int my_listen(int port) {
      int listen_fd;
      struct sockaddr_in addr;
      int on = 1;
    
      if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
        return (-1);
    
      setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    
      memset((void*) &addr, 0, (size_t) sizeof(addr));
      addr.sin_family = AF_INET;
      addr.sin_port = htons(port);
      addr.sin_addr.s_addr = htonl(INADDR_ANY);
    
      if (bind(listen_fd, (struct sockaddr*) &addr, sizeof(addr)) < 0)
        return (-1);
    
      listen(listen_fd, 5);
      return listen_fd;
    }

      恩,这个议题终于写完了,当然要深入下去还有很多可以写。不过也写得有点疲了,先告一段落吧。

      

      

  • 相关阅读:
    std thread
    windows更新包发布地址
    How to set up logging level for Spark application in IntelliJ IDEA?
    spark 错误 How to set heap size in spark within the Eclipse environment?
    hadoop 常用命令
    windows 安装hadoop 3.2.1
    windows JAVA_HOME 路径有空格,执行软连接
    day01MyBatisPlus条件构造器(04)
    day01MyBatisPlus的CRUD 接口(03)
    day01MyBatisPlus入门(02)
  • 原文地址:https://www.cnblogs.com/pbblog/p/3420028.html
Copyright © 2011-2022 走看看