zoukankan      html  css  js  c++  java
  • Storm WordCount 2 (根据日志)

    1.产生虚拟日志

    package les7.readFileTopo;
    
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.Random;
    
    /**
     * Generates a synthetic access log, {@code track.log}, in the current working directory.
     *
     * <p>Each line has three tab-separated fields: a host label, a session id and a timestamp,
     * where the session id and timestamp are picked at random from fixed sample sets. The file
     * name matches the one opened by {@code ReadFileSpout}.
     */
    public class GetData {
    
        /** Number of lines written to the log file. */
        private static final int LINE_COUNT = 5000;
    
        /**
         * Writes 5000 random log lines to {@code track.log}, replacing any existing content.
         *
         * @param args ignored
         */
        public static void main(String[] args) {
            File logFile = new File("track.log");
            Random random = new Random();
    
            String[] hosts = { "movie information" };
            String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U12", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                    "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
            String[] time = { "2019-03-07 08:40:50", "2019-03-07 08:40:51", "2019-03-07 08:40:52", "2019-03-07 08:40:53",
                    "2019-03-07 09:40:49", "2019-03-07 10:40:49", "2019-03-07 11:40:49", "2019-03-07 12:40:49" };
    
            StringBuilder log = new StringBuilder();
            for (int i = 0; i < LINE_COUNT; i++) {
                // Escape sequences, not raw tab/newline characters: a literal line break inside
                // a string literal does not compile. Bounds follow the array sizes so the sample
                // sets can be edited without touching the loop.
                log.append(hosts[0]).append('\t')
                        .append(session_id[random.nextInt(session_id.length)]).append('\t')
                        .append(time[random.nextInt(time.length)]).append('\n');
            }
    
            // Encode explicitly as UTF-8, the charset ReadFileSpout decodes with.
            byte[] bytes = log.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
    
            // FileOutputStream creates (or truncates) the file itself; try-with-resources
            // guarantees the stream is closed even if write() fails.
            try (FileOutputStream out = new FileOutputStream(logFile)) {
                out.write(bytes);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
    }
    View Code

    2.spout自定义数据流入拓扑逻辑

    package les7.readFileTopo;
    
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.InputStreamReader;
    import java.util.Map;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    /**
     * Spout that replays {@code track.log} into the topology, one line per tuple.
     *
     * <p>Each line is emitted unchanged as a single-field tuple (field {@code "log"}). Tuples are
     * emitted without a message id, so Storm never calls {@link #ack} or {@link #fail} for them.
     * Once the end of the file is reached the spout simply stops emitting.
     */
    public class ReadFileSpout implements IRichSpout {
    
        private static final long serialVersionUID = 1L;
    
        FileInputStream fis;
        InputStreamReader isr;
        BufferedReader br;
    
        SpoutOutputCollector collector = null;
    
        /**
         * Opens {@code track.log}, resolved against the worker's working directory, as UTF-8.
         *
         * @throws RuntimeException if the file cannot be opened. Failing here surfaces the
         *     problem immediately instead of leaving {@code br} null for nextTuple to trip over.
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            try {
                this.fis = new FileInputStream("track.log");
            } catch (java.io.FileNotFoundException e) {
                throw new RuntimeException("Cannot open track.log", e);
            }
            this.isr = new InputStreamReader(fis, java.nio.charset.StandardCharsets.UTF_8);
            this.br = new BufferedReader(isr);
        }
    
        /**
         * Releases the file handle. Closing the BufferedReader also closes the wrapped
         * InputStreamReader and FileInputStream. Storm does not guarantee that close() is
         * invoked when running on a real cluster.
         */
        @Override
        public void close() {
            if (br == null) {
                return;
            }
            try {
                br.close();
            } catch (java.io.IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void activate() {
            // Nothing to do when the spout is activated.
        }
    
        @Override
        public void deactivate() {
            // Nothing to do when the spout is deactivated.
        }
    
        /**
         * Emits at most one line per call.
         *
         * <p>Storm calls nextTuple, ack and fail on the same thread, so this method must return
         * quickly instead of draining the whole file (and sleeping) inside a single call. When
         * nothing is emitted (end of file), Storm's spout wait strategy handles the back-off.
         *
         * @throws RuntimeException wrapping an I/O error while reading the file
         */
        @Override
        public void nextTuple() {
            String line;
            try {
                line = br.readLine();
            } catch (java.io.IOException e) {
                throw new RuntimeException("Failed to read track.log", e);
            }
            if (line != null) {
                collector.emit(new Values(line));
            }
        }
    
        /**
         * Called by Storm when a tuple emitted with a message id has been fully processed
         * (requires ackers). Unused: this spout emits without message ids.
         */
        @Override
        public void ack(Object msgId) {
        }
    
        /**
         * Called by Storm when a tuple emitted with a message id failed or timed out; a reliable
         * spout would re-emit it here and bound the number of retries. Unused: this spout emits
         * without message ids.
         */
        @Override
        public void fail(Object msgId) {
        }
    
        /** Declares the single output field, {@code "log"}, holding one raw log line. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("log"));
        }
    
        /** No component-specific configuration. */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
    }
    View Code

    3.bolt处理逻辑

    package les7.readFileTopo;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    /**
     * Bolt that splits each tab-separated log line into its fields and emits
     * {@code (word, 1)} for every non-empty field.
     */
    public class FileBolt implements IRichBolt {
    
        private static final long serialVersionUID = 1L;
    
        OutputCollector collector = null;
    
        /** Stores the collector used to emit and ack tuples. */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        /**
         * Splits the first value of the input tuple on tabs and emits one {@code (field, 1)}
         * pair per non-empty field.
         *
         * <p>Emits are anchored to the input and the input is acked explicitly: IRichBolt leaves
         * acking to the implementation, so if the spout ever emits with message ids, unacked
         * tuples would otherwise time out and be replayed.
         */
        @Override
        public void execute(Tuple tuple) {
            String line = tuple.getString(0);
            for (String word : line.split("\t")) {
                // Skip empty fields (e.g. from consecutive tabs) so they are not counted as words.
                if (!word.isEmpty()) {
                    collector.emit(tuple, new Values(word, 1));
                }
            }
            collector.ack(tuple);
        }
    
        /** Nothing to release. */
        @Override
        public void cleanup() {
        }
    
        /** Declares the output fields {@code "word"} and {@code "count"}. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    
        /** No component-specific configuration. */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
    }
    View Code

    4.bolt输出逻辑

    package les7.readFileTopo;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import static org.apache.htrace.Tracer.LOG;
    
    /**
     * Bolt that keeps a running count per word, prints the whole tally after every update and
     * emits {@code (word, total)}.
     *
     * <p>Counts live in an in-memory HashMap owned by each task. Because the upstream bolt is
     * connected with shuffle grouping, the map is a global tally only when this bolt runs as a
     * single task.
     */
    public class PrintBolt implements IRichBolt {
    
        private static final long serialVersionUID = 1L;
        private Map<String, Integer> result = new HashMap<>();
        private OutputCollector collector;
    
        /** Stores the collector used to emit and ack tuples. */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
    
        /**
         * Adds the tuple's {@code count} to the running total for its {@code word}, prints the
         * full tally to stdout, emits {@code (word, total)} anchored to the input and acks it.
         */
        @Override
        public void execute(Tuple tuple) {
            String word = tuple.getStringByField("word");
            int count = tuple.getIntegerByField("count");
    
            // Start a new entry or add to the existing total (single lookup).
            Integer previous = result.get(word);
            int total = (previous == null) ? count : previous + count;
            result.put(word, total);
    
            System.out.println("统计的结果是:" + result);
    
            // Forward (word, running total); the matching stream is declared below.
            this.collector.emit(tuple, new Values(word, total));
            this.collector.ack(tuple);
        }
    
        /** Nothing to release. */
        @Override
        public void cleanup() {
        }
    
        /**
         * Declares {@code "word"} and {@code "count"}, matching the two-value tuples emitted by
         * {@link #execute}; without a declared stream those emits have no valid schema.
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    
        /** No component-specific configuration. */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
    }
    View Code

    5.书写拓扑逻辑代码

    package les7.readFileTopo;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    
    //展示信息数据
    
    /**
     * Entry point that wires ReadFileSpout -> FileBolt -> PrintBolt and submits the topology.
     *
     * <p>With a command-line argument the topology is submitted to a cluster under that name;
     * without one it runs in an in-process LocalCluster as {@code "mytopology"}.
     */
    public class FileCountTopo {
    
        /**
         * Builds the component graph: one spout task, two splitter tasks and one counter task,
         * each stage fed by shuffle grouping from the previous one.
         */
        private static TopologyBuilder wireComponents() {
            TopologyBuilder topology = new TopologyBuilder();
            topology.setSpout("spout", new ReadFileSpout(), 1);
            topology.setBolt("b1", new FileBolt(), 2).shuffleGrouping("spout");
            topology.setBolt("PrintBolt", new PrintBolt(), 1).shuffleGrouping("b1");
            return topology;
        }
    
        public static void main(String[] args) {
            TopologyBuilder topology = wireComponents();
    
            Config stormConf = new Config();
            stormConf.setDebug(true);
    
            if (args.length == 0) {
                // No topology name supplied: run locally.
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("mytopology", stormConf, topology.createTopology());
                return;
            }
    
            // Cluster submission, using the first argument as the topology name.
            try {
                StormSubmitter.submitTopology(args[0], stormConf, topology.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
    View Code
    RUSH B
  • 相关阅读:
    无限维
    黎曼流形
    why we need virtual key word
    TOJ 4119 Split Equally
    TOJ 4003 Next Permutation
    TOJ 4002 Palindrome Generator
    TOJ 2749 Absent Substrings
    TOJ 2641 Gene
    TOJ 2861 Octal Fractions
    TOJ 4394 Rebuild Road
  • 原文地址:https://www.cnblogs.com/tangsonghuai/p/11169032.html
Copyright © 2011-2022 走看看