zoukankan      html  css  js  c++  java
  • RocketMq入门 配置的acl没有生效

    修改RocketMq源码  distribution/conf/broker.conf 和 distribution/conf/plain_acl.yml之后,配置文件未生效

    在启动broker.startup时,配置运行环境

     Program arguments: 配置文件 输入 -c C:UsersAdministratorDesktop ocketmq-all-4.7.1-source-releasedistributionconfroker.conf

    Environment variables:设置环境变量 输入 ROCKETMQ_HOME=C:UsersAdministratorDesktop ocketmq-all-4.7.1-source-releasedistribution

    rocketmq中只有broker需要设置环境变量

    producer使用acl:

    public static void main(String[] args) throws Exception {
    
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("please_rename_unique_group_name",getAclRPCHook());
    
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
    
        //Launch the instance.
        producer.start();
    
        for (int i = 0; i < 10; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest",
                    "TagA",
                    ("Hello RocketMQ wangshuai").getBytes(RemotingHelper.DEFAULT_CHARSET));
    
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
    
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
    static RPCHook getAclRPCHook(){
        return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
    }

    consumer使用acl:

    public static void main(String[] args) throws InterruptedException, MQClientException {
    
       // Instantiate with specified consumer group name.
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name",getAclRPCHook(),new AllocateMessageQueueAveragely());
    
       // Specify name server addresses.
       consumer.setNamesrvAddr("localhost:9876");
    
       // Subscribe one more more topics to consume.
       consumer.subscribe("TopicTest", "*");
    
       // Register callback to execute on arrival of messages fetched from brokers.
    //      注册回调 以便获取到达broker的消息
       consumer.registerMessageListener(new MessageListenerConcurrently() {
          @Override
          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
               System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    
         //Launch the consumer instance.
         consumer.start();
    
         System.out.printf("Consumer Started.%n");
    }
    static RPCHook getAclRPCHook(){
        return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
    }

      

      

  • 相关阅读:
    【转】Shell编程基础篇-上
    【转】inotify+rsync实现实时同步
    Spring
    jdk,jre,tommcat配置问题
    Java前后台开发
    前端组件学习(一)
    报表工具进阶(二)
    查询时异步刷新问题--用到了ajax
    学习jaspersoft/JasperReport
    利用SQLYog操作数据库mysql
  • 原文地址:https://www.cnblogs.com/pass-ion/p/13489705.html
Copyright © 2011-2022 走看看