zoukankan      html  css  js  c++  java
  • Storm官方提供的trident单词计数的例子

    上代码:

     1 public class TridentWordCount {
     2   public static class Split extends BaseFunction {
     3     @Override
     4     public void execute(TridentTuple tuple, TridentCollector collector) {
     5       String sentence = tuple.getString(0);
     6       for (String word : sentence.split(" ")) {
     7         collector.emit(new Values(word));
     8       }
     9     }
    10   }
    11 
    12   public static StormTopology buildTopology(LocalDRPC drpc) {
    13     //这个是一个batch Spout  一次发3个..
    14     FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
    15         new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
    16         new Values("how many apples can you eat"), new Values("to be or not to be the person"));
    17     spout.setCycle(true);//Spout是否循环发送
    18 
    19     TridentTopology topology = new TridentTopology();
    20     TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16)//类似于setSpout
    21             .each(new Fields("sentence"),new Split(), new Fields("word"))//setbolt
    22             .groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(),new Count(), new Fields("count")).parallelismHint(16);
    23     
    24     topology.newDRPCStream("words", drpc).each(new Fields("args"), new Split(), new Fields("word")).groupBy(new Fields(
    25         "word")).stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")).each(new Fields("count"),
    26         new FilterNull()).aggregate(new Fields("count"), new Sum(), new Fields("sum"));
    27     return topology.build();
    28   }
    29 
    30   public static void main(String[] args) throws Exception {
    31     Config conf = new Config();
    32     conf.setMaxSpoutPending(20);
    33     if (args.length == 0) {
    34       LocalDRPC drpc = new LocalDRPC();
    35       LocalCluster cluster = new LocalCluster();
    36       cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
    37       for (int i = 0; i < 100; i++) {
    38         System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
    39         Thread.sleep(1000);
    40       }
    41     }
    42     else {
    43       conf.setNumWorkers(3);
    44       StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
    45     }
    46   }
    47 }
  • 相关阅读:
    一行代码轻松修改 Text Field 和 Text View 的光标颜色 — By 昉
    六种手势识别,你用了哪些?——董鑫
    Mac 屏幕录制Gif 制作 By-胡罗
    利用ICMP协议的PING命令获取客户端当前网络质量 by徐文棋
    iOS加载Gif图片的N种方式 By-H罗
    [手游项目3]-10-Go语言atomic原子操作
    [手游项目3]-9-Go语言sync.Map(在并发环境中使用的map)
    LRU原理和Redis实现
    Cleanup failed to process the following paths错误的解决
    [手游项目3]-8-排行榜redis实现
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/6675985.html
Copyright © 2011-2022 走看看