zoukankan      html  css  js  c++  java
  • storm 随机发送字符串

    Storm的程序叫做Topology,类似MapReduce job

    一个Topolog应该有Spout,代表数据源,和若干个bolt

    首先写一个Spout

    public class RandomSpout extends BaseRichSpout {
    	
    	private String[] names = new String[]{"zhangsan", "lisi", "wangwu", "zhaoliu"};
    
    	private SpoutOutputCollector collector;
    	/**
    	 * 初始化方法
    	 */
    	@Override
    	public void open(Map conf, TopologyContext context,
    			SpoutOutputCollector collector) {
    		
    		this.collector = collector;
    	}
    
    	/**
    	 * 发射一个Tuple到Topology都是通过这个方法来实现的
    	 */
    	@Override
    	public void nextTuple() {		
    		this.collector.emit(new Values(names[new Random().nextInt(names.length)]));
    	}
    
    	/**
    	 * 此方法用于声明当前Spout的Tuple发送流, 声明tuple中的字段的名称
    	 */
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("name"));
    	}
    
    	
    }
    

    写一个字符串大写的bolt

    public class UpperBolt extends BaseBasicBolt{
    
    	/**
    	 * bolt的核心方法,主要负责对数据进行处理 
    	 */
    	@Override
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		collector.emit(new Values(input.getString(0).toUpperCase()));
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("name"));
    	}
    }
    

    写一个输出到文件的bolt

    public class ExtBolt extends BaseBasicBolt{
    	FileWriter fileWriter = null;
    
    	@Override
    	public void prepare(Map stormConf, TopologyContext context) {
    		// /home/hemingliang
    		File file = new File("/home/hadoop/stormoutput/"
    				+ UUID.randomUUID());
    		try {
    			fileWriter = new FileWriter(file);
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
     
    	
    	@Override
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		String name = input.getString(0);
    		name = name + "_
    ";
    		try {
    			fileWriter.write(name);
    			fileWriter.flush();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		
    	}
    	
    	@Override
    	public void cleanup() {
    		try {
    			fileWriter.close();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
    }
    

     

    打成jar包,比如为 storm.jar

    stormoutput目录应该存在,不然看不到数据

    执行 storm.jar : ./storm jar ~/storm.jar com.storm.RunTopology

    列出所有的Topology :  ./storm list

    接触一个Topology  : ./storm kill topology名称

  • 相关阅读:
    微信助力活动表结构设计
    mysql的in查询分析
    PHP从数组中找到指定元素的位置
    Java程序使用Alpine Linux报错java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy解决
    Eclipse/Idea 代码格式化部分忽略
    修改Linux桌面高分屏下QT程序界面的缩放
    折腾linux随笔 之 关闭Budgie默认自动隐藏应用的菜单栏 与 Gnome系桌面应用菜单无内容解决
    Debian Buster 使用Lxde在界面中打开url提示错误解决
    Portainer容器可视化管理工具使用文档
    Lxde添加触摸板双击功能、防误触
  • 原文地址:https://www.cnblogs.com/heml/p/6054183.html
Copyright © 2011-2022 走看看