zoukankan      html  css  js  c++  java
  • 异步远程调用线程池队列

    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import lombok.extern.slf4j.Slf4j;
    import java.util.concurrent.*;

    1、实现类

    @Slf4j
    public class MucRpcThreadPoolExecutor {
    /**
    * 任务队列
    * 设置有界队列长度为1024
    */
    private static BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1024);

    private static ExecutorService executorService = new ThreadPoolExecutor(
    4, 16, 60, TimeUnit.SECONDS,
    queue,
    new ThreadFactoryBuilder().setNameFormat("muc-rpc-thd-%d").build(),
    new RpcExecutionHandler());

    public static void asyncRpcSubmit(AbstractRpcRunner runner){
    executorService.submit(runner);
    }


    private static class RpcExecutionHandler implements RejectedExecutionHandler{
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    String name = Thread.currentThread().getName();
    if (r instanceof AbstractRpcRunner){
    if (((AbstractRpcRunner) r).getBaseRpcBo() != null)
    name = ((AbstractRpcRunner) r).getBaseRpcBo().getReqId();
    }
    log.warn("muc-rpc-thread-pool线程队列已满,新任务被丢弃:{}", name);
    }
    }
    }
    2、线程抽象类
    //BaseRpcBo 请求实体类
    public  abstract class AbstractRpcRunner <T extends BaseRpcBo> implements Runnable {
    protected T baseRpcBo;


    public T getBaseRpcBo() {
    return baseRpcBo;
    }

    public void setBaseRpcBo(T baseRpcBo) {
    this.baseRpcBo = baseRpcBo;
    }
    }

    3、实体类
    @Data
    public class BaseRpcBo implements Serializable {
    /**
    * 请求id
    */
    private String reqId;

    /**
    * 远程调用地址
    */
    @JSONField(serialize = false)
    private String rpcUrl;

    /**
    * 请求头
    */
    @JSONField(serialize = false)
    private Map<String, String> headers;
    }

    4、调用方法
    public void asyncGrowthUp(String uid) {
    GrowthUpRpcBo rpcBo = new GrowthUpRpcBo();
    rpcBo.setStamp(DateFormatUtils.format(new Date(), "yyyyMMddHHmmss"));
    rpcBo.setUid(uid);
    rpcBo.setReqId(UUIDUtil.randomId());
    rpcBo.setRpcUrl(mjHttpUtils.getAbsUrl(MjConstant.API_GROWTH_UP_SYNC));
    Map<String,String> header = new HashMap<>(1);
    header.put("Content-Type", "application/json;charset=UTF-8");
    rpcBo.setHeaders(header);
    MucRpcThreadPoolExecutor.asyncRpcSubmit(new PostJsonRpcRunner(mjHttpUtils, rpcBo));
    log.info("growth up notify to iot finish, uid:{}, reqId:{}", uid, rpcBo.getReqId());
    }
    5、GrowthUpRpcBo
    @Data
    public class GrowthUpRpcBo extends BaseRpcBo{
    /**
    * 请求时间戳
    * 格式为 yyyyMMddHHmmss
    */
    String stamp;
    /**
    * 用户id
    */
    String uid;
    }




  • 相关阅读:
    Javascript代码收集
    JS表自动取值赋值
    数据分析04 /基于pandas的DateFrame进行股票分析、双均线策略制定
    数据分析03 /基于pandas的数据清洗、级联、合并
    数据分析02 /pandas基础
    数据分析01 /numpy模块
    爬虫07 /scrapy图片爬取、中间件、selenium在scrapy中的应用、CrawlSpider、分布式、增量式
    爬虫06 /scrapy框架
    爬虫05 /js加密/js逆向、常用抓包工具、移动端数据爬取
    爬虫04 /asyncio、selenium规避检测、动作链、无头浏览器
  • 原文地址:https://www.cnblogs.com/fuqiang-terry/p/12529781.html
Copyright © 2011-2022 走看看