zoukankan      html  css  js  c++  java
  • WAMP-网络程序消息协议

    WAMP-网络程序消息协议

    wamp是一个开放式的标准的websocket子协议,在一个统一协议中提供两种应用程序的消息模式: 远程过程调用  +  发布&订阅

    它在使用不同语言的开放的WebSocket协议中提供统一应用程序路由。使用WAMP,可以用松耦合实现实时通信的应用程序组件构建分布式系统。

    因为应用程序通常对两种形式的通信都有自然的需要,并且不应该要求为这些形式使用不同的协议/手段, 所以这就是为什么WAMP提供这这两种通信模式。

    WAMP的核心是为应用程序组件提供两种通信模式,以便彼此通信:

             发布&订阅  + 远程过程调用

    一、统一应用路由

    WAMP提供给了我们所谓的应用程序的统一应用路由:在一个协议中的应用组件之间路由两个事件(用于PubSub)和路由呼叫(用于RPC)。

    统一路由可能最好通过与传统方法进行对比来解释。 在客户端 - 服务器模型中,远程过程调用直接从调用者到被调用者:

      调用者-->被调者

    在客户端 - 服务器模型中,Caller需要知道Callee驻留在哪里以及如何到达它,这引入Caller和Callee之间的强耦合。 这是很麻烦的,因为应用程序可能很快变得复杂和无法维护。

    所以WAMP如何解决这一点的呢?

    在发布 - 订阅模型中,发布者向抽象的“主题”提交信息,并且订阅者仅通过它们对应“主题”的兴趣而间接地接收到信息。 两个人不知道彼此。 它们通过“主题”和通常称为代理的中间体解耦: 发布者-->代理商-->订阅者

    代理商保留订阅:谁目前订阅了哪个主题。 当发布者向主题发布一些信息(“事件”)时,代理将查找当前对该主题订阅的用户:确定发布到主题上的订阅者的集合,然后将信息(“事件”)转发给所有这些订阅者。 而确定信息接收者(独立于提交的信息)和将信息转发到接收者的行为就被称为路由。

    现在,WAMP将松耦合的好处转化为RPC。与客户端 - 服务器模型不同,WAMP还通过引入中介:经销商来解除Caller和Callees的联系: 调用者-->经销商-->被调者

    类似于Broker在PubSub中的作用,经销商负责将来自Caller的呼叫路由到被叫方,并路由返回结果或者错误,反之亦然。 两者不知道彼此:对等体驻留在哪里以及如何到达它,这些内容被封装在经销商中。

    使用WAMP,被叫方在经销商处以抽象名称注册过程:标识过程的URI。 当调用者想要调用远程过程时,它与经销商谈话,并且仅提供要调用的过程的URI加上任何调用参数。 经销商将在他的注册程序书中查找要援引的程序。 书中的信息包括执行程序的Callee所在的位置,以及如何到达它。

    实际上,Caller和Callees是分离的,应用程序可以使用RPC并仍然受益于松耦合。

    WAMP调用路由器:路由 = 代理商 + 经销商,路由器能够路由呼叫,因此可以支持使用RPC和PubSub的灵活的解耦架构。

    这里是一个例子。 想象一下,你有一个像Arduino Yun这样的小型嵌入式设备,具有传感器(如温度传感器)和致动器(如灯或电机)连接。 您希望将设备集成到整个系统中,用户面向前端以控制执行器,并连续处理后端组件中的传感器值。

     使用WAMP,可以有一个基于浏览器的UI及嵌入式设备和后端实时交谈,从基于浏览器的UI打开设备上的灯自然通过在设备(1)上调用远程过程来完成。 并且由设备连续生成的传感器值通过发布和订阅(2)自然地传输到后端组件(并且可能是其他组件)。

    二、多语言化

    WAMP的设计有一流的支持,支持不同的语言。 WAMP中没有要求特定于单个编程语言。 只要编程语言具有WAMP实现,它就可以与使用WAMP支持的任何其他语言编写的应用程序组件实现透明通信。

    三、什么是RPC?什么是发布-订阅?

    远程过程调用(RPC)是一种涉及三个角色的同级的消息模式:

    呼叫者、被叫者、经销商

    调用者通过提供过程URI和调用的任何参数来向远程过程发出调用。 被调者将使用提供的调用参数执行该过程,并将调用结果返回给调用者。

    调用者注册它们向经销商提供的程序,呼叫者首先向经销商发起程序呼叫,经销商将来自呼叫者的呼叫路由实现到被调者,并将呼叫结果从被调者路由返回到呼叫者。

    Caller和Callee通常运行应用程序代码,而经销商作为一个通用路由器,用于远程过程调用解耦Caller和Callees。

    发布和订阅(PubSub)是一种包含三个角色的同行的消息模式:

    发布者、订阅者、经纪人

    发布者通过提供主题URI和事件的任何有效内容将主题发布,主题的订阅者将事件内容接收。

    订阅者订阅他们对Brokers感兴趣的主题,发布者首先在Brokers发布。 代理将发布者传入的事件路由到订阅相应主题的订阅者。

    发布者和订阅者通常运行应用程序代码,而代理作为通用路由器用于将发布者与订阅者分离。

    下面是一个简单的WAMP应用小程序,搭建一个CrossBar服务器,开启服务器,就可以运行wamp小程序了。

    发布者(Publish):

    package ws.wamp.jawampa;
    
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    import rx.Scheduler;
    import rx.Subscription;
    import rx.functions.Action0;
    import rx.functions.Action1;
    import rx.schedulers.Schedulers;
    import ws.wamp.jawampa.ApplicationError;
    import ws.wamp.jawampa.Request;
    import ws.wamp.jawampa.WampError;
    import ws.wamp.jawampa.WampClient;
    import ws.wamp.jawampa.WampClientBuilder;
    
    public class Publish {
        
        String url;
        String realm;
        int flag = 0;
        WampClient client;
        
        Subscription addProcSubscription;
        Subscription counterPublication;
        Subscription onHelloSubscription;
        
        // Scheduler for this example
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Scheduler rxScheduler = Schedulers.from(executor);
        
        static final int TIMER_INTERVAL = 1000; // 1s
        int counter = 0;
        
        Publish(String url, String realm) throws Exception {
            this.url = url;
            this.realm = realm;
        }
        
        
        public static void main(String[] args) throws Exception {
          if (args.length != 2) {
              System.out.println("Need 2 commandline arguments: Router URL and ream name");
              return;
          }
          String args1 ="ws://172.16.100.55:8080/ws";
          String args2 = "realm1";
          new Publish(args1, args2).run();
      }
      
      void run() {
          
          WampClientBuilder builder = new WampClientBuilder();
          try {
              builder.witUri(url)
                     .withRealm(realm)
                     .withInfiniteReconnects()
                     .withCloseOnErrors(true)
                     .withReconnectInterval(5, TimeUnit.SECONDS);
              client = builder.build();
          } catch (WampError e) {
              e.printStackTrace();
              return;
          }
    
          // Subscribe on the clients status updates
          client.statusChanged()
                .observeOn(rxScheduler)
                .subscribe(new Action1<WampClient.Status>() {
              @Override
              public void call(WampClient.Status t1) {
                  System.out.println("Session status changed to " + t1);
    
                  if (t1 == WampClient.Status.Connected) {
                      
                   
                      // PUBLISH and CALL every second .. forever
                      counter = 0;
                      final int published = 4;
                      onHelloSubscription= client.publish("glf", published)
                                    .observeOn(rxScheduler)
                                    .subscribe(new Action1<Long>() {
                                      @Override
                                      public void call(Long t1) {
                                          System.out.println("published to 'oncounter' with counter " + published);
                                      }
                                  }, new Action1<Throwable>() {
                                      @Override
                                      public void call(Throwable e) {
                                          System.out.println("Error during publishing to 'oncounter': " + e);
                                      }
                                  });
                              
                               CALL a remote procedure
                                
                                  flag =1;
                                   System.out.println(flag);
                              counter++;
                         
                  }
                  else if (t1 == WampClient.Status.Disconnected) {
                      closeSubscriptions();
                  }
              }
          }, new Action1<Throwable>() {
              @Override
              public void call(Throwable t) {
                  System.out.println("Session ended with error " + t);
              }
          }, new Action0() {
              @Override
              public void call() {
                  System.out.println("Session ended normally");
              }
          });
    
          client.open();
          
          waitUntilKeypressed();
          while (flag == 0){
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  
                  e.printStackTrace();
              }
              
              System.out.println("waiting");
              }
          System.out.println("Shutting down");
          closeSubscriptions();
          client.close();
          try {
              client.getTerminationFuture().get();
          } catch (Exception e) {}
          
          executor.shutdown();
      }
      
      void closeSubscriptions() {
          if (onHelloSubscription != null)
              onHelloSubscription.unsubscribe();
          onHelloSubscription = null;
          if (counterPublication != null)
              counterPublication.unsubscribe();
          counterPublication = null;
          if (addProcSubscription != null)
              addProcSubscription.unsubscribe();
          addProcSubscription = null;
      }
      
      private void waitUntilKeypressed() {
          try {
              System.in.read();
              while (System.in.available() > 0) {
                  System.in.read();
              }
          } catch (IOException e) {
              e.printStackTrace();
          }
      }
    
    }
    View Code

    订阅者(Subscribe):

    package ws.wamp.jawampa;
    
    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    import rx.Scheduler;
    import rx.Subscription;
    import rx.functions.Action0;
    import rx.functions.Action1;
    import rx.schedulers.Schedulers;
    import ws.wamp.jawampa.WampError;
    import ws.wamp.jawampa.WampClient;
    import ws.wamp.jawampa.WampClientBuilder;
    
    public class Subscribe {
    
        String url;
        String realm;
    
        WampClient client;
    
        Subscription glfSubscription;
    
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Scheduler rxScheduler = Schedulers.from(executor);
    
        static final int TIMER_INTERVAL = 1000; // 1s
        int counter = 0;
    
        Subscribe(String url, String realm) throws Exception {
            this.url = url;
            this.realm = realm;
        }
    
        
        public static void main(String[] args) throws Exception {
    
            String args1 = "ws://172.16.100.55:8080/ws";
            String args2 = "realm1";
            new Subscribe(args1, args2).run();
        }
    
        void run() {
    
            WampClientBuilder builder = new WampClientBuilder();
            try {
                builder.witUri(url).withRealm(realm).withInfiniteReconnects().withCloseOnErrors(true)
                        .withReconnectInterval(5, TimeUnit.SECONDS);
                client = builder.build();
            } catch (WampError e) {
                e.printStackTrace();
                return;
            }
    
            // Subscribe on the clients status updates
            client.statusChanged().observeOn(rxScheduler).subscribe(new Action1<WampClient.Status>() {
                @Override
                public void call(WampClient.Status t1) {
                    System.out.println("工作状态变为 " + t1);
    
                    if (t1 == WampClient.Status.Connected) {
                        // SUBSCRIBE to a topic and receive events
                        glfSubscription = client.makeSubscription("glf", String.class).observeOn(rxScheduler)
                                .subscribe(new Action1<String>() {
                                     @Override
                                    public void call(String msg) {
                                        System.out.println("主题 'glf' 接收: " + msg);
                                    }
                                }, new Action1<Throwable>() {
                                     @Override
                                    public void call(Throwable e) {
                                        System.out.println("failed to subscribe 'websocket': " + e);
                                    }
                                }, new Action0() {
                                     @Override
                                    public void call() {
                                        System.out.println("'websocket' subscription ended");
                                    }
                                });
    
                    } else if (t1 == WampClient.Status.Disconnected) {
                        closeSubscriptions();
                    }
                }
            }, new Action1<Throwable>() {
                 @Override
                public void call(Throwable t) {
                    System.out.println("Session ended with error " + t);
                }
            }, new Action0() {
                 @Override
                public void call() {
                    System.out.println("工作正常结束");
                }
            });
    
            client.open();
    
            waitUntilKeypressed();
            System.out.println("关闭");
            closeSubscriptions();
            client.close();
            try {
                client.getTerminationFuture().get();
            } catch (Exception e) {
            }
    
            executor.shutdown();
        }
        void closeSubscriptions() {
            if (glfSubscription != null)
                glfSubscription.unsubscribe();
            glfSubscription = null;
    
        }
    
        private void waitUntilKeypressed() {
            try {
                System.in.read();
                while (System.in.available() > 0) {
                    System.in.read();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
    }

    四、什么是WebSocket协议

    WebSocket是一种新的Web协议,当需要双向,实时通信时,克服HTTP的限制。 WebSocket将是WAMP的理想基础,因为它提供了与Web和浏览器兼容的双向实时消息传递。 不仅如此 - 我们可以在非浏览器环境中运行WebSocket。 从技术上讲,WAMP是一个正式注册的WebSocket子协议(在WebSocket之上运行),它使用JSON作为消息序列化格式。

  • 相关阅读:
    【python cookbook】找出序列中出现次数最多的元素
    2018/1/21 Netty通过解码处理器和编码处理器来发送接收POJO,Zookeeper深入学习
    读《风雨20年》小感
    两个知识点的回顾(const指针和动态链接库函数dlopen)
    小试牛刀
    chmod,chown和chgrp的区别
    node.js中使用node-schedule实现定时任务
    在 Node.js 上调用 WCF Web 服务
    nodejs发起HTTPS请求并获取数据
    openstack 之~云计算介绍
  • 原文地址:https://www.cnblogs.com/glfcdio/p/8213568.html
Copyright © 2011-2022 走看看