zoukankan      html  css  js  c++  java
  • ActiveMQ 消息生产者与消息消费者简单例子

    消息生产者:HelloQueueProducer.java

    package activemq.test;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Session;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    /**
     * Minimal ActiveMQ queue producer: connects to a broker, sends one
     * {@code QueueModel} wrapped in a JMS object message, and disconnects.
     */
    public class HelloQueueProducer {

        /**
         * Sends a single persistent object message to the given queue.
         *
         * <p>The payload is a fixed {@code QueueModel} (id "1", idType 1, operType 1).
         * Any failure is reported with {@code printStackTrace()} and not rethrown.
         * The connection is always closed before this method returns.
         *
         * @param activemq_url  broker URL passed to the connection factory
         * @param activemq_user user name passed to the connection factory
         * @param activemq_pw   password passed to the connection factory
         * @param msg           currently unused; the object message below is sent instead of a text message
         * @param queue_name    name of the queue to send to
         */
        public static void sendMessage(String activemq_url, String activemq_user, String activemq_pw, String msg, String queue_name) {
            // Declared outside the try so the finally block can release it on every path.
            Connection connection = null;
            try {
                // Build the connection factory from user name, password and broker URL.
                ConnectionFactory factory = new ActiveMQConnectionFactory(activemq_user, activemq_pw, activemq_url);
                connection = factory.createConnection();
                connection.start();

                // Non-transacted session with automatic acknowledgement.
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue(queue_name);
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);

                // Build the object payload.
                // NOTE(review): createObjectMessage presumably requires QueueModel to be Serializable; confirm.
                QueueModel model = new QueueModel();
                model.setId("1");
                model.setIdType(1);
                model.setOperType(1);
                model.setDesc("修改");
                Message message = session.createObjectMessage(model);

                // Publish the message to the broker.
                producer.send(message);
                session.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Previously the connection was only closed on the success path, so any
                // exception after createConnection() leaked it. Closing the connection
                // also releases the sessions and producers created from it.
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception closeError) {
                        closeError.printStackTrace();
                    }
                }
            }
        }

        /**
         * Demo entry point: sends one message to {@code test_queue} on a local broker.
         *
         * @param args ignored
         */
        public static void main(String[] args) {
            sendMessage("tcp://localhost:61616", "system", "manager", "发送消息:Hello ActiveMQ Text Message!", "test_queue");
        }
    }

    消息消费者:HelloQueueConsumer.java

    package activemq.test;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.ObjectMessage;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    /**
     * Minimal ActiveMQ queue consumer: connects to a broker and registers itself
     * as the asynchronous {@link MessageListener} for one queue.
     *
     * <p>Configure the broker URL, credentials and queue name through the setters
     * before calling {@link #receiver()}.
     */
    public class HelloQueueConsumer implements MessageListener {

        private String activemq_url;
        private String activemq_user;
        private String activemq_pw;
        private String queue_name;

        /**
         * Handles one incoming message. Only {@link ObjectMessage} instances are
         * processed; every other message type is ignored.
         *
         * <p>The payload is cast to {@code QueueModel} without a type check, so a
         * different payload type results in a {@code ClassCastException}.
         *
         * @param message the message delivered by the JMS provider
         */
        @Override
        public void onMessage(Message message) {
            if (!(message instanceof ObjectMessage)) {
                return;
            }
            ObjectMessage objectMessage = (ObjectMessage) message;
            try {
                // NOTE(review): getObject() deserializes the payload; only consume
                // from trusted producers, and confirm the broker/client
                // trusted-package settings.
                QueueModel model = (QueueModel) objectMessage.getObject();
                System.out.println("HaHa: I'v got " + model.getId()+" , "+model.getDesc());
                System.out.println("接收到消息后续处理......");
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        /**
         * Connects to the broker and registers this object as the listener on the
         * configured queue.
         *
         * <p>On success the connection is intentionally left open so that messages
         * keep being delivered to {@link #onMessage(Message)}. If any setup step
         * fails, the error is printed and the partially set-up connection is closed.
         */
        public void receiver() {
            Connection connection = null;
            try {
                // Build the connection factory from user name, password and broker URL.
                ConnectionFactory factory = new ActiveMQConnectionFactory(this.getActivemq_user(), this.getActivemq_pw(), this.getActivemq_url());
                connection = factory.createConnection();
                connection.start();

                // Non-transacted session with automatic acknowledgement.
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue(this.getQueue_name());
                MessageConsumer consumer = session.createConsumer(destination);

                // Deliver messages asynchronously to onMessage().
                consumer.setMessageListener(this);
            } catch (Exception e) {
                e.printStackTrace();
                // Setup failed, so no listener will ever use this connection:
                // release it instead of leaking it.
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception closeError) {
                        closeError.printStackTrace();
                    }
                }
            }
        }

        /** @return the broker URL */
        public String getActivemq_url() {
            return activemq_url;
        }

        /** @param activemq_url the broker URL, e.g. {@code tcp://localhost:61616} */
        public void setActivemq_url(String activemq_url) {
            this.activemq_url = activemq_url;
        }

        /** @return the user name used to connect */
        public String getActivemq_user() {
            return activemq_user;
        }

        /** @param activemq_user the user name used to connect */
        public void setActivemq_user(String activemq_user) {
            this.activemq_user = activemq_user;
        }

        /** @return the password used to connect */
        public String getActivemq_pw() {
            return activemq_pw;
        }

        /** @param activemq_pw the password used to connect */
        public void setActivemq_pw(String activemq_pw) {
            this.activemq_pw = activemq_pw;
        }

        /** @return the name of the queue to consume from */
        public String getQueue_name() {
            return queue_name;
        }

        /** @param queue_name the name of the queue to consume from */
        public void setQueue_name(String queue_name) {
            this.queue_name = queue_name;
        }

        /**
         * Demo entry point: listens on {@code test_queue} of a local broker.
         *
         * @param args ignored
         */
        public static void main(String[] args) {
            HelloQueueConsumer consumer = new HelloQueueConsumer();
            consumer.setActivemq_url("tcp://localhost:61616");
            consumer.setActivemq_user("system");
            consumer.setActivemq_pw("manager");
            consumer.setQueue_name("test_queue");
            consumer.receiver();
        }
    }

  • 相关阅读:
    235. Lowest Common Ancestor of a Binary Search Tree
    234. Palindrome Linked List
    233. Number of Digit One
    232. Implement Queue using Stacks
    231. Power of Two
    230.Kth Smallest Element in a BST
    229. Majority Element II
    228. Summary Ranges
    postgres 数组中获取最后一个元素的值
    excel 拆分单元格并填充上一行的数据
  • 原文地址:https://www.cnblogs.com/dead-trap-ramble/p/3451044.html
Copyright © 2011-2022 走看看