zoukankan      html  css  js  c++  java
  • 生产者MessageQueueSelector实战

     top下面默认有四个Queue, queque的数量不能大约配置,否者会报错

    假设一个top下面有三个类目,分别是手机,衣服,食品,他们发送消息都是随机发送到一个queue里面,如果有一天,衣服的消息突然增多了,堵塞队列了,其他两个类目也会受到影响,造成消息发送失败,这个的话就可以指定类目发送到哪个queue,手机指定发送到队列0 ,衣服发送队列1,这样即使衣服的消息增多了也不会影响其他队列,但这样也会失去负载均衡

    代码案例:

    同步发送

    @Override
    public MtopResult api(MtopInnerRequest request) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
    List<Produce> list = Produce.produceList();

    //MessageQueueSelector 选择
    if (! CollectionUtils.isEmpty(list)){
    for (Produce produce : list) {
    Message message = new Message("box","orderMessage",produce.getId(),JSON.toJSONString(produce).getBytes());

    SendResult send = payProduct.getProducer().send(message, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
    Integer queusNum = Integer.valueOf(o.toString());
    return list.get(queusNum);
    }
    }, 0);
    System.out.printf("发送结果=%s, msg=%s ", send.getSendStatus(), send.toString());
    }
    }

    top 默认是4个queue

    queue不能大于配置,

     报错

    异步发送

    //MessageQueueSelector 选择
    if (!CollectionUtils.isEmpty(list)) {

    for (Produce produce : list) {
    Message message = new Message("box", "orderMessage", produce.getId(), JSON.toJSONString(produce).getBytes());
    payProduct.getProducer().send(message, (list1, message1, o) ->
    {
    Integer integer = Integer.valueOf(o.toString());
    return list1.get(integer);
    }
    , 4, new SendCallback() {

    @Override
    public void onSuccess(SendResult sendResult) {
    System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString());

    }

    @Override
    public void onException(Throwable throwable) {
    System.out.println(throwable);
    throwable.printStackTrace();
    }
    });
    }





    }
  • 相关阅读:
    前端编程之jQuery
    当nfs-server宕机后,client端执行 df -h hang
    git 设置默认推送和拉去的分支
    python2 中字符串转成字典后汉字出现乱码
    1.in_k8s
    部署etcd中使用ansible进行变量初始化
    获取aliyun固定类型的domain记录并输出到文件
    jumpserver 1.3x 版本忘记MFA的如何解决
    pistat 查看进程状态
    iostat 命令
  • 原文地址:https://www.cnblogs.com/HuangXingLei/p/12620242.html
Copyright © 2011-2022 走看看