zoukankan      html  css  js  c++  java
  • storm 随机发送字符串

    Storm的程序叫做Topology,类似MapReduce job

    一个Topolog应该有Spout,代表数据源,和若干个bolt

    首先写一个Spout

    public class RandomSpout extends BaseRichSpout {
    	
    	private String[] names = new String[]{"zhangsan", "lisi", "wangwu", "zhaoliu"};
    
    	private SpoutOutputCollector collector;
    	/**
    	 * 初始化方法
    	 */
    	@Override
    	public void open(Map conf, TopologyContext context,
    			SpoutOutputCollector collector) {
    		
    		this.collector = collector;
    	}
    
    	/**
    	 * 发射一个Tuple到Topology都是通过这个方法来实现的
    	 */
    	@Override
    	public void nextTuple() {		
    		this.collector.emit(new Values(names[new Random().nextInt(names.length)]));
    	}
    
    	/**
    	 * 此方法用于声明当前Spout的Tuple发送流, 声明tuple中的字段的名称
    	 */
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("name"));
    	}
    
    	
    }
    

    写一个字符串大写的bolt

    public class UpperBolt extends BaseBasicBolt{
    
    	/**
    	 * bolt的核心方法,主要负责对数据进行处理 
    	 */
    	@Override
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		collector.emit(new Values(input.getString(0).toUpperCase()));
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("name"));
    	}
    }
    

    写一个输出到文件的bolt

    public class ExtBolt extends BaseBasicBolt{
    	FileWriter fileWriter = null;
    
    	@Override
    	public void prepare(Map stormConf, TopologyContext context) {
    		// /home/hemingliang
    		File file = new File("/home/hadoop/stormoutput/"
    				+ UUID.randomUUID());
    		try {
    			fileWriter = new FileWriter(file);
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
     
    	
    	@Override
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		String name = input.getString(0);
    		name = name + "_
    ";
    		try {
    			fileWriter.write(name);
    			fileWriter.flush();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		
    	}
    	
    	@Override
    	public void cleanup() {
    		try {
    			fileWriter.close();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
    }
    

     

    打成jar包,比如为 storm.jar

    stormoutput目录应该存在,不然看不到数据

    执行 storm.jar : ./storm jar ~/storm.jar com.storm.RunTopology

    列出所有的Topology :  ./storm list

    接触一个Topology  : ./storm kill topology名称

  • 相关阅读:
    Asp.Net Core 项目从 1.0.1 升级到 1.1.0 的小补丁
    C# 正则表达式小坑 -- not enough
    RK 61 键盘 Ubuntu 下键位映射修改方案
    编程急转弯
    ASP.NET Core 使用 AutoFac 注入 DbContext
    分享一个微软风格的博客园主题
    EntityFramework Core 学习笔记 —— 添加主键约束
    NYOJ 69 数的长度(数学)
    NYOJ 67 三角形面积(线代,数学)
    NYOJ 66 分数拆分
  • 原文地址:https://www.cnblogs.com/heml/p/6054183.html
Copyright © 2011-2022 走看看