zoukankan      html  css  js  c++  java
  • java-ActiveMQ

    ActiveMQ安装

    下载 activeMQ jar包。(https://activemq.apache.org/download.html

    1、直接运行

    2、在服务里运行(如果要安装服务,点击InstallService.bat ,安装服务后,可以再服务控制台启动mq)

    进入管理后台进行测试:http://127.0.0.1:8161,用户名、密码一般为admin:admin

    例:

    接收端:

    @Component //申明为spring组件
    public class GetSpeechRecognition_XF implements MessageListener{
        // 静态初使化当前类
        public static GetSpeechRecognition_XF getSpeechRecognitionInfo;
        //注解@PostConstruct,这样方法就会在Bean初始化之后被Spring容器执行
        @PostConstruct
        public void init() {
            getSpeechRecognitionInfo = this;
        }
        
        private static final Logger logger = Logger.getLogger(GetSpeechRecognition_XF.class);
        private  String ipAndPortXF = SysConfigItemValue.getValue("IpAndPortXF");//监听地址
        private  String courtIdXF = SysConfigItemValue.getValue("CourtIdXF");//ID
        private static MessageConsumer consumer = null; 
        private static Connection connection = null;
        private static Destination destination = null;
        private static Session session = null;
        private static ConnectionFactory connectionFactory = null;
        public static String textMessage = "";
        /**
         * 启动监听
         */
        public void reciveXF() {
                    try {
                        //连接工厂(连接工厂,JMS 用它创建连接)
                        connectionFactory = new ActiveMQConnectionFactory(
                                ActiveMQConnection.DEFAULT_USER,
                                ActiveMQConnection.DEFAULT_PASSWORD,
                                ipAndPortXF //"tcp://213.138.160.57:61616"
                                );
                        //构造从工厂连接对象(JMS 客户端到JMS Provider 的连接)
                        connection = connectionFactory.createConnection();
                        connection.start();
                        //获取操作连接(一个发送或接收消息的线程)
                        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
                        //获取session,发布订阅的Topic方式(消息的目的地;消息发送给谁.)
                        destination = session.createTopic(courtIdXF);//session.createQueue("hzzy01");
                        //(消费者,消息接收者)
                        consumer = session.createConsumer(destination);
                        // 开始监听     
                        consumer.setMessageListener(this);//(异步接收)  
                    } catch (JMSException e) {
                        e.printStackTrace();
                        logger.error("reciveXF(启动对讯飞语音识别监听)异常:"+e.getMessage());
                    }finally {
                        
                    }
        }
    
        public void recive()
        {
            try {
                if (connection == null) {
                    reciveXF();  
                }
            } catch (Exception e) { 
                e.printStackTrace();
                logger.error("recive(监听)异常:"+e.getMessage());
            }     
        }
        
        /**
        * 异步接收(进行MessageListener监听)
        */
        @Override
        public void onMessage(Message arg0) {
            try {
                if(arg0 instanceof TextMessage)
                {   
                    TextMessage txtMsg = (TextMessage) arg0;
                    String txtContent = txtMsg.getText();
                    JSONObject jsonObject = JSONObject.parseObject(txtContent);
                    String text = JSONObject.parseObject(jsonObject.getString("messageMap")).getString("text");
                    String pgs = JSONObject.parseObject(jsonObject.getString("messageMap")).getString("pgs");//讯飞校验后的语音
                    if (text != null && !text.equals("") && pgs.equals("1")) {
                        ConfWebSocketService.sendMessage(text, "2");//向页面发送消息
                    }
                    logger.debug("讯飞返回消息:"+txtContent);
                }
            } catch (Exception e) {
                logger.error("onMessage(讯飞语音识别)异常:"+e.getMessage());
                e.printStackTrace();
            }
        }
    }

    发送端:

    @Component //申明为spring组件
    public class GetSpeechRecognition_XF2_Send {private static final Logger logger = Logger.getLogger(GetSpeechRecognition_XF2_Send.class);
        private static final int SEND_NUMBER = 5;
        private  String systemId = SysConfigItemValue.getValue("SystemIdYJ");//系统编号
        
        public void sendXF() {
            ConnectionFactory connectionFactory; // ConnectionFactory--连接工厂,JMS用它创建连接
            // Provider 的连接
            Connection connection = null; // Connection :JMS 客户端到JMS
            Session session; // Session: 一个发送或接收消息的线程
            Destination destination; // Destination :消息的目的地;消息发送给谁.
            MessageProducer producer; // MessageProducer:消息发送者
            try {
                // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
                //connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
                connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                        ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
                // 构造从工厂得到连接对象
                connection = connectionFactory.createConnection();
                // 启动
                connection.start();
                // 获取操作连接
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
                destination = session.createTopic("hzzy01");
                // 得到消息生成者【发送者】
                producer = session.createProducer(destination);
                // 设置不持久化,此处学习,实际根据项目决定
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                // 构造消息,此处写死,项目就是参数,或者方法获取
                sendMessage(session, producer);
                //session.commit();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (null != connection)
                        connection.close();
                } catch (Throwable ignore) {
                }
            }
            
        }
        
        public static void sendMessage(Session session, MessageProducer producer) throws Exception {
            //for (int i = 1; i <= SEND_NUMBER; i++) {
                TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + 1);
                // 发送消息到目的地方
                System.out.println("发送消息:" + "ActiveMq 发送的消息" + 1);
                producer.send(message);
                //提交消息
                session.commit();
                Thread.sleep(500);
            //}
        }
    }
  • 相关阅读:
    弹出窗口js
    c 中的字符串的一个问题
    2012年4月29日
    iOS开发之详解剪贴板 CocoaChina 苹果开发中文站 最热的iPhone开发社区 最热的苹果开发社区 最热的iPad开发社区 (2)
    自定义标签栏 ios
    使用委托在对象间传递信息
    NSNotificationCenter消息注册与撤销
    Xcode_免证书开发调试_ipad_程序开发
    文件签名
    iOS获取当前系统的相关信息 博客频道 CSDN.NET (3)
  • 原文地址:https://www.cnblogs.com/lijianda/p/9483681.html
Copyright © 2011-2022 走看看