zoukankan      html  css  js  c++  java
  • storm实时计算实例(socket实时接入)

    介绍

    实现了一个简单的流程:实时监听日志文件,将新增内容写入socket服务器,再接入Storm进行计算。

    源码

    日志监听实时写入socket服务器

     
    [java] view plain copy
     
    1. package socket;  
    2.   
    3. import java.io.BufferedReader;  
    4. import java.io.File;       
    5. import java.io.IOException;       
    6. import java.io.InputStreamReader;  
    7. import java.io.PrintWriter;  
    8. import java.io.RandomAccessFile;       
    9. import java.net.Socket;  
    10. import java.util.concurrent.Executors;       
    11. import java.util.concurrent.ScheduledExecutorService;       
    12. import java.util.concurrent.TimeUnit;       
    /**
     * Tails the most recently modified file of a log directory and sends every line it reads
     * to a remote socket server (see MyServerMulti), from which other clients can read it.
     */
    public class LogViewToSocket {
        /** Address of the socket server the log lines are sent to. */
        private static final String SERVER_HOST = "192.168.27.100";
        /** Port of the socket server the log lines are sent to. */
        private static final int SERVER_PORT = 5678;
        /** Directory whose newest file is tailed. */
        private static final String LOG_DIRECTORY = "/home/hadoop/test";

        /** Offset in the tailed file up to which content has already been forwarded. */
        private long lastTimeFileSize = 0;

        /** Handle on the file that is currently being tailed. */
        RandomAccessFile randomFile = null;
        /** Absolute path of the file that is currently being tailed. */
        String newfile = null;
        /** Absolute path of the newest file found by the latest poll. */
        String thisfile = null;

        /**
         * Finds the most recently modified regular file directly inside a directory.
         *
         * @param file directory to scan
         * @return absolute path of the newest regular file, or an empty string when {@code file}
         *         cannot be listed (not a directory, I/O error) or contains no regular file
         */
        public String getNewFile(File file) {
            File[] entries = file.listFiles();
            if (entries == null) {
                // listFiles() returns null for a non-directory or on an I/O error.
                return "";
            }
            long maxtime = 0;
            String newfilename = "";
            for (File entry : entries) {
                // Sub-directories cannot be opened for reading, so they are never candidates.
                if (entry.isFile() && entry.lastModified() > maxtime) {
                    maxtime = entry.lastModified();
                    newfilename = entry.getAbsolutePath();
                }
            }
            return newfilename;
        }

        /**
         * Streams the log in real time: opens the newest file in {@code logFile} and starts a
         * background task that, once per second, forwards the lines appended since the previous
         * poll and switches to a newer file as soon as one appears in the directory.
         *
         * @param logFile directory containing the log files
         * @param out     writer every line is printed to (and flushed after each line)
         * @throws IOException if the newest file cannot be opened, e.g. because the directory
         *                     contains no regular file
         */
        public void realtimeShowLog(final File logFile, final PrintWriter out) throws IOException {
            newfile = getNewFile(logFile);
            // Read-only access is sufficient for tailing.
            randomFile = new RandomAccessFile(new File(newfile), "r");
            // A single worker thread polls for new content once per second.
            ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
            exec.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        if (lastTimeFileSize > randomFile.length()) {
                            // The file was truncated; start again from its beginning.
                            lastTimeFileSize = 0;
                        }
                        // Continue right after the part that has already been forwarded.
                        randomFile.seek(lastTimeFileSize);
                        String tmp;
                        while ((tmp = randomFile.readLine()) != null) {
                            // readLine() maps each byte to one char, so the raw bytes are
                            // recovered and decoded again with the platform default charset.
                            String line = new String(tmp.getBytes("ISO8859-1"));
                            System.out.println(line);
                            out.println(line);
                            out.flush();
                        }
                        thisfile = getNewFile(logFile);
                        if (!thisfile.isEmpty() && !thisfile.equals(newfile)) {
                            // A newer file appeared: switch to it and read it from the start.
                            RandomAccessFile next = new RandomAccessFile(new File(thisfile), "r");
                            randomFile.close();
                            randomFile = next;
                            newfile = thisfile;
                            lastTimeFileSize = 0;
                        } else {
                            // Remember the read position rather than the file length so that
                            // bytes appended after the last read are not skipped.
                            lastTimeFileSize = randomFile.getFilePointer();
                        }
                    } catch (IOException e) {
                        // The executor suppresses further runs after an exception and does not
                        // report it, so make the failure visible before rethrowing.
                        e.printStackTrace();
                        throw new RuntimeException(e);
                    }
                }
            }, 0, 1, TimeUnit.SECONDS);
        }

        /**
         * Connects to the socket server and starts forwarding the newest log file of
         * {@code /home/hadoop/test}.
         *
         * @param args ignored
         * @throws Exception if the connection or the log file cannot be opened
         */
        public static void main(String[] args) throws Exception {
            LogViewToSocket view = new LogViewToSocket();
            Socket socket = new Socket(SERVER_HOST, SERVER_PORT);
            try {
                PrintWriter out = new PrintWriter(socket.getOutputStream());
                final File tmpLogFile = new File(LOG_DIRECTORY);
                view.realtimeShowLog(tmpLogFile, out);
            } catch (IOException e) {
                // Nothing will ever write to the socket, so release it.
                socket.close();
                throw e;
            }
            // On success the socket stays open: the background task keeps writing to it.
        }
    }
     

    socket服务器处理

    [java] view plain copy
     
    1. import java.io.BufferedReader;    
    2. import java.io.IOException;    
    3. import java.io.InputStreamReader;    
    4. import java.io.PrintWriter;    
    5. import java.net.ServerSocket;    
    6. import java.net.Socket;    
    7. import java.net.SocketAddress;  
    8. import java.util.*;  
    9.     
    /**
     * Minimal relay server: the first client that connects is the data source, and every line
     * it sends is forwarded to all connected clients.
     *
     * <p>Relaying starts when the second client connects. Clients that connect later are added
     * to the list of receivers.
     */
    public class MyServerMulti {

        /**
         * Accepts clients on port 5678 forever and starts the relay thread once the second
         * client has connected.
         *
         * @param args ignored
         * @throws IOException if the server socket cannot be opened or accepting a client fails
         */
        public static void main(String[] args) throws IOException {
            ServerSocket server = new ServerSocket(5678);
            int i = 0;
            // Shared between this accept loop and the relay thread, hence synchronized.
            List<PrintWriter> outs = Collections.synchronizedList(new ArrayList<PrintWriter>());

            /*
             * One client socket sends data to the server, which forwards it to the client
             * sockets.
             * NOTE(review): the source client's own writer is in the list as well, so it
             * receives its own lines back; a source that never reads from the socket will
             * eventually make the relay block on a full buffer - confirm this is intended.
             */
            Socket socket1 = null;
            while (true) {
                Socket socket = server.accept();
                i++;
                System.out.println(i);
                System.out.println(socket.getInetAddress());
                PrintWriter out = new PrintWriter(socket.getOutputStream());
                outs.add(out);
                if (i == 1) {
                    // The first connection is the data source.
                    socket1 = socket;
                }
                if (i == 2) {
                    // The first receiver is present: start relaying the source's lines.
                    invoke(socket1, outs);
                }
            }
        }

        /**
         * Starts a thread that reads lines from {@code client}, acknowledges each one to the
         * sender and forwards it to every writer in {@code outs}. The thread ends when the
         * client closes the connection or sends {@code bye}.
         *
         * @param client socket of the data source
         * @param outs   writers of all connected clients; writers that report an error are
         *               removed from this list
         * @throws IOException never thrown directly; kept for compatibility with callers
         */
        private static void invoke(final Socket client, final List<PrintWriter> outs) throws IOException {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    BufferedReader in = null;
                    PrintWriter out = null;
                    try {
                        in = new BufferedReader(new InputStreamReader(client.getInputStream()));
                        out = new PrintWriter(client.getOutputStream());

                        while (true) {
                            String msg = in.readLine();
                            if (msg == null) {
                                // End of stream: the source closed the connection.
                                break;
                            }
                            System.out.println(msg);
                            out.println("Server received " + msg);
                            out.flush();

                            /* Forward the line to all clients. */
                            // Work on a snapshot so the accept loop can add clients meanwhile.
                            PrintWriter[] targets = outs.toArray(new PrintWriter[0]);
                            for (int i = 0; i < targets.length; i++) {
                                System.out.println(i);
                                System.out.println("send msg:" + msg);
                                targets[i].println(msg);
                                // checkError() flushes and reports whether the writer failed.
                                if (targets[i].checkError()) {
                                    outs.remove(targets[i]);
                                }
                            }

                            System.out.println(client.getInetAddress());
                            if (msg.equals("bye")) {
                                break;
                            }
                        }
                    } catch (IOException ex) {
                        ex.printStackTrace();
                    } finally {
                        if (out != null) {
                            out.close();
                        }
                        if (in != null) {
                            try {
                                in.close();
                            } catch (IOException ignored) {
                                // Best effort: the connection is being torn down anyway.
                            }
                        }
                        try {
                            client.close();
                        } catch (IOException ignored) {
                            // Best effort: the connection is being torn down anyway.
                        }
                    }
                }
            }).start();
        }
    }

    storm topology

    [java] view plain copy
     
    1. import java.io.BufferedReader;  
    2. import java.io.BufferedWriter;  
    3. import java.io.File;  
    4. import java.io.FileNotFoundException;  
    5. import java.io.FileOutputStream;  
    6. import java.io.FileReader;  
    7. import java.io.FileWriter;  
    8. import java.io.IOException;  
    9. import java.io.InputStreamReader;  
    10. import java.io.OutputStreamWriter;  
    11. import java.io.PrintWriter;  
    12. import java.io.RandomAccessFile;  
    13. import java.net.Socket;  
    14. import java.net.UnknownHostException;  
    15. import java.util.Map;  
    16.    
    17. //import mytest.ThroughputTest.GenSpout;  
    18.    
    19. import backtype.storm.Config;  
    20. import backtype.storm.LocalCluster;  
    21. import backtype.storm.StormSubmitter;  
    22. import backtype.storm.generated.AlreadyAliveException;  
    23. import backtype.storm.generated.InvalidTopologyException;  
    24. import backtype.storm.spout.SpoutOutputCollector;  
    25. import backtype.storm.task.OutputCollector;  
    26. import backtype.storm.task.TopologyContext;  
    27. import backtype.storm.topology.BasicOutputCollector;  
    28. import backtype.storm.topology.OutputFieldsDeclarer;  
    29. import backtype.storm.topology.TopologyBuilder;  
    30. import backtype.storm.topology.base.BaseBasicBolt;  
    31. import backtype.storm.topology.base.BaseRichBolt;  
    32. import backtype.storm.topology.base.BaseRichSpout;  
    33. import backtype.storm.tuple.Fields;  
    34. import backtype.storm.tuple.Tuple;  
    35. import backtype.storm.tuple.Values;  
    36. import backtype.storm.utils.Utils;  
    37. /* 
    38.  *  
    39.  * 
    40.  *  storm jar stormtest.jar socket.SocketProcess /home/hadoop/out_socket.txt true 
    41.  *  
    42.  */  
    43.    
    44. public class SocketProcess {  
    45.          public static class  SocketSpout extends BaseRichSpout {  
    46.    
    47.                    /** 
    48.                     */  
    49.               static Socket sock=null;  
    50.               static BufferedReader in=null;  
    51.               String str=null;  
    52.                    private static final long serialVersionUID = 1L;  
    53.                    private SpoutOutputCollector _collector;  
    54.                    private BufferedReader br;  
    55.                    private String dataFile;  
    56.                    private BufferedWriter bw2;  
    57.                     RandomAccessFile randomFile;  
    58.                     private long lastTimeFileSize = 0;   
    59.                     int cnt=0;  
    60.                    //定义spout文件  
    61.                     SocketSpout(){  
    62.                        
    63.                    }  
    64.    
    65.                    //定义如何读取spout文件  
    66.                    @Override  
    67.                    public void open(Map conf, TopologyContext context,  
    68.                                      SpoutOutputCollector collector) {  
    69.                             // TODO Auto-generated method stub  
    70.                             _collector = collector;  
    71.                             try {  
    72.                                 sock=new Socket("192.168.27.100",5678);  
    73.                                  in=     
    74.                                     new BufferedReader(new InputStreamReader(sock.getInputStream()));     
    75.                             } catch (UnknownHostException e) {  
    76.                                 // TODO Auto-generated catch block  
    77.                                 e.printStackTrace();  
    78.                             } catch (IOException e) {  
    79.                                 // TODO Auto-generated catch block  
    80.                                 e.printStackTrace();  
    81.                             }  
    82.                          
    83.                    }  
    84.    
    85.                    //获取下一个tuple的方法  
    86.                    @Override  
    87.                    public void nextTuple() {  
    88.                             // TODO Auto-generated method stub  
    89.                        if(sock==null){  
    90.                              try {  
    91.                                 sock=new Socket("192.168.27.100",5678);  
    92.                                  in=     
    93.                                         new BufferedReader(new InputStreamReader(sock.getInputStream()));    
    94.                             } catch (UnknownHostException e) {  
    95.                                 // TODO Auto-generated catch block  
    96.                                 e.printStackTrace();  
    97.                             } catch (IOException e) {  
    98.                                 // TODO Auto-generated catch block  
    99.                                 e.printStackTrace();  
    100.                             }   
    101.                        }  
    102.                          
    103.                          
    104.                        while(true){      
    105.                             
    106.                         try {  
    107.                             str = in.readLine();  
    108.                         } catch (IOException e) {  
    109.                             // TODO Auto-generated catch block  
    110.                             e.printStackTrace();  
    111.                         }  
    112.                         System.out.println(str);    
    113.                         _collector.emit(new Values(str));  
    114.                         if(str.equals("end")){      
    115.                             break;      
    116.                             }   
    117.                         }  
    118.                          
    119.                          
    120.                          
    121.                          
    122.                          
    123.                          
    124.                          
    125.                               
    126.                               
    127.                    }  
    128.    
    129.    
    130.                    @Override  
    131.                    public void declareOutputFields(OutputFieldsDeclarer declarer) {  
    132.                             // TODO Auto-generated method stub  
    133.                             declarer.declare(new Fields("line"));  
    134.                    }  
    135.                     
    136.          }  
    137.           
    138.    
    139.          public static class Process extends BaseRichBolt{  
    140.    
    141.                    private String _seperator;  
    142.                    private String _outFile;  
    143.                    PrintWriter pw;  
    144.                    private OutputCollector _collector;  
    145.                    private BufferedWriter bw;  
    146.                     
    147.                    public Process(String outFile) {  
    148.                              
    149.                             this._outFile   = outFile;  
    150.                              
    151.                    }  
    152.                     
    153.                    //把输出结果保存到外部文件里面。  
    154.                    @Override  
    155.                    public void prepare(Map stormConf, TopologyContext context,  
    156.                                      OutputCollector collector) {  
    157.                             // TODO Auto-generated method stub  
    158.                             this._collector = collector;  
    159.                             File out = new File(_outFile);  
    160.                             try {  
    161. //                                  br = new BufferedWriter(new FileWriter(out));  
    162.                                      bw = new BufferedWriter(new OutputStreamWriter(   
    163.                              new FileOutputStream(out, true)));   
    164.                             } catch (IOException e1) {  
    165.                                      // TODO Auto-generated catch block  
    166.                                      e1.printStackTrace();  
    167.                             }                  
    168.                    }  
    169.                     
    170.                    //blot计算单元,把tuple中的数据添加一个bkeep和回车。然后保存到outfile指定的文件中。  
    171.                    @Override  
    172.                    public void execute(Tuple input) {  
    173.                             // TODO Auto-generated method stub  
    174.                             String line = input.getString(0);  
    175. //                         System.out.println(line);  
    176.                        //     String[] str = line.split(_seperator);  
    177.                          //   System.out.println(str[2]);  
    178.                             try {  
    179.                                      bw.write(line+",bkeep"+" ");  
    180.                                      bw.flush();  
    181.                             } catch (IOException e) {  
    182.                                      // TODO Auto-generated catch block  
    183.                                      e.printStackTrace();  
    184.                             }  
    185.                              
    186.                             _collector.emit(new Values(line));  
    187.                    }  
    188.    
    189.                    @Override  
    190.                    public void declareOutputFields(OutputFieldsDeclarer declarer) {  
    191.                             // TODO Auto-generated method stub  
    192.                             declarer.declare(new Fields("line"));  
    193.                    }  
    194.                     
    195.          }  
    196.           
    197.          public static void main(String[] argv) throws AlreadyAliveException, InvalidTopologyException{  
    198.                   
    199.                    String outFile   = argv[0]; //输出文件  
    200.                    boolean distribute = Boolean.valueOf(argv[1]);       //本地模式还是集群模式  
    201.                    TopologyBuilder builder = new TopologyBuilder();  //build一个topology  
    202.         builder.setSpout("spout", new  SocketSpout(), 1);   //指定spout  
    203.         builder.setBolt("bolt", new Process(outFile),1).shuffleGrouping("spout");  //指定bolt,包括bolt、process和grouping  
    204.         Config conf = new Config();  
    205.         if(distribute){  
    206.             StormSubmitter.submitTopology("SocketProcess", conf, builder.createTopology());  
    207.         }else{  
    208.                  LocalCluster cluster = new LocalCluster();  
    209.                  cluster.submitTopology("SocketProcess", conf, builder.createTopology());  
    210.         }  
    211.          }         
    212. }  


    最后执行
    [java] view plain copy
     
    1. storm jar stormtest.jar socket.SocketProcess /home/hadoop/out_socket.txt true  
    spout接收从socket服务器实时发送过来的数据,经过topology处理,最终将数据写入out_socket.txt文件。
    转:http://blog.csdn.net/u011750989/article/details/18547015
  • 相关阅读:
    [Codeforces Round #516][Codeforces 1063C/1064E. Dwarves, Hats and Extrasensory Abilities]
    接入gitment为hexo添加评论功能
    常用SQL语句
    小米前端二面面经
    将hexo的评论系统由gitment改为Valine
    同步与异步
    前端构建工具对比
    前端向后台发送请求有哪些方式
    关于hexo markdown添加的图片在github page中无法显示的问题
    使用TensorBoard可视化工具
  • 原文地址:https://www.cnblogs.com/hd-zg/p/6905284.html
Copyright © 2011-2022 走看看