一个实时监控文件夹并处理新到文件的流程,涉及多个组件。缺点:如果程序宕掉,宕机期间进入监控文件夹的文件在重新启动后不会被处理。优化思路:可以另写一个脚本,把监控文件夹下的文件先移出、再移回,以重新触发处理。
代码开始:
import java.io.File
import cold_test.monitorutil.FileListener
import cold_test.tools.NowDate
import org.apache.commons.io.monitor._
/**
 * Entry point of the pipeline: polls one local directory and hands every newly created
 * file to [[FileListener]].
 *
 * NOTE: commons-io snapshots the directory when the monitor starts, so files that are
 * already there at startup (e.g. ones that arrived while the program was down) are not
 * reported as created.
 */
object MonitorFile {
  /** Directory to watch (placeholder path in this listing). */
  val rowCsv3="路径"

  /**
   * Wires listener -> observer -> monitor for [[rowCsv3]] and starts polling every 5000 ms.
   * Returns right after starting; polling continues on the monitor's own thread.
   */
  def main(args: Array[String]): Unit = {
    println(NowDate.NowDate()+":开始等待数据进入。。。。")
    val watchedDir = new File(rowCsv3)
    val dirObserver: FileAlterationObserver = new FileAlterationObserver(watchedDir)
    dirObserver.addListener(new FileListener())
    val poller: FileAlterationMonitor = new FileAlterationMonitor(5000)
    poller.addObserver(dirObserver)
    poller.start()
  }
}
2. 实现监听接口 FileAlterationListener(注意导入正确的包):
import java.io._
import cold_test.tools.NowDate
import cold_test.utils.UsePythonShell
import org.apache.commons.io.monitor.{FileAlterationListener, FileAlterationObserver}
/**
 * commons-io listener for the watched input directory.
 *
 * A newly created file starts the processing pipeline via [[UsePythonShell]]. File deletions
 * and changes are only logged. Directory events and observer start/stop are ignored.
 */
class FileListener extends FileAlterationListener{
  import scala.util.control.NonFatal

  /**
   * Starts processing for a file that has just appeared.
   *
   * Callbacks run on the monitor's polling thread, and commons-io does not catch listener
   * exceptions. An exception escaping here would end that thread and silently stop all
   * monitoring, so non-fatal failures are reported and swallowed; the next file is still seen.
   *
   * @param file the new file; only its name (not its path) is passed on
   */
  override def onFileCreate(file: File): Unit = {
    val fileName: String = file.getName
    println(NowDate.NowDate()+s"当前处理的文件为$fileName")
    try {
      UsePythonShell.usePythonShell(fileName)
    } catch {
      case NonFatal(e) =>
        System.err.println(s"${NowDate.NowDate()}: failed to process $fileName: $e")
        e.printStackTrace()
    }
  }

  /** Logs the name of a deleted file. */
  override def onFileDelete(file: File): Unit = {
    println("onFileDelete" + file.getName)
  }

  /** Logs the name of a modified file. */
  override def onFileChange(file: File): Unit = {
    println("onFileChange" + file.getName)
  }

  // The remaining callbacks are intentionally no-ops.
  override def onStart(fileAlterationObserver: FileAlterationObserver): Unit = {}
  override def onDirectoryCreate(file: File): Unit = {}
  override def onDirectoryChange(file: File): Unit = {}
  override def onDirectoryDelete(file: File): Unit = {}
  override def onStop(fileAlterationObserver: FileAlterationObserver): Unit = {}
}
3. 调用本地 Python 脚本:启动本地进程执行脚本,用包装后的流 readLine() 读取输出;收到以 "Finish" 开头的结果表示执行完毕,随后把文件 put 到 HDFS 上,再执行 Go 程序。
import java.io.{BufferedReader, File, FileOutputStream, InputStreamReader}
import cold_test.tools.NowDate
/**
 * First processing stage: runs the Python script on a newly arrived file and, if it reports
 * success, continues with the HDFS upload and the Go stage.
 */
object UsePythonShell {
  import scala.util.control.NonFatal

  /** Error log for this stage (placeholder path in this listing). */
  private val errorLogPath: String = s"/路径/XX.log"

  /**
   * Runs the Python script for `fileName`.
   *
   * The script's first stdout line decides the outcome:
   *  - If it starts with "Finish", the file is uploaded via [[UsePutHdfsShell]] and handed
   *    to [[UseGoMethodShell]].
   *  - Any other first line, no output at all, or an exception anywhere in the stage is
   *    reported on stdout and appended to the error log.
   *
   * @param fileName name (without directory) of the file to process
   */
  def usePythonShell(fileName:String):Unit={
    println(NowDate.NowDate()+"输出的文件名称为:"+fileName)
    val parms = Array("python","/路径/XXX.py",fileName)
    println(NowDate.NowDate()+"开始调用PythonShell:"+fileName)
    // Declared outside the try so the failure message can show whatever was read.
    var pythonResult: String = null
    try {
      pythonResult = runAndReadFirstLine(parms)
      println(NowDate.NowDate()+s"接收${fileName}的PythonShell命令执行的输出信息:"+pythonResult)
      if (pythonResult != null && pythonResult.startsWith("Finish")) {
        // Delete the original file and put the Row and Avg files onto HDFS
        UsePutHdfsShell.usePutHdfsShell(fileName)
        // Call the Go method to process the Row and Avg files
        UseGoMethodShell.useGoMethodShell(fileName)
      } else {
        // The script did not report "Finish": record the failure
        println(s"${fileName}文件的Python方法返回值为$pythonResult")
        appendErrorLog(s"PythonMethod has an error with handling $fileName")
      }
    } catch {
      case NonFatal(e) =>
        println(s"$e:${fileName}文件的Python方法返回值为$pythonResult")
        appendErrorLog(s"PythonMethod has an error with handling $fileName")
    }
  }

  /**
   * Starts `command` and returns the first line of its stdout (null if it printed nothing).
   * It then drains the rest of stdout and waits for the process to exit, so the child is never
   * left blocked on a full pipe. The child's stderr goes to this JVM's stderr for the same reason.
   */
  private def runAndReadFirstLine(command: Array[String]): String = {
    val process = new ProcessBuilder(command: _*)
      .redirectError(ProcessBuilder.Redirect.INHERIT)
      .start()
    val reader = new BufferedReader(new InputStreamReader(process.getInputStream, "UTF-8"))
    try {
      val firstLine = reader.readLine()
      while (reader.readLine() != null) {}
      process.waitFor()
      firstLine
    } finally {
      reader.close()
    }
  }

  /** Appends one line to the error log (rather than truncating it) so earlier errors are kept. */
  private def appendErrorLog(message: String): Unit = {
    val out = new FileOutputStream(new File(errorLogPath), true)
    try {
      out.write((message + System.lineSeparator()).getBytes("UTF-8"))
    } finally {
      out.close()
    }
  }
}
4. 调用 Go 程序:启动本地进程执行 Go 程序,用 readLine() 读取输出;收到 Go 返回的 "Finish" 结果后,通过 Druid 连接池把数据写入 Oracle。
import java.io.{BufferedReader, File, FileOutputStream, InputStreamReader}
import cold_test.tools.NowDate
import cold_test.utils.druidUtils.{DruidPutOracleFeatureScore, DruidPutOracleLowWindowsDetails, DruidPutOracleLowWindowsNumber}
/**
 * Go processing stage.
 *
 * Runs the Go program and, when it reports success, loads its three result tables into
 * Oracle, then calls [[DeleteResultTable]].
 */
object UseGoMethodShell {
  import scala.util.control.NonFatal

  /** Error log for this stage (placeholder path in this listing). */
  private val errorLogPath: String = s"/路径/XX.log"

  /**
   * Runs the Go program for `fileName`.
   *
   * If the first stdout line starts with "Finish":
   *  - the LowWindowsDetails, LowWindowsNumber and FeatureScore tables are written to Oracle;
   *  - `DeleteResultTable.deleteResultTableCsv` is then called.
   *
   * Any other first line, no output at all, or an exception in this stage is reported on
   * stdout and appended to the error log.
   *
   * @param fileName name of the input file the Go program's results belong to
   */
  def useGoMethodShell(fileName:String):Unit={
    println(NowDate.NowDate()+"开始调用GetGoMethodShell方法")
    val filePath: String =s"路径"
    val parms = Array[String]("/路径",filePath)
    println(NowDate.NowDate()+s"开始调用${fileName}的GoMethodShell执行命令")
    // Declared outside the try so the failure message can show whatever was read.
    var goResult: String = null
    try {
      goResult = runAndReadFirstLine(parms)
      println(NowDate.NowDate()+s"接收${fileName}的GoMethodShell命令执行的输出信息:"+goResult)
      if (goResult != null && goResult.startsWith("Finish")) {
        // Go reported "Finish": store the results in Oracle via Druid
        DruidPutOracleLowWindowsDetails.druidPutOracle(fileName)
        DruidPutOracleLowWindowsNumber.druidPutOracle(fileName)
        DruidPutOracleFeatureScore.druidPutOracle(fileName)
        DeleteResultTable.deleteResultTableCsv(fileName)
      } else {
        // The Go program did not report "Finish": record the failure
        println(s"${fileName}文件的Go方法返回值为$goResult")
        appendErrorLog(s"GoMethod has an error with handling $fileName")
      }
    } catch {
      case NonFatal(e) =>
        println(s"$e:${fileName}文件的Go方法返回值为$goResult")
        appendErrorLog(s"GoMethod has an error with handling $fileName")
    }
  }

  /**
   * Starts `command` and returns the first line of its stdout (null if it printed nothing).
   * It then drains the rest of stdout and waits for the process to exit, so the child is never
   * left blocked on a full pipe. The child's stderr goes to this JVM's stderr for the same reason.
   */
  private def runAndReadFirstLine(command: Array[String]): String = {
    val process = new ProcessBuilder(command: _*)
      .redirectError(ProcessBuilder.Redirect.INHERIT)
      .start()
    val reader = new BufferedReader(new InputStreamReader(process.getInputStream, "UTF-8"))
    try {
      val firstLine = reader.readLine()
      while (reader.readLine() != null) {}
      process.waitFor()
      firstLine
    } finally {
      reader.close()
    }
  }

  /** Appends one line to the error log (rather than truncating it) so earlier errors are kept. */
  private def appendErrorLog(message: String): Unit = {
    val out = new FileOutputStream(new File(errorLogPath), true)
    try {
      out.write((message + System.lineSeparator()).getBytes("UTF-8"))
    } finally {
      out.close()
    }
  }
}
5. Druid 数据库连接池
import com.alibaba.druid.pool.DruidDataSource;
import java.sql.Connection;
/**
 * Provides connections from a single, lazily created Druid connection pool for the
 * Oracle database.
 *
 * <p>The pool lives in a static field, so all instances of this class share it.
 * Creation is synchronized on the class so concurrent first calls cannot build
 * (and leak) more than one pool.
 */
public class DruidPutOracleJava {
    /** Shared pool; {@code null} until first use. Only read and written under the class lock. */
    private static DruidDataSource dataSource = null;

    /**
     * Creates a handle to the shared pool. No connection is made here; the pool is
     * built on the first call to {@link #getConnect()} or {@link #GetDbConnect()}.
     */
    public DruidPutOracleJava() {
    }

    /**
     * Creates and configures the shared pool if it does not exist yet; later calls do nothing.
     *
     * @throws Exception kept in the signature for existing callers
     */
    public void GetDbConnect() throws Exception {
        sharedDataSource();
    }

    /**
     * Borrows a connection from the shared pool, creating the pool first if necessary.
     * The caller should close the returned connection to hand it back to the pool.
     *
     * @return a pooled database connection
     * @throws Exception if the pool cannot provide a connection
     */
    public Connection getConnect() throws Exception {
        return sharedDataSource().getConnection();
    }

    /** Returns the shared pool, building and configuring it on first use. */
    private static synchronized DruidDataSource sharedDataSource() {
        if (dataSource == null) {
            DruidDataSource ds = new DruidDataSource();
            // Connection parameters (placeholders in this listing)
            ds.setUrl("jdbc:oracle:");
            ds.setDriverClassName("oracle.jdbc.OracleDriver");
            ds.setUsername("root");
            ds.setPassword("root");
            // Initial, minimum idle and maximum active pool sizes
            ds.setInitialSize(1);
            ds.setMinIdle(1);
            ds.setMaxActive(200);
            // Connection-leak detection: reclaim connections held longer than the timeout.
            // NOTE(review): Druid takes this value in seconds (~58 days here) - confirm intent.
            ds.setRemoveAbandoned(true);
            ds.setRemoveAbandonedTimeout(5000000);
            // Maximum wait for a free connection, in milliseconds (~33 minutes)
            ds.setMaxWait(2000000);
            // Interval between idle-connection eviction runs, in milliseconds
            ds.setTimeBetweenEvictionRunsMillis(20000);
            // Validate connections while idle and on borrow so stale ones are not handed out
            ds.setValidationQuery("SELECT 'x' from dual");
            ds.setTestWhileIdle(true);
            ds.setTestOnBorrow(true);
            // Publish the pool only after it is fully configured
            dataSource = ds;
        }
        return dataSource;
    }
}
6. 写入 Oracle:逐行插入较慢时可以改用批量插入(下面的代码即为批量方式)
import java.sql.{Connection, Statement}
import cold_test.tools.NowDate
import druidPutOracle.DruidPutOracleJava
import scala.io.{BufferedSource, Source}
/**
 * Loads the LowWindowsDetails CSV produced for an input file into Oracle table
 * LOWWINDOW_ALL_FEATURE.
 */
object DruidPutOracleLowWindowsDetails {
  import scala.util.control.NonFatal

  private val connect: DruidPutOracleJava = new DruidPutOracleJava

  /** Values are bound as parameters, never spliced into the SQL text, so quotes in data are safe. */
  private val insertSql: String =
    "INSERT INTO LOWWINDOW_ALL_FEATURE (FILENAME,FEATURENAME) VALUES (?, ?)"

  /**
   * Inserts one row per CSV line, using the first two comma-separated fields as
   * FILENAME and FEATURENAME.
   *
   * The whole file is inserted as one batch in a single transaction: either every line is
   * committed, or (on any error, including a line with fewer than two fields) the transaction
   * is rolled back and the error is rethrown.
   *
   * @param filename input file name; used here only in the progress messages
   */
  def druidPutOracle(filename:String):Unit={
    val lowWindowsDetails: BufferedSource = Source.fromFile(s"路径")
    try {
      println(NowDate.NowDate()+s":${filename}文件的lowWindowsDetails表开始写入Oracle数据库。。。。")
      insertAll(lowWindowsDetails.getLines())
    } finally {
      lowWindowsDetails.close()
    }
    println(NowDate.NowDate()+s":${filename}文件的表写入Oracle数据库完毕。。。。")
  }

  /**
   * Batch-inserts `lines` on a freshly borrowed pooled connection.
   * The connection's auto-commit setting is restored afterwards and the connection is
   * always returned to the pool.
   */
  private def insertAll(lines: Iterator[String]): Unit = {
    val conn: Connection = connect.getConnect
    try {
      val previousAutoCommit = conn.getAutoCommit
      // commit() is only valid with auto-commit off; it also makes the batch atomic.
      conn.setAutoCommit(false)
      try {
        val insert = conn.prepareStatement(insertSql)
        try {
          lines.foreach { line =>
            val fields: Array[String] = line.split(",")
            insert.setString(1, fields(0))
            insert.setString(2, fields(1))
            insert.addBatch()
          }
          insert.executeBatch()
          conn.commit()
        } finally {
          insert.close()
        }
      } catch {
        case NonFatal(e) =>
          conn.rollback()
          throw e
      } finally {
        conn.setAutoCommit(previousAutoCommit)
      }
    } finally {
      conn.close()
    }
  }
}
7. 操作 HDFS(上传文件并删除原始文件)
/**
 * Shell step between the Python and Go stages: uploads files to HDFS, then removes the
 * original input file.
 */
object UsePutHdfsShell {
  /**
   * Runs the two `hdfs dfs -put` uploads, then deletes the original file.
   *
   * Per the caller's comment, the uploads are presumably the Row and Avg files (paths are
   * placeholders in this listing). Commands run one after another, and each is awaited before
   * the next starts, so later steps never race earlier ones. Arguments are passed as separate
   * array elements, so a file name containing spaces stays a single `rm` target.
   *
   * @param fileName name of the original input file to delete
   * @throws IllegalStateException if an upload exits with a non-zero status; the original
   *         file is then kept so it can be reprocessed
   */
  def usePutHdfsShell(fileName:String):Unit={
    runChecked(Array("hdfs", "dfs", "-put", "/路径/"))
    runChecked(Array("hdfs", "dfs", "-put", "/路径/"))
    val rmStatus = runAndWait(Array("rm", "-rf", s"//路径/$fileName"))
    if (rmStatus != 0) {
      System.err.println(s"rm of $fileName exited with status $rmStatus")
    }
  }

  /** Runs `command` and throws if it exits with a non-zero status. */
  private def runChecked(command: Array[String]): Unit = {
    val status = runAndWait(command)
    if (status != 0) {
      throw new IllegalStateException(
        s"'${command.mkString(" ")}' exited with status $status")
    }
  }

  /**
   * Runs `command`, sharing this JVM's stdio so unread output cannot block it, and returns
   * its exit status once it has finished.
   */
  private def runAndWait(command: Array[String]): Int =
    new ProcessBuilder(command: _*).inheritIO().start().waitFor()
}
补充:如果脚本返回的不是 "Finish",就把错误日志写入 Log 文件中。