  • Storm Trident filters and functions

    Goal: filter the messages coming out of Kafka, append designated fields, and print the result.
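
    For example, suppose the single message "a" arrives on the helloworld topic. The spout appends " 1 2" to it and splits on spaces, so the pipeline below processes it as:

        spout emits:        [name="a", sentence="1", sentence2="2"]
        FunctionBolt adds:  gender="男"   (name equals "a")
        MyFilter keeps it:  (sentence is "1", not "b")
        PrintFilter prints: name:a,sentence:1,sentence2:2,gender:男,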

    SentenceSpout:

    package Trident;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;
    
    /**
     * Reads data from Kafka and emits it as spout tuples
     * @author BFD-593
     *
     */
    public class SentenceSpout extends BaseRichSpout{
        private SpoutOutputCollector collector;
        private ConsumerConnector consumer;
        private ConsumerIterator<String, String> it;

        @Override
        public void nextTuple() {
            // Emit at most one message per call. Building the Kafka stream here,
            // as the original code did, would recreate it on every invocation and
            // then block the spout executor inside an endless while loop.
            // Note: hasNext() itself blocks until a message arrives unless
            // consumer.timeout.ms is set.
            if (it.hasNext()) {
                // append two dummy tokens so there are always at least three fields
                String[] parts = (it.next().message() + " 1" + " 2").split(" ");
                String name = parts[0];
                String value = parts.length > 1 ? parts[1] : "";
                String value2 = parts.length > 2 ? parts[2] : "";
                this.collector.emit(new Values(name, value, value2));
            }
        }

        @Override
        public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            Properties props = new Properties();
            // ZooKeeper connection
            props.put("zookeeper.connect", "192.168.170.185:2181");
            // consumer group
            props.put("group.id", "testgroup");
            // ZooKeeper timeouts
            props.put("zookeeper.session.timeout.ms", "4000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            props.put("auto.offset.reset", "smallest");
            // serializer class
            props.put("serializer.class", "kafka.serializer.StringEncoder");

            ConsumerConfig config = new ConsumerConfig(props);
            this.consumer = Consumer.createJavaConsumerConnector(config);

            // create the message stream once, in open(), rather than per tuple
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("helloworld", Integer.valueOf(1));
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
            Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            this.it = consumerMap.get("helloworld").get(0).iterator();
        }
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		Fields field = new Fields("name", "sentence","sentence2");
    		declarer.declare(field);
    	}
    
    }
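
    To feed the spout while testing, here is a minimal producer sketch against the same 0.8-style Kafka client. The broker address is an assumption (only the ZooKeeper address appears in this post):

    package Trident;

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class TestProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // assumed broker: same host as ZooKeeper, default Kafka port
            props.put("metadata.broker.list", "192.168.170.185:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            // "a" is tagged 男 by FunctionBolt; anything else 女
            producer.send(new KeyedMessage<String, String>("helloworld", "a"));
            producer.send(new KeyedMessage<String, String>("helloworld", "b"));
            producer.close();
        }
    }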
    

    FunctionBolt:

    	package Trident;
    	
    	import org.apache.storm.trident.operation.BaseFunction;
    	import org.apache.storm.trident.operation.TridentCollector;
    	import org.apache.storm.trident.tuple.TridentTuple;
    	import org.apache.storm.tuple.Values;
    	/**
    	 * Trident function operation: appends a "gender" field to the tuples
    	 * emitted by the spout. It does not replace the original tuple.
    	 * @author BFD-593
    	 *
    	 */
    	public class FunctionBolt extends BaseFunction{
    		
    		@Override
    		public void execute(TridentTuple tuple, TridentCollector collector) {
    			String str = tuple.getStringByField("name");
    			if(str.equals("a")){
    				collector.emit(new Values("男"));  // 男 = male
    			}else{
    				collector.emit(new Values("女"));  // 女 = female
    			}
    		}
    	
    	}
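
    As an illustration of the append semantics (sample values taken from the trace at the top of the post):

        // input tuple:  [name="a", sentence="1", sentence2="2"]
        // emitted:      new Values("男")
        // result tuple: [name="a", sentence="1", sentence2="2", gender="男"]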
    

    MyFilter:

    package Trident;
    
    import java.util.Map;
    
    import org.apache.storm.trident.operation.BaseFilter;
    import org.apache.storm.trident.operation.TridentOperationContext;
    import org.apache.storm.trident.tuple.TridentTuple;
    /**
     * Trident filter operation: drops the tuples emitted by the spout whose
     * field 0 ("name") is "a" AND field 1 ("sentence") is "b"
     * @author BFD-593
     *
     */
    public class MyFilter extends BaseFilter{
    	private TridentOperationContext context;
    	
    	@Override
    	public void prepare(Map conf, TridentOperationContext context) {
    		super.prepare(conf, context);
    		this.context = context;
    	}
    	@Override
    	public boolean isKeep(TridentTuple tuple) {
    		String name = tuple.getStringByField("name");
    		String value = tuple.getStringByField("sentence");
    		return (!"a".equals(name))||(!"b".equals(value));
    	}
    
    }
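
    By De Morgan's law, the keep condition above is equivalent to !("a".equals(name) && "b".equals(sentence)). Concretely:

        // kept:    name="x", sentence="b"   (name is not "a")
        // kept:    name="a", sentence="1"   (sentence is not "b")
        // dropped: name="a", sentence="b"   (both conditions match)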
    

    PrintFilter:

    package Trident;
    
    import java.util.Iterator;
    import java.util.Map;
    
    import org.apache.storm.trident.operation.BaseFilter;
    import org.apache.storm.trident.operation.TridentOperationContext;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Fields;
    /**
     * A pass-through filter that prints all fields and their values
     * @author BFD-593
     *
     */
    public class PrintFilter extends BaseFilter{
    	private TridentOperationContext context = null;
    	
    	@Override
    	public void prepare(Map conf, TridentOperationContext context) {
    		super.prepare(conf, context);
    		this.context = context;
    	}
    	
    	@Override
    	public boolean isKeep(TridentTuple tuple) {
    		Fields fields = tuple.getFields();
    		Iterator<String> iterator = fields.iterator();
    		String str = "";
    		while(iterator.hasNext()){
    			String next = iterator.next();
    			Object value = tuple.getValueByField(next);
    			str = str + next +":"+ value+",";
    		}
    		System.out.println(str);
    		return true;
    	}
    
    }
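
    Each tuple that reaches this filter prints one line of field:value pairs, for example (note the trailing comma left by the loop):

        name:a,sentence:1,sentence2:2,gender:男,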
    

    TopologyTrident:

    package Trident;
    
    import org.apache.storm.utils.Utils;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.tuple.Fields;
    /**
     * Trident filter and function operations
     * @author BFD-593
     *
     */
    public class TopologyTrident {
    	public static void main(String[] args) {
    		SentenceSpout spout = new SentenceSpout();
    		
    		TridentTopology topology = new TridentTopology();
    		topology.newStream("spout", spout)
    				.each(new Fields("name"), new FunctionBolt(), new Fields("gender"))
    				.each(new Fields("name", "sentence"), new MyFilter())
    				.each(new Fields("name", "sentence", "sentence2", "gender"), new PrintFilter());
    		
    		Config conf = new Config();
    		
    		LocalCluster clu = new LocalCluster();
    		clu.submitTopology("mytopology", conf, topology.build());
    		
    		Utils.sleep(100000000);
    		clu.killTopology("mytopology");
    		clu.shutdown();
    		
    	}
    }
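
    The main method above runs the topology in local mode only. For a real cluster, a minimal sketch using StormSubmitter (the class name TopologyTridentCluster and the worker count are assumptions; the stream wiring mirrors the local topology):

    package Trident;

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.tuple.Fields;

    public class TopologyTridentCluster {
        public static void main(String[] args) throws Exception {
            TridentTopology topology = new TridentTopology();
            topology.newStream("spout", new SentenceSpout())
                    .each(new Fields("name"), new FunctionBolt(), new Fields("gender"))
                    .each(new Fields("name", "sentence"), new MyFilter())
                    .each(new Fields("name", "sentence", "sentence2", "gender"), new PrintFilter());

            Config conf = new Config();
            conf.setNumWorkers(2); // assumption: tune for the cluster
            // e.g. storm jar mytopology.jar Trident.TopologyTridentCluster mytopology
            StormSubmitter.submitTopology(args[0], conf, topology.build());
        }
    }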
    

      

     


  • Original article: https://www.cnblogs.com/wangjing666/p/6913047.html