zoukankan      html  css  js  c++  java
  • 阿里云RocketMQ的生产者简单实现

    // MQ的应用场景有比如 订单变更消息可以通过产生这个事件的地方(比如前端调用后端的接口post一个订单,那么就是在这个mapping方法里做一个生产者【不过最好通过aop来实现,不然n多个接口都要写生产者代码】,
    // 将客户端发来的订单消息存入MQ里,然后相关的服务【比如需要通知后台管理人员,那么通知后台管理人员的service肯定是一个consumer来消费该消息,不过的组的消费者是可以多次消费该消息的,这符合现实逻辑,我有一个消息
    // 需要让多个部门知道,那么这个部门其实就是消费组,但是一个组里只有一个consumer消费该消息】会处理这条消息(比如入库/调用相关接口啥的)
    // 注意,消息发送给消息队列后,如果没有任何消费组有消费过,那么这条消息会被保存直到被至少一个消费组消费;已经消费的消息正常情况下不会再被消费;
    package
    com.test.eee; import com.aliyun.openservices.ons.api.*; import java.util.Date; import java.util.Properties; import java.util.Scanner; /** * 阿里云RocketMQ Producer简单实现 */ public class MQTestProducer { public static void main(String[] args) { Properties properties = new Properties(); // GROUP_ID其实就是网上文章里的GroupName,对于RocketMQ而言一条消息会分发给所有的订阅了此消息(topic/tag)的group // 但是一个group里的consumer只有一个会得到这条消息 // TODO 经过测试阿里云上的RocketMQ可以不设置组 //properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_CONSUMER_EEE"); // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.put(PropertyKeyConst.AccessKey, "sss"); // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建 properties.put(PropertyKeyConst.SecretKey, "bbb"); //设置发送超时时间,单位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // 设置 TCP 接入域名,到控制台的实例基本信息中查看 properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://wwws.mq-internet-access.mq-internet.aliyuncs.com:80"); Producer producer = ONSFactory.createProducer(properties); // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可 producer.start(); //循环发送消息 //for (int i = 0; i < 100; i++) { Message msg = new Message( // Message 所属的 Topic "sss-change", // TODO Topic表示某种业务的类型,而Tag则是对这种业务的具体的划分 // 比如消息类型是保存的是微信消息,那么可以分为好友消息/群组消息/小程序消息等等Tag "TagA", // Message Body 可以是任何二进制形式的数据, MQ 不做任何干预, // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式 "Hello 中文拉拉".getBytes()); // 设置代表消息的业务关键属性,请尽可能全局唯一。 // 以方便您在无法正常收到消息情况下,可通过阿里云服务器管理控制台查询消息并补发 // 注意:不设置也不会影响消息正常收发 // TODO 这个key就是比Tag更加精确的标识,一般设置为整个Topic里的消息唯一标识 //msg.setKey("ORDERID_" + i); try { SendResult sendResult = producer.send(msg); // 同步发送消息,只要不抛异常就是成功 // TODO 发送的每条消息在RocketMQ里都有一个唯一的MessageId if (sendResult != null) { System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); } } catch (Exception e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); } // } var scanner = new Scanner(System.in); scanner.next(); System.out.println("closed producer conn."); // 在应用退出前,销毁 Producer 对象 // 注意:如果不销毁也没有问题 producer.shutdown(); } }

    如果是要结合Spring,则可以用ProducerBean代替Producer

    @Bean(initMethod = "start", destroyMethod = "shutdown")
        public ProducerBean xxxProducer() {
            return this.getProducer(gid);
        }
    
    
        private ProducerBean getProducer(String gid) {
            Properties properties = new Properties();
            properties.setProperty("addr", nameSrvAddr);
            properties.setProperty("AccessKey", accessKey);
            properties.setProperty("SecretKey", secretKey);
            // 对于producer可以不要gid
            //properties.setProperty("GROUP_ID", gid);
    
            ProducerBean producer = new ProducerBean();
            producer.setProperties(properties);
            return producer;
        }
    
    用的时候就是用过这个bean.send(new Message(...))来实现发送
  • 相关阅读:
    \r,\n,\r\n的区别
    \r,\n,\r\n的区别
    C# TextBox 换行 滚动到最后一行
    C# TextBox 换行 滚动到最后一行
    C# Textbox 始终保持最后最后一行
    C# Textbox 始终保持最后最后一行
    踩坑之mongodb配置文件修改
    踩坑之mongodb配置文件修改
    开启mongodb 的web
    开启mongodb 的web
  • 原文地址:https://www.cnblogs.com/silentdoer/p/11101463.html
Copyright © 2011-2022 走看看