zoukankan      html  css  js  c++  java
  • storm 随机发送字符串

    Storm的程序叫做Topology,类似MapReduce job

    一个Topolog应该有Spout,代表数据源,和若干个bolt

    首先写一个Spout

    public class RandomSpout extends BaseRichSpout {
    	
    	private String[] names = new String[]{"zhangsan", "lisi", "wangwu", "zhaoliu"};
    
    	private SpoutOutputCollector collector;
    	/**
    	 * 初始化方法
    	 */
    	@Override
    	public void open(Map conf, TopologyContext context,
    			SpoutOutputCollector collector) {
    		
    		this.collector = collector;
    	}
    
    	/**
    	 * 发射一个Tuple到Topology都是通过这个方法来实现的
    	 */
    	@Override
    	public void nextTuple() {		
    		this.collector.emit(new Values(names[new Random().nextInt(names.length)]));
    	}
    
    	/**
    	 * 此方法用于声明当前Spout的Tuple发送流, 声明tuple中的字段的名称
    	 */
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("name"));
    	}
    
    	
    }
    

    写一个字符串大写的bolt

    public class UpperBolt extends BaseBasicBolt{
    
    	/**
    	 * bolt的核心方法,主要负责对数据进行处理 
    	 */
    	@Override
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		collector.emit(new Values(input.getString(0).toUpperCase()));
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("name"));
    	}
    }
    

    写一个输出到文件的bolt

    public class ExtBolt extends BaseBasicBolt{
    	FileWriter fileWriter = null;
    
    	@Override
    	public void prepare(Map stormConf, TopologyContext context) {
    		// /home/hemingliang
    		File file = new File("/home/hadoop/stormoutput/"
    				+ UUID.randomUUID());
    		try {
    			fileWriter = new FileWriter(file);
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
     
    	
    	@Override
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		String name = input.getString(0);
    		name = name + "_
    ";
    		try {
    			fileWriter.write(name);
    			fileWriter.flush();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		
    	}
    	
    	@Override
    	public void cleanup() {
    		try {
    			fileWriter.close();
    		} catch (IOException e) {
    			e.printStackTrace();
    		}
    	}
    	
    }
    

     

    打成jar包,比如为 storm.jar

    stormoutput目录应该存在,不然看不到数据

    执行 storm.jar : ./storm jar ~/storm.jar com.storm.RunTopology

    列出所有的Topology :  ./storm list

    接触一个Topology  : ./storm kill topology名称

  • 相关阅读:
    安装lnmp 时如何修改数据库数据存储地址及默认访问地址
    ubuntu 设置root用户密码并实现root用户登录
    解决ubuntu 远程连接问题
    linux 搭建FTP服务器
    PHP 根据ip获取对应的实际地址
    如何发布自己的composer包
    使用composer安装composer包报Your requirements could not be resolved to an installable set of packages
    laravel 框架配置404等异常页面
    使用Xshell登录linux服务器报WARNING! The remote SSH server rejected X11 forwarding request
    IoTSharp 已支持国产松果时序数据库PinusDB
  • 原文地址:https://www.cnblogs.com/heml/p/6054183.html
Copyright © 2011-2022 走看看