zoukankan      html  css  js  c++  java
  • storm 随机发送字符串

    Storm的程序叫做Topology,类似MapReduce job

    一个Topolog应该有Spout,代表数据源,和若干个bolt

    首先写一个Spout

    public class RandomSpout extends BaseRichSpout {
    	
    	private String[] names = new String[]{"zhangsan", "lisi", "wangwu", "zhaoliu"};
    
    	private SpoutOutputCollector collector;
    	/**
    	 * 初始化方法
    	 */
    	@Override
    	public void open(Map conf, TopologyContext context,
    			SpoutOutputCollector collector) {
    		
    		this.collector = collector;
    	}
    
    	/**
    	 * 发射一个Tuple到Topology都是通过这个方法来实现的
    	 */
    	@Override
    	public void nextTuple() {		
    		this.collector.emit(new Values(names[new Random().nextInt(names.length)]));
    	}
    
    	/**
    	 * 此方法用于声明当前Spout的Tuple发送流, 声明tuple中的字段的名称
    	 */
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("name"));
    	}
    
    	
    }
    

    写一个字符串大写的bolt

    public class UpperBolt extends BaseBasicBolt{
    
    	/**
    	 * bolt的核心方法,主要负责对数据进行处理 
    	 */
    	@Override
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		collector.emit(new Values(input.getString(0).toUpperCase()));
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("name"));
    	}
    }
    

    写一个输出到文件的bolt

    public class ExtBolt extends BaseBasicBolt{
    	FileWriter fileWriter = null;
    
    	@Override
    	public void prepare(Map stormConf, TopologyContext context) {
    		// /home/hemingliang
    		File file = new File("/home/hadoop/stormoutput/"
    				+ UUID.randomUUID());
    		try {
    			fileWriter = new FileWriter(file);
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
     
    	
    	@Override
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		String name = input.getString(0);
    		name = name + "_
    ";
    		try {
    			fileWriter.write(name);
    			fileWriter.flush();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		
    	}
    	
    	@Override
    	public void cleanup() {
    		try {
    			fileWriter.close();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
    }
    

     

    打成jar包,比如为 storm.jar

    stormoutput目录应该存在,不然看不到数据

    执行 storm.jar : ./storm jar ~/storm.jar com.storm.RunTopology

    列出所有的Topology :  ./storm list

    接触一个Topology  : ./storm kill topology名称

  • 相关阅读:
    Asp.net中利用ExecuteNonQuery()执行存储过程返回1解决方案
    在workflow中,无法为实例 ID“...”传递接口类型“...”上的事件“...” 问题的解决方法。
    jquery获取一个table中的一行的每个td的内容
    自动ping博客服务程序
    总结JavaScript(Iframe、window.open、window.showModalDialog)父窗口与子窗口之间的操作
    sql server try...catch使用
    IAsyncResult异步设计
    jquery 判断一个对象是否存在
    数据库表扩展字段设计思路
    REST接口POST方法发送文件到服务器(C#)
  • 原文地址:https://www.cnblogs.com/heml/p/6054183.html
Copyright © 2011-2022 走看看