zoukankan      html  css  js  c++  java
  • storm drpc分布式本地和远程调用模式讲解

    一.drpc 的介绍

      1.rpc

        RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。

      2.drpc

        drpc(Distributed Remote Procedure Call) - 分布式的远程过程调用。

      我们回顾一下前面学习的storm知识,知道storm是一个分布式的流式计算框架,由1台nimbus 和多台supervisor 组成,nimbus主要是把任务分发到不同的supervisor 上,而supervisor是分布在不同节点上的。在一个完整的topology中,spout是用来收集上游jobs的,然后再由nimbus负责分发给相应的supervisor,然后由worker来执行jobs,而drcp的调用是当我们想调用这个topology中的函数时,直接通过调用supervisorr上的worker就行,然后由worker线程来执行调用的过程。

      个人理解是:storm的drcp就是满足用户直接调用这个topology处理信息的,以前的可能是固定的收集数据,现在需要时,可以随时调用

    二.drcp的调用

      1.本地的调用 

    public class HelloDRPC {
        
        private static final Logger LOG = LoggerFactory.getLogger(HelloDRPC.class);
        
        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
            
            boolean isRemote = !(args.length == 0);
            LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("echo");
            builder.addBolt(new EchoBolt(),1);// 第2个参数是:并行度
            
            Config conf = new Config();
            conf.setDebug(false);
            conf.setNumWorkers(2);
            
            if(isRemote){
                
                StormSubmitter.submitTopology("HelloDRPC", conf, builder.createRemoteTopology());
                LOG.warn("==================================================");
                LOG.warn("the topology {} is submitted.","HelloDRPC");
                LOG.warn("==================================================");
            }else{
                LocalDRPC drpc = new LocalDRPC();
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("HelloDRPC", conf, builder.createLocalTopology(drpc));
                Arrays.asList("hello","world","storm","java","kafka").forEach(
                        item -> 
                        {
                            String result = drpc.execute("echo",item );
                            LOG.warn(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");
                            LOG.warn(">>>>>>>>>>>>>>> result:{}",result);
                            LOG.warn(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");
                        }
                );
            }
        }
        
        public static class EchoBolt extends BaseBasicBolt{
    
            @Override
            public void execute(Tuple input, BasicOutputCollector collector) {
                LOG.warn(" ================> tuple:{}",input.getFields());
                Object id = input.getValue(0);
                String name = input.getString(1);
                System.err.println("------------------------------");
                System.err.println("id: "+id +" name: "+ name);
                System.err.println("------------------------------");
                collector.emit(new Values(id,"echo:  "+ name));
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("id","value"));
            }
        }
    }
    //在无参情况下,直接运行 会将数据全部发出

      2.远程过程调用

        远程过程调用是将drcp当作一个服务端,然后由client去连接调用。

        1.drcp的配置

    drpc.servers:
      - "master"
    drpc.port: 3777
    storm.thrift.transport: "org.apache.storm.security.auth.plain.PlainSaslTransportPlugin"

    本测试共3台虚拟机,1台nimbus 2台supervisor 在3台storm.yaml 分别配置以上内容,然后启动3台drcp服务
    storm drpc &  #后台启动

        2.启动服务端

         将HelloDRPC 当作服务端,打成jar包,然后启动

    storm jar storm_test-0.0.1-SNAPSHOT.jar com.drpc.HelloDRPC 1 

        3.客户端连接

    public class HelloClientDRPC {
    
        private static final Logger LOG = LoggerFactory.getLogger(HelloClientDRPC.class);
    
        public static void main(String[] args) throws DRPCExecutionException, AuthorizationException, TException {
            Config conf = new Config();
            conf.put("storm.thrift.transport", "org.apache.storm.security.auth.plain.PlainSaslTransportPlugin");
            conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
            conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10);
            conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 20);
    
            DRPCClient client = new DRPCClient(conf, "master", 3777);
            String result = client.execute("echo", "hello");
            System.out.println("result:"+result);
            LOG.info("=====================================> "+ result);
            
        }
    }

    //结果正常返回
  • 相关阅读:
    正则表达式替换所有符合条件的字符
    关于jquery ajax不执行success回调函数
    关于jquery绑定事件执行两次
    同步选中所有checkbox
    Jquery动态改变my97datepicker的日期形式
    关于button在td中时,zclip复制不能的问题
    关于各种高度的获取方法
    慎用--skip-grant-tables命令
    Mysql中判断是否存在
    前端html
  • 原文地址:https://www.cnblogs.com/MrRightZhao/p/11119214.html
Copyright © 2011-2022 走看看