zoukankan      html  css  js  c++  java
  • Rocket MQ 1

    参考 http://www.iocoder.cn/categories/RocketMQ/ ; https://www.jianshu.com/nb/16219849

    首先上启动方法,分别启动 namesrv/broker/producer/consumer

        /**
         * Starts an embedded RocketMQ name server listening on port 9876 and keeps
         * it alive for the duration of the demo.
         *
         * @param args unused
         * @throws IllegalStateException if the controller reports that initialization failed
         * @throws Exception if starting the controller fails or the wait is interrupted
         */
        public static void main(String[] args) throws Exception {
            NettyServerConfig nettyServerConfig = new NettyServerConfig();
            NamesrvConfig namesrvConfig = new NamesrvConfig();
            nettyServerConfig.setListenPort(9876);
            NamesrvController nameSrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
            // initialize() signals failure via its boolean result (see RocketMQ's
            // NamesrvStartup); starting a controller that failed to initialize is a bug.
            if (!nameSrvController.initialize()) {
                nameSrvController.shutdown();
                throw new IllegalStateException("NamesrvController failed to initialize");
            }
            try {
                nameSrvController.start();
                // Keep the process alive long enough to run the broker/producer/consumer demos.
                Thread.sleep(1000000);
            } finally {
                // Release the listening port and worker threads even if start() or sleep() throws.
                nameSrvController.shutdown();
            }
        }
        /**
         * Starts an embedded RocketMQ broker named {@code broker-a} on port 10911,
         * registered with the name server at {@code 127.0.0.1:9876}, and keeps it
         * alive for the duration of the demo.
         *
         * @param args unused
         * @throws IllegalStateException if the broker controller reports that initialization failed
         * @throws Exception if starting the broker fails or the wait is interrupted
         */
        public static void main(String[] args) throws Exception {
            System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            nettyServerConfig.setListenPort(10911);

            final BrokerConfig brokerConfig = new BrokerConfig();
            brokerConfig.setBrokerName("broker-a");
            brokerConfig.setAutoCreateTopicEnable(true);
            brokerConfig.setNamesrvAddr("127.0.0.1:9876");

            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
            messageStoreConfig.setDeleteWhen("04");
            messageStoreConfig.setFileReservedTime(48);
            messageStoreConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
            messageStoreConfig.setDuplicationEnable(false);

            BrokerController brokerController = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                new NettyClientConfig(),
                messageStoreConfig);

            // A bare assertThat(x) only builds an assertion object and verifies nothing,
            // so the initialization result has to be checked explicitly.
            if (!brokerController.initialize()) {
                brokerController.shutdown();
                throw new IllegalStateException("BrokerController failed to initialize");
            }
            try {
                brokerController.start();
                // Keep the process alive long enough to run the producer/consumer demos.
                Thread.sleep(1000000);
            } finally {
                // Shut down even if start() or sleep() throws.
                brokerController.shutdown();
            }
        }
        /**
         * Sends 1000 messages tagged {@code TagB} to topic {@code TopicTest} through
         * the name server at {@code 127.0.0.1:9876}, printing each send result.
         *
         * <p>A failed send is reported and followed by a one-second pause before the
         * next message is attempted.
         *
         * @param args unused
         * @throws MQClientException if the producer cannot be started
         * @throws InterruptedException if the pause after a failed send is interrupted
         */
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
            try {
                for (int i = 0; i < 1000; i++) {
                    try {
                        Message msg = new Message("TopicTest" /* Topic */,
                            "TagB" /* Tag */,
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                        );
                        SendResult sendResult = producer.send(msg);
                        System.out.printf("%s%n", sendResult);
                    } catch (Exception e) {
                        e.printStackTrace();
                        // Back off briefly before trying the next message.
                        Thread.sleep(1000);
                    }
                }
            } finally {
                // Thread.sleep above may throw InterruptedException out of the loop;
                // the producer must still be shut down in that case.
                producer.shutdown();
            }
        }
        /**
         * Starts a push consumer that subscribes to every tag of topic
         * {@code TopicTest} via the name server at {@code 127.0.0.1:9876} and prints
         * each received batch together with the name of the consuming thread.
         *
         * @param args unused
         * @throws InterruptedException declared by the original signature
         * @throws MQClientException if subscribing or starting the consumer fails
         */
        public static void main(String[] args) throws InterruptedException, MQClientException {
            // Listener that logs every delivered batch and acknowledges it as consumed.
            MessageListenerConcurrently printingListener = new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
                    ConsumeConcurrentlyContext ctx) {
                    String threadName = Thread.currentThread().getName();
                    System.out.printf("%s Receive New Messages: %s %n", threadName, messages);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            };

            DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
            pushConsumer.setNamesrvAddr("127.0.0.1:9876");
            pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            pushConsumer.subscribe("TopicTest", "*");
            pushConsumer.registerMessageListener(printingListener);

            pushConsumer.start();
            System.out.printf("Consumer Started.%n");
        }
  • 相关阅读:
    springboot热启动中那些不为人知的东东
    maven生命周期(lifecycle)—— maven权威指南学习笔记(四)
    maven 一个简单项目 —— maven权威指南学习笔记(三)
    maven 安装、运行、获取帮助 —— maven权威指南学习笔记(二)
    maven 简介 —— maven权威指南学习笔记(一)
    用opencsv文件读写CSV文件
    java基础之——DecimalFormat格式化数字
    Git学习
    Spring Boot教程(二十四)Web应用的统一异常处理
    Spring Boot教程(二十三)使用Swagger2构建强大的RESTful API文档(2)
  • 原文地址:https://www.cnblogs.com/it-worker365/p/10039419.html
Copyright © 2011-2022 走看看