zoukankan      html  css  js  c++  java
  • apollo stomp client 代码示例

    0、环境准备

       0.1、linux

       0.2、java

       0.3、下载apollo二进制包,解压

       0.4、创建broker,名字为 userlog

    {APOLLO_HOME}/bin/apollo create userlog

      0.5 启动apollo

      cd {APOLLO_HOME}/userlog

      bin/apollo-broker run

      如果需要以服务形式启动,执行下面命令

      sudo ln -s "/opt/db/apache-apollo-1.7.1/test/bin/apollo-broker-service" /etc/init.d/

      /etc/init.d/apollo-broker-service start

    1、各协议开放的端口

    INFO  | Accepting connections at: tcp://0.0.0.0:61613

    INFO  | Accepting connections at: tls://0.0.0.0:61614

    INFO  | Accepting connections at: ws://0.0.0.0:61623/

    INFO  | Accepting connections at: wss://0.0.0.0:61624/

    INFO  | Administration interface available at: https://127.0.0.1:61681/

    INFO  | Administration interface available at: http://127.0.0.1:61680/

    3、生产者

    Client c = new Client("172.16.163.141", 61613, "admin", "password");

    final HashMap headers = new HashMap();

    for (int i = 0; i < 5; i++) {

    c.send("/topic/productlogs", "M" + i);

    System.out.println("Send:" + "M" + i);

    Thread.sleep(20);

    }

    c.disconnect(headers);

    4、消费者

    Client c = new Client("172.16.163.141", 61613, "admin", "password");

    final HashMap headers = new HashMap();

    c.subscribe("/topic/productlogs", new Listener() {

    @Override

    public void message(Map headers, String body) {

    System.out.println("receive:" + body);

    }

    }, headers);

    4.1、发送消息

        先启动生产者,这时没有消费者,消息会被丢掉。消息发送完后,toplic会被自动删掉

    注意:当不进行持久化的时候,在生产者启动之后的消费者,不能收到生产者发送的历史消息,只能从没有丢掉的消息开始接收。

    4.2、消费消息

       启动消费者,这时,如果有消息,将打印消息内容。

    如果消费者启动2份,每一个消费者,将会受到所有消息,如,生产者发5条消息,这5条消息,这些消费者每个都收到5条消息

    4.3 消息分组与持久化

    下面我们事先所有消费者均摊这5条消息

    消费者中添加以下代码

    headers.put("persistent", "true");//persistent 的值为string,否则,会产生意想不到的效果(所有消费者都收不到任何消息)

    headers.put("id", "dusb_1");//id 为消息分组的依据,每一组收到全量消息。id相同的消费者,均摊该组的所有消息。

    //headers.put("credit", "50,0");

    实验1:

    启动2份消费者,这时,消息被均摊到这两个消费者上,比如,第一个消费者收到3条,第二条收到2条,即消息均摊。

    实验2:

    此时,若是,关闭所有消费者,重新启动生产者,发送5条消息,

    web页面上durable subs 会有5条消息持久化,之后消费者head id为dsub_1的消费者,才会接受刚才发送的消息。

    结论:当该id组的消费者都挂掉的时候,消息会进行持久化。下次该组消费者启动时,继续进行消息消费。

    思考:这时候,如果启动四个消费者,两个消费者id为dusb_1,另外两个消费者id为dusb_2,启动生产者发送5条消息,这5条消息会怎么分发。

    答案:

    5条消息按照id进行广播,即id 为dusb_1的2个消费者收到这5条消息。这5条消息在2个消费者之间进行均摊。

    id 为dusb_2的2个消费者也收到这5条消息,5条消息在这两个消费者之间进行均摊

    再次思考1:接上面实验,若是断开2个分组内的所有消费者,用生产者,发送5条消息。此时直接重启机子,消息该怎么持久化? 

    结果:dusb_1持久化5条消息,dsub_2也持久化5条消息。

    再次思考2:若刚才没有直接重启机子,而是启动id为dsub_2的消费者进行消费。当该组消息被消费完后,直接重启机子。消息该怎么持久化。

    结果:dusb_1持久化5条消息,dusb_2由于消息已经消费完,所以没有消息被持久化。

    重启机子:用于模拟意外状况。

    结论:持久化的粒度是durable subs,也就是说persistent必须和id一起使用。说明该组消息已经被持久化 

    5、stomp消息确认Ack

    消息确认方式:auto 默认自动确认。即:只要接收到消息,即认为消息消费成功,进行自动确认。

    客户端确认:client 确认当前消息时,顺便确认之前没有来得急确认的消息。比如确认消息还没发送到服务端,就断开了。

    客户端确认:client-individual  只确认当前消息

    6、消息驳回NACK

    ack模式为client-individual:适用于驳回当前单个消息, 

    ack模式为client:适用于驳回那些还没有被ACK'edNACK'ed的消息

    其他参数详见:http://www.cnblogs.com/piaolingzxh/p/5450176.html

     7、apollo web ui

    入口: https://127.0.0.1:61681/

    默认账户密码:admin password

    7.1 主要概念

    queue:当生产者和消费者都断开时,会自动删除。格式:/queue/myQueueName

    topic:当生产者和消费者都断开时,会自动删除。格式:/topic/myTopicName

    durable subs:消息分组的依据。持久化的粒度

    7.2 有图有真相

    查看durable subs,可以看到刚才实验的两个id,分别为dsub_1,dsub_2

    点击dsub_1 ,查看该id分组的持久化信息

    生产者、消费者如下

    7.3 实用技巧:定位生产者、消费者程序

    上边可以看到生产者和消费者的IP及端口,

    下边,我们通过IP及端口定位程序,

    ssh到IP地址

    lsof -i:59446 #列出占用该端口的进程PID信息,如4133

    ps -ef | grep 4133

    这样就可以定位到主进程文件了。

  • 相关阅读:
    结合P2P软件使用Ansible分发大文件
    Centos7 上安装 FastDFS
    go在centos配置以及go mod配置
    代理
    笔记本安装ubuntu18.08,解决过程中出现的各种问题
    CentOS7设置自定义开机启动脚本,添加自定义系统服务
    gitlab忘记密码找回
    zabbix配置短信报警
    将博客搬至CSDN
    RT-Thread-stm32f769-qspi-flash移植
  • 原文地址:https://www.cnblogs.com/piaolingzxh/p/5463918.html
Copyright © 2011-2022 走看看