zoukankan      html  css  js  c++  java
  • java实时监听日志写入kafka(转)

    原文链接:http://www.sjsjw.com/kf_cloud/article/020376ABA013802.asp

    目的

    实时监听某目录下的日志文件,如有新文件切换到新文件,并同步写入kafka,同时记录日志文件的行位置,以应对进程异常退出,能从上次的文件位置开始读取(考虑到效率,这里是每100条记一次,可调整)

    源码:

    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.LineNumberReader;
    import java.io.PrintWriter;
    import java.io.RandomAccessFile;
    import java.net.NoRouteToHostException;
    import java.util.ArrayList;  
    import java.util.Collection;  
    import java.util.List;  
    import java.util.Properties;  
    import java.util.Random;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    
      
    import kafka.javaapi.producer.Producer;  
    import kafka.producer.KeyedMessage;  
    import kafka.producer.ProducerConfig;  
    
    
    /**
     * Tails the most recently modified "access" log file in a directory and forwards every
     * new line to a Kafka topic.
     *
     * <p>The Kafka producer is configured from a file named {@code producer.properties} that is
     * opened by relative path, i.e. it is looked up in the working directory of the process
     * (normally the directory the jar is started from).
     *
     * <p>The current file name and line number are written to a position file every
     * {@code POSITION_FLUSH_INTERVAL} sent lines, so that a restarted process resumes close to
     * where the previous one stopped. Lines sent after the last position write are sent again
     * after a restart.
     *
     * <p>Example:
     * nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position  >/home/sre/portalhandler/handler.log 2>&1 &
     *
     * <p>The tailing state is kept in instance fields that are mutated by the background thread
     * without synchronization, so {@link #realtimeShowLog} is meant to be called once per
     * instance.
     */
    public class PortalLogTail_Line {

        /** Number of sent lines between two writes of the position file. */
        private static final int POSITION_FLUSH_INTERVAL = 100;

        /** Pause between two polls of the log file, in milliseconds. */
        private static final long POLL_INTERVAL_MS = 100L;

        private Producer<String,String> inner;
        java.util.Random ran = new Random();

        /**
         * Creates the Kafka producer from {@code producer.properties} in the working directory.
         *
         * @throws FileNotFoundException if {@code producer.properties} does not exist
         * @throws IOException if the properties file cannot be read
         */
        public PortalLogTail_Line() throws FileNotFoundException, IOException {
            Properties properties = new Properties();
            FileInputStream in = new FileInputStream("producer.properties");
            try {
                properties.load(in);
            } finally {
                // The stream is only needed while loading; do not leak the file handle.
                in.close();
            }
            inner = new Producer<String, String>(new ProducerConfig(properties));
        }

        /**
         * Sends one message. Does nothing when either argument is {@code null}.
         *
         * @param topicName target topic
         * @param message   message payload
         */
        public void send(String topicName,String message) {
            if (topicName == null || message == null) {
                return;
            }
            // A random key (0..8) is attached so that keyed partitioning spreads the
            // messages over the partitions of the topic.
            String key = String.valueOf(ran.nextInt(9));
            inner.send(new KeyedMessage<String, String>(topicName, key, message));
        }

        /**
         * Sends a batch of messages without keys. Does nothing when either argument is
         * {@code null} or the collection is empty.
         *
         * @param topicName target topic
         * @param messages  message payloads
         */
        public void send(String topicName,Collection<String> messages) {
            if (topicName == null || messages == null || messages.isEmpty()) {
                return;
            }
            List<KeyedMessage<String, String>> kms =
                    new ArrayList<KeyedMessage<String, String>>(messages.size());
            for (String entry : messages) {
                kms.add(new KeyedMessage<String, String>(topicName, entry));
            }
            inner.send(kms);
        }

        /** Closes the underlying Kafka producer. */
        public void close() {
            inner.close();
        }

        /**
         * Finds the most recently modified file whose name contains "access".
         *
         * @param file directory to scan
         * @return absolute path of the newest matching file, or an empty string when the
         *         directory cannot be listed or contains no matching file
         */
        public String getNewFile(File file) {
            String newestPath = "";
            if (file == null) {
                return newestPath;
            }
            // listFiles() returns null when the path is not a directory or cannot be read.
            File[] candidates = file.listFiles();
            if (candidates == null) {
                return newestPath;
            }
            long newestTime = 0;
            for (File candidate : candidates) {
                long modified = candidate.lastModified();
                if (modified > newestTime && candidate.getName().contains("access")) {
                    newestTime = modified;
                    newestPath = candidate.getAbsolutePath();
                }
            }
            return newestPath;
        }

        /**
         * Writes the file name and line number to the position file as {@code path,line},
         * replacing the previous content. Failures are reported on stderr and otherwise
         * ignored, because losing one position update only causes some lines to be sent
         * again after a restart.
         *
         * @param path         log file currently being read
         * @param rn           number of the last line read from that file
         * @param positionpath location of the position file
         */
        public void writePosition(String path,int rn,String positionpath) {
            BufferedWriter out = null;
            try {
                out = new BufferedWriter(new FileWriter(positionpath));
                out.write(path + "," + rn);
                // Flush inside the try block so that a failed write is reported, not hidden
                // by the quiet close below.
                out.flush();
            } catch (IOException e) {
                System.err.println("failed to write position file " + positionpath + ": " + e);
            } finally {
                closeQuietly(out);
            }
        }

        // State of the tailing thread.
        LineNumberReader randomFile=null;   // reader on the file currently being tailed
        String newfile=null;                // unused; kept for compatibility
        String thisfile=null;               // path of the file currently being tailed
        String prefile=null;                // path used to detect a switch to a newer file
        int ln=0;                           // lines sent since the last position write
        int beginln=0;                      // lines of thisfile that were already sent
        private String lastFailedOpen=null; // last path that failed to open (limits log noise)

        /**
         * Starts a background thread that polls the newest "access" file in {@code file} and
         * sends every new line to {@code topicname}. If the position file holds a saved
         * {@code path,line} entry, reading resumes in that file after that line; otherwise
         * the newest file is read from its beginning.
         *
         * @param file         directory containing the log files
         * @param topicname    Kafka topic to send to
         * @param positionpath location of the position file
         * @throws IOException declared for compatibility; not thrown by this method itself
         */
        public void realtimeShowLog(final File file,final String topicname, final String positionpath) throws IOException{
            new Thread(new Runnable() {
                public void run() {
                    tailLoop(file, topicname, positionpath);
                }
            }).start();
        }

        /** Runs the poll / send / switch-file cycle until the thread is interrupted. */
        private void tailLoop(File dir, String topicname, String positionpath) {
            thisfile = getNewFile(dir);
            prefile = thisfile;
            restorePosition(positionpath);
            // May stay null (no log file yet, or the saved file is gone); the loop then
            // keeps trying to open the newest file instead of failing.
            randomFile = openReader(thisfile);
            try {
                while (true) {
                    try {
                        Thread.sleep(POLL_INTERVAL_MS);
                    } catch (InterruptedException e) {
                        // Keep the interrupt flag and stop tailing.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        if (randomFile != null) {
                            drainNewLines(topicname, positionpath);
                        }
                        switchToNewestFile(dir, topicname, positionpath);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            } finally {
                closeQuietly(randomFile);
            }
        }

        /** Loads the saved {@code path,line} entry, if any, into thisfile/prefile/beginln. */
        private void restorePosition(String positionpath) {
            BufferedReader br = null;
            try {
                br = new BufferedReader(new FileReader(positionpath));
                String line = br.readLine();
                if (line == null) {
                    return;
                }
                // Split at the last comma so that a log path containing a comma still parses.
                int comma = line.lastIndexOf(',');
                if (comma <= 0) {
                    return;
                }
                int savedLine = Integer.parseInt(line.substring(comma + 1).trim());
                thisfile = line.substring(0, comma);
                prefile = thisfile;
                beginln = savedLine;
            } catch (FileNotFoundException e) {
                System.err.println("no usable position file " + positionpath
                        + ", starting with the newest log file");
            } catch (IOException e) {
                e.printStackTrace();
            } catch (NumberFormatException e) {
                System.err.println("ignoring malformed position file " + positionpath + ": " + e);
            } finally {
                closeQuietly(br);
            }
        }

        /** Opens a line-counting reader on {@code path}; returns null when that is not possible. */
        private LineNumberReader openReader(String path) {
            if (path == null || path.length() == 0) {
                return null;
            }
            try {
                LineNumberReader reader = new LineNumberReader(new FileReader(path));
                lastFailedOpen = null;
                return reader;
            } catch (FileNotFoundException e) {
                // The open is retried on every poll; report each failing path only once.
                if (!path.equals(lastFailedOpen)) {
                    System.err.println("cannot open log file " + path + ": " + e.getMessage());
                    lastFailedOpen = path;
                }
                return null;
            }
        }

        /** Sends every line currently available from randomFile, recording the position periodically. */
        private void drainNewLines(String topicname, String positionpath) throws IOException {
            String line;
            while ((line = randomFile.readLine()) != null) {
                int currln = randomFile.getLineNumber();
                if (currln <= beginln) {
                    // Already sent before the restart. Skipped lines are not counted, so the
                    // saved position never moves backwards while catching up.
                    continue;
                }
                // The line is passed on unchanged; the configured Kafka serializer performs
                // the String-to-bytes encoding.
                send(topicname, line);
                ln++;
                if (ln >= POSITION_FLUSH_INTERVAL) {
                    writePosition(thisfile, currln, positionpath);
                    ln = 0;
                }
            }
        }

        /** Switches to the newest "access" file when it differs from the one being read. */
        private void switchToNewestFile(File dir, String topicname, String positionpath)
                throws IOException {
            String newest = getNewFile(dir);
            if (newest.length() == 0) {
                return; // nothing to read right now; keep the current file
            }
            if (randomFile != null && newest.equals(prefile)) {
                return;
            }
            if (randomFile != null) {
                // Pick up lines appended to the old file since the last drain.
                drainNewLines(topicname, positionpath);
            }
            LineNumberReader next = openReader(newest);
            if (next == null) {
                return; // retried on the next poll
            }
            closeQuietly(randomFile);
            randomFile = next;
            thisfile = newest;
            prefile = newest;
            beginln = 0;
            ln = 0;
            // Record the switch immediately so that a restart does not go back to the old file.
            writePosition(thisfile, 0, positionpath);
        }

        /** Closes a resource, ignoring a null argument and any failure of close(). */
        private static void closeQuietly(java.io.Closeable resource) {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            } catch (IOException ignored) {
                // Nothing useful can be done when closing fails.
            }
        }

        /**
         * Entry point.
         *
         * @param args topic name, log directory, position file path
         * @throws Exception if the producer cannot be created
         */
        public static void main(String[] args) throws Exception {
            // Validate the arguments before touching producer.properties or Kafka.
            if (args.length != 3) {
                System.out.println("usage:topicname pathname positionpath");
                System.exit(1);
            }
            String topicname = args[0];
            String pathname = args[1];
            String positionpath = args[2];
            PortalLogTail_Line producer = new PortalLogTail_Line();
            final File tmpLogFile = new File(pathname);
            producer.realtimeShowLog(tmpLogFile, topicname, positionpath);
        }

    }
    producer.properties文件放在同级目录下
    
    metadata.broker.list=xxx:10909,xxx:10909
    
    # name of the partitioner class for partitioning events; default partition spreads data randomly
    #partitioner.class=
    
    # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
    producer.type=sync
    #producer.type=async
    
    # specify the compression codec for all data generated: none , gzip, snappy.
    # the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectively
    compression.codec=none
    #compression.codec=gzip
    
    # message encoder
    serializer.class=kafka.serializer.StringEncoder

    测试

    最后执行: 

     nohup java -jar portallog_producer.jar portallog /var/apache/logs portallog.position  >/home/sre/portalhandler/handler.log 2>&1 &
  • 相关阅读:
    JS和C# 里的闭包及闭包在事件中的使用
    ***项目开发记录
    七牛云存储之应用视频上传系统开心得
    二维码及二维码接合短URL的应用
    EF批量添加,删除,修改的扩展
    ngTemplateOutlet递归的问题
    每日新知2019-06-03
    Spring boot初始
    纯前端播放本地音乐
    macbook 安装任意来源
  • 原文地址:https://www.cnblogs.com/gaopeng527/p/5266196.html
Copyright © 2011-2022 走看看