zoukankan      html  css  js  c++  java
  • 【Storm】storm安装、配置、使用以及Storm单词计数程序的实例分析

    前言:阅读笔记

    storm和hadoop集群非常像。hadoop执行mr。storm执行topologies。
    mr和topologies最关键的不同点是:mr执行终于会结束,而topologies永远执行直到你kill。

    storm集群有两种节点:master和worker。
    master执行一个后台进程Nimbus,和hadoop的jobtracker相似。
    Nimbus负责在集群中分发代码。为工作节点分配任务,并监控故障。

    worker执行一个后台进程Supervisor。

    supervisor监听分配来的任务,启动和停止worker进程去处理nimbus分配来的任务。
    每一个worker进程执行拓扑的一个子集;一个执行的拓扑结构由非常多分布在不同机器的worker进程构成。

    全部nimbus和supervisor之间的协调工作是有zk集群来做的。
    此外。nimbus和supervisor是fail-fast和stateless;全部状态保存在zk或者本地磁盘。
    守护进程能够是无状态的并且失效或重新启动时不会影响整个系统的健康。

    执行storm
    storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
    storm jar负责连接nimbus而且上传jar。
    原始主要的storm提供了spouts和bolts做流转换。

    spouts和bolts是执行应用逻辑要实现的接口。

    spout是流的源。读进数据并以流的形式发送出去;
    bolt消费输入的流,处理或者以新的流发送出去。

    Storm会自己主动又一次分配失败的任务,而且storm保证不会有数据丢失。即使机器宕机

    下载安装

    http://storm.apache.org/downloads.html

    1、依赖安装

    yum install uuid -y
    yum install e2fsprogs -y
    yum install libuuid*
    yum install libtool -y
    yum install *c++* -y
    yum install git -y

    2、zk集群

    http://blog.csdn.net/simonchi/article/details/43019401

    3、zeromq&jzmq

    tar -xzvf zeromq-4.0.5.tar.gz
    ./autogen.sh
    ./configure && make && make install
    jzmq
    git clone git://github.com/nathanmarz/jzmq.git
    ./autogen.sh
    ./configure && make && make install

    4、python

    ./configure
    make
    make install
    rm -f  /usr/bin/python
    ln /usr/local/bin/python3.4 /usr/bin/python
    python -V
    vi /usr/bin/yum
    #!/usr/bin/python 改为 #!/usr/bin/python2.4

    配置执行

    storm.yaml

    storm.zookeeper.servers:
         - 192.168.11.176
         - 192.168.11.177
         - 192.168.11.178
    storm.zookeeper.port: 2181
    nimbus.host: "192.168.11.176"
    storm.local.dir: "/home/storm/workdir"
    supervisor.slots.ports:
      - 6700
      - 6701
      - 6702
      - 6703

    1、Nimbus: 在Storm主控节点上执行"bin/storm nimbus >/dev/null 2>&1 &"启动Nimbus后台程序,并放到后台执行。
    2、Supervisor: 在Storm各个工作节点上执行"bin/storm supervisor >/dev/null 2>&1 &"启动Supervisor后台程序,并放到后台执行;
    3、UI: 在Storm主控节点上执行"bin/storm ui >/dev/null 2>&1 &"启动UI后台程序。并放到后台执行,启动后能够通过http://{nimbus host}:8080观察集群的worker资源使用情况、Topologies的执行状态等信息。

    单词计数程序

    Spout

    package com.cmcc.chiwei.storm;
    
    import java.io.BufferedReader;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.util.Map;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    
    public class WordReader extends BaseRichSpout {
    
    	private static final long serialVersionUID = 1L;
    	private SpoutOutputCollector collector;
    	private FileReader fileReader;
    	private boolean completed = false;
    	public boolean isDistributed() {
    		return false;
    	}
    	public void ack(Object msgId) {
    		System.out.println("OK:"+msgId);
    	}
    	public void close() {}
    	public void fail(Object msgId) {
    		System.out.println("FAIL:"+msgId);
    	}
    
    	/**
    	 * The only thing that the methods will do It is emit each 
    	 * file line
    	 */
    	public void nextTuple() {
    		if(completed){
    			try {
    				Thread.sleep(1000);
    			} catch (InterruptedException e) {
    				//Do nothing
    			}
    			return;
    		}
    		String str;
    		//Open the reader
    		BufferedReader reader = new BufferedReader(fileReader);
    		try{
    			//Read all lines
    			while((str = reader.readLine()) != null){
    				/**
    				 * By each line emmit a new value with the line as a their
    				 */
    				this.collector.emit(new Values(str),str);
    			}
    		}catch(Exception e){
    			throw new RuntimeException("Error reading tuple",e);
    		}finally{
    			completed = true;
    		}
    	}
    
    	/**
    	 * We will create the file and get the collector object
    	 */
    	public void open(Map conf, TopologyContext context,
    			SpoutOutputCollector collector) {
    		try {
    			this.fileReader = new FileReader(conf.get("wordsFile").toString());
    		} catch (FileNotFoundException e) {
    			throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
    		}
    		this.collector = collector;
    	}
    
    	/**
    	 * Declare the output field "word"
    	 */
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("line"));
    	}
    }
    

    Bolt1

    package com.cmcc.chiwei.storm;
    
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    public class WordNormalizer extends BaseBasicBolt {
    
    	private static final long serialVersionUID = 1L;
    
    	public void cleanup() {}
    
    	/**
    	 * The bolt will receive the line from the
    	 * words file and process it to Normalize this line
    	 * 
    	 * The normalize will be put the words in lower case
    	 * and split the line to get all words in this 
    	 */
    	public void execute(Tuple input, BasicOutputCollector collector) {
            String sentence = input.getString(0);
            String[] words = sentence.split(" ");
            for(String word : words){
                word = word.trim();
                if(!word.isEmpty()){
                    word = word.toLowerCase();
                    collector.emit(new Values(word));
                }
            }
    	}
    	
    
    	/**
    	 * The bolt will only emit the field "word" 
    	 */
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("word"));
    	}
    }
    

    Bolt2

    package com.cmcc.chiwei.storm;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;
    
    public class WordCounter extends BaseBasicBolt {
    
    	private static final long serialVersionUID = 1L;
    	Integer id;
    	String name;
    	Map<String, Integer> counters;
    
    	/**
    	 * At the end of the spout (when the cluster is shutdown
    	 * We will show the word counters
    	 */
    	@Override
    	public void cleanup() {
    		System.out.println("-- Word Counter ["+name+"-"+id+"] --");
    		for(Map.Entry<String, Integer> entry : counters.entrySet()){
    			System.out.println(entry.getKey()+": "+entry.getValue());
    		}
    	}
    
    	/**
    	 * On create 
    	 */
    	@Override
    	public void prepare(Map stormConf, TopologyContext context) {
    		this.counters = new HashMap<String, Integer>();
    		this.name = context.getThisComponentId();
    		this.id = context.getThisTaskId();
    	}
    
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {}
    
    
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		String str = input.getString(0);
    		if(!counters.containsKey(str)){
    			counters.put(str, 1);
    		}else{
    			Integer c = counters.get(str) + 1;
    			counters.put(str, c);
    		}
    	}
    }
    

    Topology

    package com.cmcc.chiwei.storm;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    
    
    public class TopologyMain {
    	public static void main(String[] args) throws InterruptedException {
             
            //Topology创建拓扑,安排storm各个节点以及它们交换数据的方式
    		TopologyBuilder builder = new TopologyBuilder();
    		builder.setSpout("word-reader",new WordReader());
    		builder.setBolt("word-normalizer", new WordNormalizer())
    			.shuffleGrouping("word-reader");
    		builder.setBolt("word-counter", new WordCounter(),1)
    			.fieldsGrouping("word-normalizer", new Fields("word"));
    		
            //Configuration
    		Config conf = new Config();
    		conf.put("wordsFile", args[0]);
    		conf.setDebug(false);
            //Topology run
    		conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    		LocalCluster cluster = new LocalCluster();
    		cluster.submitTopology("Getting-Started-Topology", conf, builder.createTopology());
    		Thread.sleep(2000);
    		cluster.shutdown();
    	}
    }
    

    words.txt

    hello
    world
    storm flume hadoop hdfs
    what's wrong flume ?
    what's up hdfs ?
    Hi,storm,what are you doing ?

    执行结果

    OK:hello
    OK:world
    OK:storm flume hadoop hdfs
    OK:what's wrong flume ?

    OK:what's up hdfs ? OK:Hi,storm,what are you doing ?


    -- Word Counter [word-counter-2] --
    what's: 2
    flume: 2
    hdfs: 2
    you: 1
    storm: 1
    up: 1
    hello: 1
    hadoop: 1
    hi,storm,what: 1
    are: 1
    doing: 1
    wrong: 1
    ?

    : 3 world: 1

    分析内容:

    spout
    读取原始数据,为bolt提供数据。

    bolt
    从spout或其他bolt接收数据并处理。处理结果可作为其他bolt的数据源或终于结果。

    nimbus
    主节点的守护进程。负责为工作节点分发任务。


    topology
    拓扑结构。storm的一个任务单元。

    define fields定义域,由spout和bolt提供。被bolt接收。
    一个storm集群就是在一连串的bolt之间转换spout传过来的数据。
    如:
    spout读到一行文本,文本行传给一个bolt。按单词分割后传给还有一个bolt,第二个bolt做计数累加。

    Spout

    open --> nextTuple

    Bolt1

    declareOutputFields --> execute

    Bolt2

    prepare --> execute --> cleanup


    更具体的内容,将在兴许慢慢解说,我也在研究中。

    。。

    。。


    望各位不吝不吝赐教!。

  • 相关阅读:
    如何在 ASP.NET 中(服务器端)主动清除(HTTP内容响应时)浏览器中的 Cookies 数据
    修复 dji spark 的 micro sd/tf 存储卡里不能正常播放的视频文件
    在 Windows 7 中安装 .NET Framework 时遇到错误:无法建立到信任根颁发机构的证书链
    【转】在 Windows 10 下,配置 Kinect v2 可用于 Windows Hello 验证身份
    安装SQL Server提示“等待数据库引擎恢复句柄失败”
    [转]如何禁止 IIS 在 C:WindowsSystem32LogFilesHTTPERR 中生成日志文件
    Kinect v2 记录
    处理 ASP.NET 中的异常:无法在发送 HTTP 标头之后进行重定向。
    在 Windows Server 2008 中部署带 SignalR 的网站出错
    ( ̄▽ ̄)" 关于河北ETC记账卡的默认密码
  • 原文地址:https://www.cnblogs.com/yfceshi/p/7088828.html
Copyright © 2011-2022 走看看