zoukankan      html  css  js  c++  java
  • ActiveMQ实现消息的发送与接受

    activemq是apache的一个JMS接口的实现产品,java中定义了JMS规范,虽然RocketMQ,kafka现在比较火,但是还是建议先学一下activeMQ再学其他两个就容易很多

    首先可以下载压缩包,linux中,Windows中都可以,个人建议linux,目录结构

    直接在bin下面运行:

    然后进入管控台,默认用户密码是admin,admin,可以在配置文件里进行配置:

    ,开启之后新建maven项目,添加mq依赖,这时候就可以实现消息发送和接受了:

    先说一下代码中用到的对象:

     activeMQ是严格实现了JMS规范的,所以代码中也就是这些接口的实现

    发送消息端:

    //发送消息
    public class Sender {
        public static void main(String[] args) throws JMSException {
            //1 创建ConnectionFactory工厂,  用户名  密码  连接url
            // 如果添加了安全机制,只有用有认证的用户去发送或者接受
            ConnectionFactory factory = new ActiveMQConnectionFactory(
                    "houzheng",//默认的用户
                    "houzheng",
                    "tcp://localhost:61616"  //默认端口: 61616
            );
            //2 创建连接,并开启,默认是关闭的,并且完成后要关闭连接
            Connection connection = factory.createConnection();
            connection.start();
            //3 创建session   参数: 是否启用事务   选择签收模式(自动签收,客户端签收,不确认是否成功签收(消息可能崇重复))
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            //4 创建Destination对象, 参数: 指定消息队列名称
            //Destination指的是消息的目的地或者消费来源,TPT模式中成为Queue,订阅/发布中称为主题
            Destination destination=session.createQueue("myqueue");
            //5 通过session对象创建消息的发送者或者接受者
            MessageProducer producer = session.createProducer(destination);//传入Destination
            //6 可以设置持久化或者非持久化
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //7 发送消息并关闭连接,可发送多种定义格式的消息
            for (int i = 0; i < 5; i++) {
                TextMessage textMessage = session.createTextMessage();
                textMessage.setText("我是第"+i+"条消息");
                producer.send(textMessage);
            }
            //如果开启了事务,则必须要提交才能全部发送到服务器
            //session.commit();
            if(connection!=null)connection.close();
        }
    }  

    这里我使用了自己配置的用户,使用了安全机制,即只有我认证的用户才能发送或者接受消息,当然也可以使用默认的用户,不过不要加安全验证,加了就必须使用验证通过的

    接受消息端:

    //消费者
    public class Receiver {
        public static void main(String[] args) throws JMSException {
            //1 创建ConnectionFactory工厂,  用户名  密码  连接url
            ConnectionFactory factory = new ActiveMQConnectionFactory(
                    "houzheng",//默认的用户
                    "houzheng",
                    "tcp://localhost:61616"  //默认端口: 61616
            );
            //2 创建连接,并开启,默认是关闭的,并且完成后要关闭连接
            Connection connection = factory.createConnection();
            connection.start();
            //3 创建session   参数: 是否启用事务   选择签收模式(自动签收,客户端签收,不确认是否成功签收(消息可能崇重复))
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            //4 创建Destination对象, 参数: 指定消息队列名称
            //Destination指的是消息的目的地或者消费来源,TPT模式中成为Queue,订阅/发布中称为主题
            Destination destination=session.createQueue("myqueue");//必须和生产者指定的queue一样
            //5 通过session对象创建消息的发送者或者接受者
            MessageConsumer consumer = session.createConsumer(destination);//传入Destination
            //6 接受消息
            while(true){
                TextMessage message = (TextMessage) consumer.receive();//阻塞接受,还可以指定等待时间或者不阻塞接受,没有就跳过
                if(message==null)break;
                System.out.println(message.getText());
            }
            if(connection!=null)connection.close();//关闭连接
        }
    }
    

      发送消息并消费之后就可以去管控台查看消息队列,这里只是PTP模式的

     

    或者也可以进行持久化到mysql中,直接去数据库中查询,不过还是使用默认的kahadb或者leveldb这种内存数据存储性能比较好,mysql 可能比较直观看到!

  • 相关阅读:
    mysql 主从配置 读写分离
    interface接口
    http结构
    call_user_func函数
    pcntl_fork 进程
    数据库事务
    php 之 ob缓冲
    shell脚本
    php 守护进程
    ssdb zset
  • 原文地址:https://www.cnblogs.com/houzheng/p/9598112.html
Copyright © 2011-2022 走看看