zoukankan      html  css  js  c++  java
  • Storm WordCount(WC)2:根据日志文件统计

    1.产生虚拟日志

    package les7.readFileTopo;
    
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.Random;
    
    /**
     * Generates a synthetic tracking log for the word-count topology.
     *
     * <p>Each record has three tab-separated fields (host, session id, timestamp)
     * and is terminated by a newline.
     */
    public class GetData {

        /** Name of the generated log file, resolved against the working directory. */
        private static final String LOG_FILE_NAME = "track.log";

        /** Number of records written to the log file. */
        private static final int RECORD_COUNT = 5000;

        /**
         * Writes {@code RECORD_COUNT} random records to {@code track.log},
         * replacing any previous content of the file.
         *
         * @param args ignored
         */
        public static void main(String[] args) {
            File logFile = new File(LOG_FILE_NAME);
            Random random = new Random();

            String[] hosts = { "movie information" };
            String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U12", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                    "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
            String[] time = { "2019-03-07 08:40:50", "2019-03-07 08:40:51", "2019-03-07 08:40:52", "2019-03-07 08:40:53",
                    "2019-03-07 09:40:49", "2019-03-07 10:40:49", "2019-03-07 11:40:49", "2019-03-07 12:40:49" };

            // Single-threaded use, so the unsynchronized StringBuilder is sufficient.
            StringBuilder records = new StringBuilder();
            for (int i = 0; i < RECORD_COUNT; i++) {
                // Bounds come from the arrays so adding sample values cannot go out of range.
                records.append(hosts[0]).append('\t')
                        .append(session_id[random.nextInt(session_id.length)]).append('\t')
                        .append(time[random.nextInt(time.length)]).append('\n');
            }

            // Explicit charset keeps the output independent of the platform default.
            byte[] payload = records.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);

            // FileOutputStream creates the file when missing; try-with-resources
            // closes the stream even when write() throws.
            try (FileOutputStream out = new FileOutputStream(logFile)) {
                out.write(payload);
            } catch (IOException e) {
                System.err.println("Failed to write " + logFile.getAbsolutePath());
                e.printStackTrace();
            }
        }

    }
    View Code

    2.spout自定义数据流入拓扑逻辑

    package les7.readFileTopo;
    
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.InputStreamReader;
    import java.util.Map;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    /**
     * Spout that reads {@code track.log} line by line and emits every line as a
     * one-field tuple named {@code log}.
     */
    public class ReadFileSpout implements IRichSpout {

        private static final long serialVersionUID = 1L;

        // Assigned in open(); all three stay null until then (and if opening fails).
        FileInputStream fis;
        InputStreamReader isr;
        BufferedReader br;

        SpoutOutputCollector collector = null;

        /**
         * Stores the collector and opens {@code track.log} as UTF-8 text.
         *
         * <p>If the file cannot be opened the error is printed and the reader
         * stays {@code null}; {@link #nextTuple()} then emits nothing.
         *
         * @param conf      topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used to emit tuples
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            try {
                this.fis = new FileInputStream("track.log");
                this.isr = new InputStreamReader(fis, "UTF-8");
                this.br = new BufferedReader(isr);
            } catch (java.io.IOException e) {
                e.printStackTrace();
            }
        }

        /** Releases the file handle opened by {@link #open}. */
        @Override
        public void close() {
            closeReader();
        }

        /** No action is taken on activation. */
        @Override
        public void activate() {
        }

        /** No action is taken on deactivation. */
        @Override
        public void deactivate() {
        }

        /**
         * Emits at most one log line per invocation.
         *
         * <p>The method returns after a single line instead of draining the file,
         * so the calling thread is never held for the duration of the whole file.
         * The 3 ms pause per emitted line is kept to preserve the original pacing.
         */
        @Override
        public void nextTuple() {
            if (br == null) {
                // open() failed or the reader was already closed.
                return;
            }
            try {
                String line = br.readLine();
                if (line == null) {
                    // End of file: nothing left to emit.
                    return;
                }
                collector.emit(new Values(line));
                Thread.sleep(3);
            } catch (java.io.IOException e) {
                collector.reportError(e);
                // Stop reading from a broken stream instead of failing on every call.
                closeReader();
            } catch (InterruptedException e) {
                // Restore the flag so the caller can observe the interruption.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Callback for a successfully processed tuple. Tuples are emitted without
         * a message id here, so there is nothing to track.
         *
         * @param msgId message id of the acknowledged tuple
         */
        @Override
        public void ack(Object msgId) {
        }

        /**
         * Callback for a failed tuple. No replay is implemented.
         *
         * @param msgId message id of the failed tuple
         */
        @Override
        public void fail(Object msgId) {
        }

        /**
         * Declares the single output field {@code log}.
         *
         * @param declarer receives the output schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("log"));
        }

        /**
         * @return {@code null}; no component-specific configuration is supplied
         */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }

        /** Closes the reader chain once and clears the references. */
        private void closeReader() {
            if (br == null) {
                return;
            }
            try {
                // Closing the BufferedReader also closes the wrapped reader and stream.
                br.close();
            } catch (java.io.IOException e) {
                e.printStackTrace();
            } finally {
                br = null;
                isr = null;
                fis = null;
            }
        }

    }
    View Code

    3.bolt处理逻辑

    package les7.readFileTopo;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    /**
     * Bolt that splits each incoming log line on tab characters and emits one
     * {@code (word, 1)} tuple per field.
     */
    public class FileBolt implements IRichBolt {

        private static final long serialVersionUID = 1L;

        OutputCollector collector = null;

        /**
         * Stores the collector for use in {@link #execute}.
         *
         * @param stormConf topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used to emit and acknowledge tuples
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * Splits the first field of {@code tuple} on tabs and emits every part
         * with a count of 1, then acknowledges the input.
         *
         * @param tuple input tuple whose first field is the raw log line
         */
        @Override
        public void execute(Tuple tuple) {
            String value = tuple.getString(0);

            for (String w : value.split("\t")) {
                // Anchor to the input so the output belongs to the same tuple tree.
                collector.emit(tuple, new Values(w, 1));
            }

            // An IRichBolt has to acknowledge its input itself.
            collector.ack(tuple);
        }

        /** Nothing to release. */
        @Override
        public void cleanup() {
        }

        /**
         * Declares the output fields {@code word} and {@code count}.
         *
         * @param declarer receives the output schema
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word","count"));
        }

        /**
         * @return {@code null}; no component-specific configuration is supplied
         */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }

    }
    View Code

    4.bolt输出逻辑

    package les7.readFileTopo;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import static org.apache.htrace.Tracer.LOG;
    
    public class PrintBolt implements IRichBolt {
    
        /**
         * 
         */
        private static final long serialVersionUID = 1L;
        private Map<String, Integer> result = new HashMap<>();
        private OutputCollector collector;
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            // TODO Auto-generated method stub
            this.collector = collector;
    
        }
    
        @Override
        public void execute(Tuple tuple) {
    
            String word = tuple.getStringByField("word");
            int count = tuple.getIntegerByField("count");
    
            //求和
            if(result.containsKey(word)){
                //如果已经存在,累加
                int total = result.get(word);
                result.put(word, total+count);
            }else{
                //这是一个新单词
                result.put(word, count);
            }
    
            //输出到屏幕
            System.out.println("统计的结果是:" + result);
    
            //输出给下一个组件                                               单词           总频率
            this.collector.emit(new Values(word,result.get(word)));
        }
    
        @Override
        public void cleanup() {
            // TODO Auto-generated method stub
    
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // TODO Auto-generated method stub
    
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        }
    
    }
    View Code

    5.书写拓扑逻辑代码

    package les7.readFileTopo;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    
    //展示信息数据
    
    public class FileCountTopo {

        /**
         * Wires spout -> FileBolt -> PrintBolt and submits the topology.
         *
         * <p>With at least one argument the topology is submitted to the cluster
         * under the name {@code args[0]}; without arguments it is submitted to an
         * in-process local cluster as {@code mytopology}.
         *
         * @param args optional topology name for cluster submission
         */
        public static void main(String[] args) {
            Config topologyConf = new Config();
            topologyConf.setDebug(true);

            TopologyBuilder topology = new TopologyBuilder();
            topology.setSpout("spout", new ReadFileSpout(), 1);
            topology.setBolt("b1", new FileBolt(), 2).shuffleGrouping("spout");
            topology.setBolt("PrintBolt", new PrintBolt(), 1).shuffleGrouping("b1");

            if (args.length == 0) {
                // Local mode.
                new LocalCluster().submitTopology("mytopology", topologyConf, topology.createTopology());
                return;
            }

            // Cluster mode: failures are reported, not propagated.
            try {
                StormSubmitter.submitTopology(args[0], topologyConf, topology.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }
    View Code
    RUSH B
  • 相关阅读:
    委托demo
    事件demo
    数据结构与算法分析表ADT
    数据结构与算法分析栈ADT
    Access的“自动编号”问题
    C#获取时间函数
    在load事件中关闭窗体
    panel里面显示form的问题
    将RichTextBox 的内容直接写入数据库
    利用反射来创建一个Form.
  • 原文地址:https://www.cnblogs.com/tangsonghuai/p/11169032.html
Copyright © 2011-2022 走看看