zoukankan      html  css  js  c++  java
  • Java线程监控及中断

    我们系统中经常有耗费时间长的任务,但客户端往往需要马上得到回应。这时我们就可以如下步骤实现:

    1、客户端发起请求执行任务(选定条件,下载报表);

    2、首先将任务ID及开始时间,起始状态记录到数据库表中;

    3、另起一个后台线程去执行这个耗时任务(比如生成报表);

    4、线程执行成功或失败状态记录到数据库;

    5、客户通过异步查询数据(下载报表或其他操作)。

    好了,大致步骤我们清楚了。假如这个耗时任务一直执行,而且和消耗系统资源。我们往往想放弃这个任务的执行,再缩小范围执行更小的任务执行。那我们如何实现呐!

    话不多说,直接上代码:

    1.首先我们实现一个线程管理工具:

    import java.sql.DriverManager
    import java.util.concurrent.ConcurrentHashMap

    import org.slf4j.LoggerFactory

    import scala.util.{Failure, Success, Try}

    /**
    * 类功能描述:报表线程管理器
    *
    * @author WangXueXing create at 18-11-2 上午11:35
    * @version 1.0.0
    */
    object ReportThreadManager {
    private val logger = LoggerFactory.getLogger(getClass)
    /**
    * 报表ID与对应线程map
    */
    val REPORT_THREAD_MAP: ConcurrentHashMap[Long, Thread] = new ConcurrentHashMap()

    /**
    * 将对应报表子线程放入线程池
    *
    * @param reportId 报表ID
    * @param thread 对应子线程
    */
    def put(reportId: Long, thread: Thread): Unit = {
    REPORT_THREAD_MAP.put(reportId, thread)
    }

    /**
    * 获取对应报表线程
    * @param reportId 报表ID
    * @return
    */
    def get(reportId: Long): Thread ={
    REPORT_THREAD_MAP.get(reportId)
    }

    /**
    * 将对应报表子线程移除线程池
    * @param reportId 报表ID
    */
    def remove(reportId: Long): Unit ={
    REPORT_THREAD_MAP.remove(reportId)
    }

    /**
    * 销毁指定报表子线程
    * @param reportId 报表ID
    */
    def deploy(reportId: Long)={
    val thread = REPORT_THREAD_MAP.get(reportId)
    if(thread != null){
    Try{
    if(!thread.isInterrupted){
    logger.info(s"线程:${reportId} 开始被结束")

    logger.info("before interrupt")
    thread.getStackTrace.foreach(println)

    thread.interrupt()

    Thread.sleep(10)
    logger.info("after interrupt")
    thread.getStackTrace.foreach(println)
    }
    } match {
    case Success(x) => logger.info(s"线程:${reportId} 被成功杀死")
    case Failure(e) => logger.error(s"线程:${reportId} interrupt 失败", e)
    }
    REPORT_THREAD_MAP.remove(reportId)
    }
    }

    val thread1 = new Thread(new Runnable {
    override def run(): Unit = {
    ReportThreadManager.deploy(1)
    println(s"thread 1 killed")
    }
    })

    def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val con = DriverManager.getConnection("jdbc:hive2://192.168.71.127:10000/finance", "goods", null)
    val stmt = con.createStatement
    var res = stmt.executeQuery("SELECT company_name,store_code,store_name,source_date,trade_type,trade_no,third_party_payment_no,business_type,cast(business_amount as decimal(20, 2)) business_amount,cast(service_charge as decimal(20, 4)) service_charge,trade_time,customer_account,updated_at,created_at FROM t_pay_source_data")
    while(res.next()){
    println(res.getString(1))
    }
    }
    }
    此工具可以实现根据任务ID添加当前任务执行线程,也可以从线程池移除此线程,根据任务ID中断线程。

    2.如下任务执行过程及如何调用线程管理及中断线程:
    import java.io.{File, FileInputStream}

    import com.today.api.financereport.scala.request.ReportInfo
    import com.today.api.financereport.scala.response.ServiceResponse
    import com.today.service.financereport.action.{ExportReportRecordFailureAction, ExportReportRecordSuccessAction, StartExportReportAction}
    import com.today.service.financereport.common.ReportThreadManager
    import com.today.service.financereport.dto.{ExportReportFailureInput, ExportReportSuccessInput, ReportOutput}
    import com.today.service.financereport.util.{Debug, OssUtil}
    import org.slf4j.LoggerFactory

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future
    import scala.util.control.NonFatal
    import scala.util.{Failure, Success}

    /**
    * 报表导出流程限定类
    *
    * @author BarryWang create at 2018/5/17 9:15
    * @version 0.0.1
    */
    trait ReportAction {
    protected val logger = LoggerFactory.getLogger(getClass)
    /**
    * 报表导出流程
    * @return
    */
    def execute: ServiceResponse = {
    var result:ServiceResponse = null
    var count = Counter.count
    logger.info(s"---1生成报表前运行个数:${count}----------")
    if(count >= Counter.MAX_COUNT.intValue()){
    result = ServiceResponse("405", s"目前有多个报表正在导出,请5分钟后再操作,谢谢!")
    } else {
    Counter.increment
    count = Counter.count
    logger.info(s"---2启动生成报表运行个数:${count}----------")
    var reportId: Long = -1
    try {
    //1. 前置检查
    preCheck

    //2. 开始生成报表纪录
    val reportInfo = setReportInfo
    reportId = startGenerate

    //3. 生成报表处理
    val output: Future[ReportOutput] = Future {
    Debug.reset()
    //添加当前子线程到线程管理池
    ReportThreadManager.put(reportId, Thread.currentThread())
    //1) 加载模板
    val templateInfo = Debug.trace(s"${reportInfo.reportType}-loadTemplate:")(loadTemplate(setTemplatePath))
    //2) 生成报表
    Debug.trace(s"${reportInfo.reportType}-generateReport:")(generateReport(reportId, templateInfo))
    //3) 保存报表
    val output = Debug.trace(s"${reportInfo.reportType}-saveReport:")(saveReport(templateInfo.localFile))

    //将此子线程从线程管理池移除
    ReportThreadManager.remove(reportId)

    Debug.info()
    output
    }
    output.onComplete {
    case Success(v) => {
    successGenerate(ExportReportSuccessInput(reportId, v))
    Counter.decrement
    count = Counter.count
    logger.info(s"---3结束报表生成运行个数:${count}----------")
    }
    case Failure(e) => {
    failureGenerate(ExportReportFailureInput(reportId, e))
    Counter.decrement
    count = Counter.count
    logger.info(s"---3结束报表生成运行个数:${count}----------")
    }
    }

    //4. 后置检查
    postCheck

    result = ServiceResponse("200", "请到导出管理查看或下载报表")
    } catch {
    case NonFatal(e) =>
    Counter.decrement
    count = Counter.count
    logger.info(s"---3结束报表生成运行个数:${count}----------")
    failureGenerate(ExportReportFailureInput(reportId, e))
    throw e
    } finally {}
    }
    result
    }

    /**
    * 前置条件检查:动作、状态等业务逻辑
    */
    def preCheck

    /**
    * 设置报表信息
    * @return
    */
    def setReportInfo: ReportInfo

    /**
    * 设置模板路径
    * @return
    */
    def setTemplatePath: String

    /**
    * 开始生成报表纪录
    */
    def startGenerate(): Long = {
    new StartExportReportAction(setReportInfo).execute
    }

    /**
    * 加载模板
    * @param templatPath
    */
    def loadTemplate(templatPath: String): ExcelTemaplateInfo = {
    val suffix = isZip match {
    case true => ".zip"
    case false => ".xlsx"
    }
    //生成本地文件
    val localFile = File.createTempFile(downloadFileName+"_", suffix)
    ExcelTemaplateInfo(templatPath, localFile)
    }

    /**
    * 下载文件名
    * @return
    */
    def downloadFileName: String = setReportInfo.reportType.name

    /**
    * 根据数据生成报表
    * @return
    */
    def generateReport(reportId: Long, templateInfo: ExcelTemaplateInfo)

    /**
    * 将生成在本地的报表上传到阿里SSO
    * @param localFile
    * @return
    */
    def saveReport(localFile: File): ReportOutput = {
    val fileUrl = OssUtil.uploadFileStream(new FileInputStream(localFile), localFile.getName)
    localFile.deleteOnExit()
    val suffix = isZip match {
    case true => ".zip"
    case false => ".xlsx"
    }
    // OssUtil.downloadFile(fileUrl, "/home/barry/data/1122322"+suffix)
    ReportOutput(fileUrl)
    }

    /**
    * 最终生成报表是否为Zip
    * @return
    */
    def isZip: Boolean

    /**
    * 成功生成报表纪录
    * @param result: ExportReportSuccessInput
    */
    def successGenerate(result: ExportReportSuccessInput): Unit = {
    new ExportReportRecordSuccessAction(result).execute
    }

    /**
    * 失败生成报表纪录
    * @param result: ExportReportFailureInput
    */
    def failureGenerate(result: ExportReportFailureInput): Unit = {
    new ExportReportRecordFailureAction(result).execute
    }

    /**
    * 后置检查
    */
    def postCheck = {}
    }

    3.客户端触发中断当前任务线程:
    import com.today.api.financereport.scala.response.ServiceResponse
    import com.today.service.commons.Action
    import com.today.service.financereport.action.sql.ExportReportRecordActionSql
    import com.today.service.financereport.common.ReportThreadManager

    /**
    * 类功能描述:报表删除Action
    *
    * @author WangXueXing create at 18-11-2 下午2:17
    * @version 1.0.0
    */
    class DeleteExportReportAction(id: Long) extends Action[ServiceResponse]{
    override def preCheck: Unit = {}

    override def action: ServiceResponse = {
    val result = ExportReportRecordActionSql.deleteExportReportById(id)
    result match {
    case 0 => ServiceResponse("501","删除报表失败")
    case _ => {
    //删除报表的同时结束生成报表线程
    new Thread(new Runnable {
    /**
    * 延迟退出系统
    */
    override def run(): Unit = {
    Thread.sleep(50)
    ReportThreadManager.deploy(id)
    }
    }).start()
    ServiceResponse("200","success")
    }
    }
    }
    }
  • 相关阅读:
    assign()与create()的区别
    ES6对象扩展——部分新的方法和属性
    ES6对象扩展——扩展运算符
    rest operater剩余操作符
    深拷贝和浅拷贝
    for in和for of的简单区别
    查询ES6兼容的网站
    ES6扩展——对象的扩展(简洁表示法与属性名表达式)
    滚动条设置样式
    marquee横向无缝滚动无js
  • 原文地址:https://www.cnblogs.com/barrywxx/p/10009884.html
Copyright © 2011-2022 走看看