zoukankan      html  css  js  c++  java
  • 监控文件生成数据存入数据库

    操作一份实时监控文件流程,用到多个组件,缺点,如果当掉,监控的文件夹下进来文件,再启动不能执行,优化:可再写个脚本来把监控的文件夹下的文件移出,再放入,
    
    代码开始:
    
    import java.io.File
    
    import cold_test.monitorutil.FileListener
    import cold_test.tools.NowDate
    import org.apache.commons.io.monitor._
    object MonitorFile {
    
      val rowCsv3="路径"
    
      def main(args:Array[String] ){
    
        println(NowDate.NowDate()+":开始等待数据进入。。。。")
    
        val rowCsv3File=new File(rowCsv3)
        val monitor:FileAlterationMonitor=new FileAlterationMonitor(5000)
        val rowCsv3Fileobserver: FileAlterationObserver = new FileAlterationObserver(rowCsv3File)
    
        val listener=new FileListener()
        monitor.addObserver(rowCsv3Fileobserver)
    
        rowCsv3Fileobserver.addListener(listener)
    
        monitor.start()
      }
    }
    
    
      2,重写监控FileAlterationListener,注意导包
    
    import java.io._
    
    import cold_test.tools.NowDate
    import cold_test.utils.UsePythonShell
    import org.apache.commons.io.monitor.{FileAlterationListener, FileAlterationObserver}
    
    class FileListener extends FileAlterationListener{
    
    
    override def onFileCreate(file: File) = {
      val fileName: String = file.getName
    
      println(NowDate.NowDate()+s"当前处理的文件为$fileName")
    
      UsePythonShell.usePythonShell(fileName)
    
    }
      override def onFileDelete(directory: File) = {
        println("onFileDelete" + directory.getName)
        //super.onFileDelete(directory)
      }
      override def onFileChange(directory: File) = {
        println("onFileChange" + directory.getName)
        //super.onFileChange(directory)
      }
    
      override def onStart(fileAlterationObserver: FileAlterationObserver): Unit = {}
      override def onDirectoryCreate(file: File): Unit = {}
      override def onDirectoryChange(file: File): Unit = {}
      override def onDirectoryDelete(file: File): Unit = {}
      override def onStop(fileAlterationObserver: FileAlterationObserver): Unit = {}
    }
    
    
      3,调用本地pythonshell, 通过执行器runtime.exec(parms),通过装饰流输出信息readLine(),"Finish"执行完,puthdfs上,再执行Go!!
    
    import java.io.{BufferedReader, File, FileOutputStream, InputStreamReader}
    
    import cold_test.tools.NowDate
    
    object UsePythonShell {
    
    
      private val runtime = Runtime.getRuntime
      private var parms: Array[String] = _
      private var process: Process = _
    
      def usePythonShell(fileName:String):Unit={
        println(NowDate.NowDate()+"输出的文件名称为:"+fileName)
        parms = Array("python","/路径/XXX.py",fileName)
    
        println(NowDate.NowDate()+"开始调用PythonShell:"+fileName)
        process = runtime.exec(parms)
    
        // 接收命令执行的输出信息
        val infoReader = new BufferedReader(new InputStreamReader(process.getInputStream, "UTF-8"))
        //读取输出信息
        val pythonResult = infoReader.readLine()
        println(NowDate.NowDate()+s"接收${fileName}的PythonShell命令执行的输出信息:"+pythonResult)
    
        try{
          if (pythonResult.startsWith("Finish")){
            //将原始文件删除,将Row和Avg文件Put到Hdfs上
             UsePutHdfsShell.usePutHdfsShell(fileName)
            //调用GoMethod,处理Row和Avg文件
             UseGoMethodShell.useGoMethodShell(fileName)
          }
        }catch {
          case e: Exception => {
            println(s"$e:${fileName}文件的Go方法返回值为$pythonResult")
            //如果返回的不是Finish就把错误日志写出到Log文件中
            val pythonResultErro = s"PythonMethod has an error with handling $fileName"
            val python2Scala_Result: FileOutputStream = new FileOutputStream(new File(s"/路径/XX.log"))
            python2Scala_Result.write(pythonResultErro.getBytes())
            python2Scala_Result.flush()
            python2Scala_Result.close()
          }
        }
    
      }
    }
    
    
      4,调Goshell执行的文件通过执行器runtime.exec(parms),通过装饰流输出信息readLine(),"Finish"执行完!!接收到Go返回的Finish结果后调用Druid把数据存入Oracle%%
    
    import java.io.{BufferedReader, File, FileOutputStream, InputStreamReader}
    
    import cold_test.tools.NowDate
    import cold_test.utils.druidUtils.{DruidPutOracleFeatureScore, DruidPutOracleLowWindowsDetails, DruidPutOracleLowWindowsNumber}
    
    object UseGoMethodShell {
    
      private val runtime = Runtime.getRuntime
      private var parms: Array[String] = _
      private var process: Process = _
    
      def useGoMethodShell(fileName:String):Unit={
    
        println(NowDate.NowDate()+"开始调用GetGoMethodShell方法")
    
        var filePath: String =s"路径"
    
        parms = Array[String]("/路径",filePath)
        // 执行命令
        println(NowDate.NowDate()+s"开始调用${fileName}的GoMethodShell执行命令")
        process = runtime.exec(parms)
    
        // 接收命令执行的输出信息
        val infoReader = new BufferedReader(new InputStreamReader(process.getInputStream, "UTF-8"))
        //读取GO输出信息
    
        val GoResult = infoReader.readLine()
        println(NowDate.NowDate()+s"接收${fileName}的GoMethodShell命令执行的输出信息:"+GoResult)
        try{
          if (GoResult.startsWith("Finish")){
            //接收到Go返回的Finish结果后调用Druid把数据存入Oracle
            DruidPutOracleLowWindowsDetails.druidPutOracle(fileName)
            DruidPutOracleLowWindowsNumber.druidPutOracle(fileName)
            DruidPutOracleFeatureScore.druidPutOracle(fileName)
            DeleteResultTable.deleteResultTableCsv(fileName)
          }
        }catch {
          case e: Exception => {
            println(s"$e:${fileName}文件的Go方法返回值为$fileName")
            //如果返回的不是Finish就把错误日志写出到Log文件中
            val GoResultErro = s"GoMethod has an error with handling $fileName"
            val go2Scala_Result: FileOutputStream = new FileOutputStream(new File(s"/路径/XX.log"))
            go2Scala_Result.write(GoResultErro.getBytes())
            go2Scala_Result.flush()
            go2Scala_Result.close()
          }
        }
    
        }
    }
    
    
      5,德鲁伊数据连接
    
    import com.alibaba.druid.pool.DruidDataSource;
    
    import java.sql.Connection;
    
    public class DruidPutOracleJava {
    
        private static DruidDataSource dataSource=null;
    
        /**
         * 构造函数完成数据库的连接和连接对象的生成
         * @throws Exception
         */
        public DruidPutOracleJava(){
    
        }
    
        public void GetDbConnect() throws Exception  {
            try{
                if(dataSource==null){
                    dataSource=new DruidDataSource();
                    //设置连接参数
                    dataSource.setUrl("jdbc:oracle:");
                    dataSource.setDriverClassName("oracle.jdbc.OracleDriver");
                    dataSource.setUsername("root");
                    dataSource.setPassword("root");
                    //配置初始化大小、最小、最大
                    dataSource.setInitialSize(1);
                    dataSource.setMinIdle(1);
                    dataSource.setMaxActive(200);
                    //连接泄漏监测
                    dataSource.setRemoveAbandoned(true);
                    dataSource.setRemoveAbandonedTimeout(5000000);
                    //配置获取连接等待超时的时间
                    dataSource.setMaxWait(2000000);
                    //配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
                    dataSource.setTimeBetweenEvictionRunsMillis(20000);
                    //防止过期
                    dataSource.setValidationQuery("SELECT 'x' from dual");
                    dataSource.setTestWhileIdle(true);
                    dataSource.setTestOnBorrow(true);
                }
            }catch(Exception e){
                throw e;
            }
        }
    
        /**
         * 取得已经构造生成的数据库连接
         * @return 返回数据库连接对象
         * @throws Exception
         */
        public Connection getConnect() throws Exception{
            Connection con=null;
            try {
                GetDbConnect();
                con=dataSource.getConnection();
            } catch (Exception e) {
                throw e;
            }
            return con;
        }
    
    }
    
    
      6,如果嫌慢可以批量插入
    
    import java.sql.{Connection, Statement}
    
    import cold_test.tools.NowDate
    import druidPutOracle.DruidPutOracleJava
    
    import scala.io.{BufferedSource, Source}
    
    object DruidPutOracleLowWindowsDetails {
    
      private val connect: DruidPutOracleJava = new DruidPutOracleJava
      private val conn: Connection = connect.getConnect
      private val statement: Statement = conn.createStatement()
    
      def druidPutOracle(filename:String):Unit={
        val lowWindowsDetails: BufferedSource = Source.fromFile(s"路径")
        val lowWindowsDetailsArr: Iterator[String] = lowWindowsDetails.getLines()
        println(NowDate.NowDate()+s":${filename}文件的lowWindowsDetails表开始写入Oracle数据库。。。。")
        lowWindowsDetailsArr.foreach(str=>{
          val strs: Array[String] = str.split(",")
          val a1 = strs(0)
          val a2 = strs(1)
          val sql =
            s"""
               |INSERT INTO LOWWINDOW_ALL_FEATURE
               |(FILENAME,FEATURENAME)
               | VALUES
               |('$a1','$a2')
               """.stripMargin
          statement.addBatch(sql)
        })
        statement.executeBatch()
        conn.commit()
        lowWindowsDetails.close()
    
        println(NowDate.NowDate()+s":${filename}文件的表写入Oracle数据库完毕。。。。")
      }
    }
    
    
      7,操作hdfs
    
    object UsePutHdfsShell {
    
      private val runtime = Runtime.getRuntime
    
      def usePutHdfsShell(fileName:String):Unit={
    
    
    
        runtime.exec(s"hdfs dfs -put /路径/")
    
        runtime.exec(s"hdfs dfs -put /路径/")
    
        runtime.exec(s"rm -rf //路径/$fileName")
    
      }
    }
    
    
      
    
    如果返回的不是Finish就把错误日志写出到Log文件中
    
  • 相关阅读:
    nodejs+express+mysql实现restful风格的增删改查示例
    使百度统计排除自己
    node.js和JavaScript的关系
    完善chrome翻译插件ChaZD,支持有道智云api
    面向对象编程 —— java实现函数求导
    我的第一篇博客 —— 博客内容简介
    微信公众号支付
    Shiro的原理及Web搭建
    AOP 切面编程------JoinPoint ---- log日志
    quartz 不同时间间隔调度任务
  • 原文地址:https://www.cnblogs.com/Bread-Wang/p/9951745.html
Copyright © 2011-2022 走看看