  • Apache Storm in Practice, Part 2 -- BasicDRPCTopology

    Reposting is welcome; please credit the source: 徽沪一郎.

    This article uses the BasicDRPCTopology example to examine what a DRPC topology actually contains at the moment it is submitted.

    BasicDRPCTopology

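    The main function

    DRPC stands for Distributed Remote Procedure Call. (An amusing name: a remote call is distributed by definition, so why add the D? Perhaps too much Initial D. :)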

    public static void main(String[] args) throws Exception {
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
        builder.addBolt(new ExclaimBolt(), 3);
    
        Config conf = new Config();
    
        if (args == null || args.length == 0) {
          LocalDRPC drpc = new LocalDRPC();
          LocalCluster cluster = new LocalCluster();
    
          cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
    
          for (String word : new String[]{ "hello", "goodbye" }) {
            System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
          }
    
          cluster.shutdown();
          drpc.shutdown();
        }
        else {
          conf.setNumWorkers(3);
          StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
        }
      }

    Question: the code above adds only a bolt and never sets a Spout. A topology needs at least one Spout, so where is the Spout hiding here?

    The key is builder.createLocalTopology; the call chain is as follows:

    • LinearDRPCTopologyBuilder::createLocalTopology
      •   LinearDRPCTopologyBuilder::createTopology()
        •   LinearDRPCTopologyBuilder::createTopology(new DRPCSpout(_function))

    So the Spout used in a DRPC topology is DRPCSpout.
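
    Why a spout is needed at all becomes clearer once the request itself is seen as the stream source: the spout pulls pending invocations from the DRPC server, the bolts compute, and the result travels back under the request id. The following is a minimal plain-Java sketch of that round trip. It is not Storm code; ToyDrpc, submit, runOnce and result are hypothetical names invented for illustration, and the spout, bolt and return steps are collapsed into one method.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;

// Toy model of a DRPC round trip. None of these names are Storm APIs.
class ToyDrpc {
    // A pending request: the id that accompanies the data, plus the argument.
    static final class Request {
        final long id;
        final String args;
        Request(long id, String args) { this.id = id; this.args = args; }
    }

    private final Queue<Request> pending = new ArrayDeque<>();
    private final Map<Long, String> results = new HashMap<>();
    private long nextId = 0;

    // Client side: enqueue a request and hand back its id.
    long submit(String args) {
        long id = nextId++;
        pending.add(new Request(id, args));
        return id;
    }

    // Spout, bolt and result return collapsed into one step:
    // pull one request, apply the function, store the result under the id.
    boolean runOnce(Function<String, String> bolt) {
        Request r = pending.poll();
        if (r == null) {
            return false; // nothing left to process
        }
        results.put(r.id, bolt.apply(r.args));
        return true;
    }

    // Client side: look up the result by request id (null if not ready).
    String result(long id) {
        return results.get(id);
    }

    public static void main(String[] args) {
        ToyDrpc drpc = new ToyDrpc();
        long a = drpc.submit("hello");
        long b = drpc.submit("goodbye");
        while (drpc.runOnce(s -> s + "!")) { /* drain the queue */ }
        System.out.println(drpc.result(a)); // hello!
        System.out.println(drpc.result(b)); // goodbye!
    }
}
```

    The request id is what lets the caller match a result to the call that produced it, which is why every tuple in a DRPC topology carries it as its first field.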

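    LinearDRPCTopologyBuilder::createTopology

    Having read this far, we may as well go one step further and look at how createTopology is implemented.

    In brief, the code does the following:

    1. Set up the DRPCSpout, followed by a PrepareRequest bolt
    2. Wrap each bolt added by the user in a CoordinatedBolt
    3. Add the JoinResult bolt
    4. Add the ReturnResults bolt: ReturnResults connects to the DRPC server and returns the result
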
        private StormTopology createTopology(DRPCSpout spout) {
            final String SPOUT_ID = "spout";
            final String PREPARE_ID = "prepare-request";
            
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout(SPOUT_ID, spout);
            builder.setBolt(PREPARE_ID, new PrepareRequest())
                    .noneGrouping(SPOUT_ID);
            int i=0;
            for(; i<_components.size();i++) {
                Component component = _components.get(i);
                
                Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();
                if (i==1) {
                    source.put(boltId(i-1), SourceArgs.single());
                } else if (i>=2) {
                    source.put(boltId(i-1), SourceArgs.all());
                }
                IdStreamSpec idSpec = null;
                if(i==_components.size()-1 && component.bolt instanceof FinishedCallback) {
                    idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM);
                }
                BoltDeclarer declarer = builder.setBolt(
                        boltId(i),
                        new CoordinatedBolt(component.bolt, source, idSpec),
                        component.parallelism);
                
                for(Map conf: component.componentConfs) {
                    declarer.addConfigurations(conf);
                }
                
                if(idSpec!=null) {
                    declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request"));
                }
                if(i==0 && component.declarations.isEmpty()) {
                    declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
                } else {
                    String prevId;
                    if(i==0) {
                        prevId = PREPARE_ID;
                    } else {
                        prevId = boltId(i-1);
                    }
                    for(InputDeclaration declaration: component.declarations) {
                        declaration.declare(prevId, declarer);
                    }
                }
                if(i>0) {
                    declarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID); 
                }
            }
            
            IRichBolt lastBolt = _components.get(_components.size()-1).bolt;
            OutputFieldsGetter getter = new OutputFieldsGetter();
            lastBolt.declareOutputFields(getter);
            Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
            if(streams.size()!=1) {
                throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
            }
            String outputStream = streams.keySet().iterator().next();
            List<String> fields = streams.get(outputStream).get_output_fields();
            if(fields.size()!=2) {
                throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result.");
            }
    
            builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))
                    .fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0)))
                    .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));
            i++;
            builder.setBolt(boltId(i), new ReturnResults())
                    .noneGrouping(boltId(i-1));
            return builder.createTopology();
        }
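
    The order of components that createTopology wires together can be summarized with a small plain-Java sketch. It only reproduces the naming and ordering visible in the code above and builds no real topology. DrpcLayout and layout are hypothetical names, and the boltId helper is assumed to return "bolt" plus the index, which the listing above does not show.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the component order built by createTopology.
class DrpcLayout {
    // Assumption: mirrors the builder's boltId helper as "bolt" + index.
    static String boltId(int index) {
        return "bolt" + index;
    }

    // Returns "componentId: role" entries in the order they are added.
    static List<String> layout(int userBolts) {
        if (userBolts < 1) {
            throw new IllegalArgumentException("need at least one bolt");
        }
        List<String> chain = new ArrayList<>();
        chain.add("spout: DRPCSpout");
        chain.add("prepare-request: PrepareRequest");
        int i = 0;
        for (; i < userBolts; i++) {
            // Every user bolt is wrapped in a CoordinatedBolt.
            chain.add(boltId(i) + ": CoordinatedBolt(user bolt " + i + ")");
        }
        // The loop counter keeps running for the two trailing bolts.
        chain.add(boltId(i) + ": JoinResult");
        i++;
        chain.add(boltId(i) + ": ReturnResults");
        return chain;
    }

    public static void main(String[] args) {
        // BasicDRPCTopology adds exactly one bolt (ExclaimBolt).
        layout(1).forEach(System.out::println);
    }
}
```

    For the single ExclaimBolt in this example, the sketch lists five components: the spout, prepare-request, bolt0 (the wrapped ExclaimBolt), bolt1 (JoinResult) and bolt2 (ReturnResults).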

    Bolt

    Processing logic: append '!' to every word received.

     public static class ExclaimBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
          String input = tuple.getString(1);
          collector.emit(new Values(tuple.getValue(0), input + "!"));
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("id", "result"));
        }
      }
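
    The tuple layout the bolt relies on (field 0 is the request id, field 1 is the argument) can be checked without a cluster. The sketch below restates the same logic over plain lists; ExclaimSketch and exclaim are hypothetical names, and a List stands in for Storm's Tuple and Values.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java restatement of ExclaimBolt's logic; a List stands in for a tuple.
class ExclaimSketch {
    // Input:  [requestId, argument]
    // Output: [requestId, argument + "!"]
    static List<Object> exclaim(List<Object> tuple) {
        Object requestId = tuple.get(0);      // passed through untouched
        String input = (String) tuple.get(1); // the DRPC argument
        return Arrays.asList(requestId, input + "!");
    }

    public static void main(String[] args) {
        List<Object> out = exclaim(Arrays.<Object>asList(7L, "hello"));
        System.out.println(out); // [7, hello!]
    }
}
```

    Passing the request id through unchanged matters because createTopology requires the last bolt to emit exactly two fields, the request id first and the result second.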

    Running

    java -cp $(lein classpath) storm.starter.BasicDRPCTopology
  • Original article: https://www.cnblogs.com/hseagle/p/3511185.html