  • Running a Storm example

    1. Create a Java project

    Using IntelliJ IDEA, create the project and add a lib library: copy the jars from the Storm distribution's lib directory into the project.

    2. Copy the word-count code

    Download the src package, unpack it, and locate the

    apache-storm-0.9.4-src/apache-storm-0.9.4/examples/storm-starter/src/jvm/storm/starter directory.

    Copy the contents of WordCountTopology.java,

    and replace the Python-based split handling (the ShellBolt that shells out to a Python script) with the pure-Java bolt used below; the original ShellBolt variant is shown for comparison after the listing.

    package com.bigdata.storm; // matches the class name used later by "bin/storm jar"

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.task.ShellBolt;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import com.bigdata.storm.spout.*;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * Created by Edward on 2016/8/17.
     */
    public class MyTest {

        // Pure-Java replacement for the ShellBolt/Python sentence splitter.
        public static class SplitSentence extends BaseBasicBolt {

            @Override
            public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
                String sentence = tuple.getString(0);
                String[] words = sentence.split(" ");

                System.out.println("Split Sentence:" + tuple.getSourceStreamId());
                for (int i = 0; i < words.length; i++) {
                    basicOutputCollector.emit(new Values(words[i]));
                }
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
                outputFieldsDeclarer.declare(new Fields("word"));
            }
        }

        // Keeps a running count per word; fieldsGrouping("word") below guarantees that
        // the same word always reaches the same WordCount task.
        public static class WordCount extends BaseBasicBolt {
            Map<String, Integer> counts = new HashMap<String, Integer>();

            @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                String word = tuple.getString(0);
                Integer count = counts.get(word);
                if (count == null)
                    count = 0;
                count++;
                counts.put(word, count);
                System.out.println("Word Count:" + tuple.getSourceStreamId());
                collector.emit(new Values(word, count));
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word", "count"));
            }
        }

        public static void main(String[] args) throws Exception {

            TopologyBuilder builder = new TopologyBuilder();

            builder.setSpout("spout", new RandomSentenceSpout(), 5);

            builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
            builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

            Config conf = new Config();
            conf.setDebug(true);

            if (args != null && args.length > 0) {
                // Cluster mode: args[0] becomes the topology name.
                conf.setNumWorkers(3);

                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            }
            else {
                // Local mode: run in-process for 50 seconds, then shut down.
                conf.setMaxTaskParallelism(3);

                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("word-count", conf, builder.createTopology());

                Thread.sleep(50000);

                cluster.shutdown();
            }
        }
    }
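
    For comparison, the SplitSentence bolt in the original WordCountTopology does no splitting in Java at all: it extends ShellBolt and hands each sentence to a Python script over Storm's multilang protocol (this is the "Python processing" replaced above). A sketch of that original nested class follows; it relies on the ShellBolt and IRichBolt imports already present, and assumes splitsentence.py is packaged under the topology jar's resources directory, as in storm-starter.

        // Original storm-starter variant: splitting is delegated to "python splitsentence.py"
        // over stdin/stdout via the multilang protocol.
        public static class SplitSentence extends ShellBolt implements IRichBolt {

            public SplitSentence() {
                super("python", "splitsentence.py");
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }

            @Override
            public Map<String, Object> getComponentConfiguration() {
                return null;
            }
        }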

    3. Copy the random-sentence spout code

    Locate apache-storm-0.9.4-src/apache-storm-0.9.4/examples/storm-starter/src/jvm/storm/starter/spout

    and copy RandomSentenceSpout.java into the project:

    package com.bigdata.storm.spout; // matches the import in MyTest

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;

    import java.util.Map;
    import java.util.Random;

    public class RandomSentenceSpout extends BaseRichSpout {
        SpoutOutputCollector _collector;
        Random _rand;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
        }

        @Override
        public void nextTuple() {
            // Emit one random sentence every 10 seconds.
            Utils.sleep(10000);
            String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
                    "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
            String sentence = sentences[_rand.nextInt(sentences.length)];
            Object id = new Object();
            System.out.println(id);
            // The message id is what makes this emit reliable: if processing fails,
            // fail() is called back with this id; on success, ack() receives it.
            _collector.emit(new Values(sentence), id);
        }

        @Override
        public void ack(Object id) {
            System.out.println("storm spout ack id = " + id);
        }

        @Override
        public void fail(Object id) {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }
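
    The fail() callback above is left empty, so a failed tuple is simply dropped. Since every emit already carries a message id, a small extension could replay failed sentences. The sketch below is not part of the original code: the pending map is an illustrative addition and needs an extra java.util.HashMap import.

        // Illustrative only: remember each in-flight sentence so fail() can replay it.
        private final Map<Object, String> pending = new HashMap<Object, String>();

        // In nextTuple(), record the sentence before emitting:
        //     pending.put(id, sentence);
        //     _collector.emit(new Values(sentence), id);

        @Override
        public void ack(Object id) {
            pending.remove(id); // fully processed; forget the sentence
        }

        @Override
        public void fail(Object id) {
            String sentence = pending.get(id); // keep it pending until acked
            if (sentence != null) {
                _collector.emit(new Values(sentence), id); // re-emit with the same message id
            }
        }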

    4. Run locally

    Run the class directly from IDEA and watch the output. With no program arguments, main() takes the LocalCluster branch: the topology runs in-process for 50 seconds and is then shut down, and because conf.setDebug(true) is set, every emitted tuple is logged to the console.

    5. Run on the cluster

    Package the program as a jar and run it on the cluster.

    Copy the jar into the storm/test directory and run:

    bin/storm jar test/storm-sample.jar com.bigdata.storm.MyTest WordCount
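
    Here com.bigdata.storm.MyTest is the main class and WordCount is passed in as args[0], so it becomes the name the topology is registered under on the cluster; the topology keeps running until it is stopped, for example with bin/storm kill WordCount.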