zoukankan      html  css  js  c++  java
  • ActiveMQ持久化到MySQL以及使用SSL协议通讯

    最近公司事情稍微少了点,研究下怎么优化几个系统的交互,因为我们目前使用的是长链接的同步接口,就考虑用下MQ来处理下。
    由于公司对安全有要求且和CA业务有关,则使用了SSL协议。此文使用的是Activemq的SSL协议+MYSQL作为持久化数据库,后续可能使用NIO+SSL协议。
    其他实现了JMS的MQ应该也大同小异。
    此文大多是参考官网文档
    持久化:http://activemq.apache.org/persistence.html

    相关协议:http://activemq.apache.org/configuring-transports.html

    先说说我对activemq的理解,由于是用java语言开发的,给我的感觉是依赖spring并且跑在jetty上的一个java程序。有了个大概的概念里面配置的东西就更好理解。
    配置mysql持久化,默认持久化是使用的kahadb写入文件里,这里修改成jdbc方式连接mysql,既然是jdbc就需要一个数据库驱动的jar包,
    所以我们需要在lib文件夹下放入mysql-connector-java-5.1.34-bin.jar(其他版本一样),因为使用的是官网推荐的dbcp数据源,相关其他的依赖包在lib下面已经有了。
    想要使用其他的数据源则需要引入相关依赖的jar包即可。
    以下是activemq.xml的mysql持久化配置。

    <persistenceAdapter>
        <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
        <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/> 
    </persistenceAdapter>
    <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> 
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> 
        <property name="username" value="activemq"/> 
        <property name="password" value="activemq"/> 
        <property name="poolPreparedStatements" value="true"/> 
    </bean> 

    这个不就是用spring配置一个数据源的代码吗?把标签里面对应的值修改成自己的就行。
    注意,bean是和broker同级的。createTablesOnStartup第一次为true,后续修改为false,这个是自动创建表使用的,默认的是true。
    一切配置成功则在启动mq的时候数据库会生成
    activemq_acks
    activemq_lock
    activemq_msgs
    三个表

    在我测试的时候出现了一个错误:
    java.sql.SQLException: Cannot execute statement: impossible to write to binary log since BINLOG_FORMAT = STATEMENT and at least one table uses a storage engine limited to row-based logging. InnoDB is limited to row-logging when transaction isolation level is READ COMMITTED or READ UNCOMMITTED.

    就是数据库的写入日志格式,这个有点熟悉,在做mysql主从的时候也出现过。
    具体的我也解释不清楚,官网有给出解释https://dev.mysql.com/doc/refman/5.7/en/binary-log-setting.html
    在my.ini(在windows上的)记录日志格式BINLOG_FORMAT 设置为row或者mixed。
    至此,activemq持久化到mysql已完成配置。

    下面在说说使用SSL协议的配置。
    单向SSL:
    首先需要申请一个mq服务器的证书或者自签一个证书,私钥需要保存好。
    这个是官网的原文http://activemq.apache.org/how-do-i-use-ssl,因为我有点CA背景,说白了就是在上面添加一个标签,上面指定你的服务器证书以及密钥对密码,还有信任的证书库以及密码。
    看官网介绍好像还可以使用ocsp在线认证客户端证书。以下是我在activemq.xml配置的

    <sslContext>
        <sslContext keyStore="/activeMQ/keys/activemq.gdca.com.cn.jks"
                    keyStorePassword="123456789"
                    trustStore="/activeMQ/keys/trustca.jks"
                    trustStorePassword="123456"/>
    </sslContext>
    
    <transportConnectors>
        <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="ssl" uri="ssl://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>

    keyStore:是你的服务器证书和密钥对合成的。
    keyStorePassword:是jks的密码,最好把密钥对和jks密码设置一致。
    trustStore:信任源(实际上是你的证书的中级或者顶级根证书)
    trustStorePassword:信任源的jks密码

    在我猜测应该还有个keyStoreType属性?
    配置无误之后启动mq是没有其他错误信息的。

    下面就是编码验证阶段了。

    这边使用springboot模拟一个生产者,一个消费者
    新建一个项目加入activemq依赖,生产者项目结构如图:

    先看看application.yml

    server:
      port: 8888
    
    
    #定义队列名称
    myqueue-name: queue-springboot
    
    my-topic: topic-springboot
    
    #mq的协议相关证书文件
    trustKey: D:WorkProjectLearningactivemq_springbootsrcmain
    esources	rustca.jks
    trustPwd: 123456
    #双向认证需要用到的客户端证书
    clientKey: D:WorkProjectLearningactivemq_springbootsrcmain
    esourceszengcx@gdca.com.cn.key
    clientKeyPwd: 12345678
    #这个好像不起作用了?需要在jmsTamplate上设置? spring: jms: pub-sub-domain: true

    看看config类:

    package com.demo.activemq.config;
    
    import org.apache.activemq.ActiveMQSslConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.jms.annotation.EnableJms;
    import org.springframework.stereotype.Component;
    
    import javax.jms.ConnectionFactory;
    import javax.jms.Queue;
    import javax.jms.Topic;
    
    @Component
    @EnableJms
    public class ConfigBean {
    
    
        @Value("${myqueue-name}")
        private String queueName;
    
        @Value("${my-topic}")
        private String topicName;
    
        //信任源的文件
        @Value("${trustKey}")
        private String trustKey;
        //信任源的文件密码
        @Value("${trustPwd}")
        private String trustPwd;
    
        //客户端密钥文件
        @Value("${clientKey}")
        private String clientKey;
        //客户端密钥文件密码
        @Value("${clientKeyPwd}")
        private String clientKeyPwd;
    
        @Bean
        public Queue queue() {
            return new ActiveMQQueue(queueName);
        }
    
        @Bean
        public Topic topic() {
            return new ActiveMQTopic(topicName);
        }
    
        @Bean("clientSSLConnFactory")
        public ConnectionFactory clientSSLConnFactory() throws Exception {
            ActiveMQSslConnectionFactory sslConnectionFactory = new ActiveMQSslConnectionFactory();
            //双向认证需要的key文件和密码
    //        sslConnectionFactory.setKeyStore(clientKey);
    //        sslConnectionFactory.setKeyStoreKeyPassword(clientKeyPwd);
            //单向需要在本地设置一个信任源,检测服务端的证书
            sslConnectionFactory.setTrustStore(trustKey);
            sslConnectionFactory.setTrustStorePassword(trustPwd);
            sslConnectionFactory.setBrokerURL("ssl://activemq.gdca.com.cn:61617");
            sslConnectionFactory.setUserName("admin");
            sslConnectionFactory.setPassword("admin");
            sslConnectionFactory.setTrustStoreType("JKS");
    
            return sslConnectionFactory;
        }
    }

    因为重新创建了connectionFactory,在yaml上面的配置已经不起作用。对springboot不熟悉,这个是经过测试修改成这个样子的。有知道其他方法的麻烦告诉下。

    下面是生产消息:

    package com.demo.activemq.producer;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.jms.ConnectionFactory;
    import javax.jms.Queue;
    import java.util.UUID;
    
    @Component
    public class Producer {
    
        @Autowired
        JmsMessagingTemplate jmsMessagingTemplate;
    
        @Autowired
        ConnectionFactory clientSSLConnFactory;
    
        @Autowired
        private Queue queue;
    
        public void productMsg() {
            jmsMessagingTemplate.setConnectionFactory(clientSSLConnFactory);
            jmsMessagingTemplate.convertAndSend(queue, UUID.randomUUID().toString().substring(0, 5));
            System.out.println("send off");
        }
    
    }

    到这里生产者结束,使用单元测试跑下。

    MYSQL持久化 :

    再来看看消费者

    工程目录:

    yaml:

    server:
      port: 8887
    
    #定义属性直接在代码使用Value注解获取
    myqueue-name: queue-springboot
    my-topic: topic-springboot
    trustStore: D:WorkProjectLearningactivemq-queue-consumersrcmain
    esources	rustca.jks
    trustPwd: 123456

    config类:

    package com.demo.activemq.config;
    
    import org.apache.activemq.ActiveMQXASslConnectionFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    import javax.jms.ConnectionFactory;
    
    @Component
    public class MainConfig {
    
    
        @Value("${trustStore}")
        private String trustStore;
    
        @Value("${trustPwd}")
        private String trustPwd;
    
    
        @Bean
        public ConnectionFactory clientSSLConnFactory() throws Exception {
            //创建ssl链接工厂,应该springboot在使用jms监听的时候会自动创建链接
            ActiveMQXASslConnectionFactory factory = new ActiveMQXASslConnectionFactory();
            factory.setBrokerURL("ssl://activemq.gdca.com.cn:61617");
            factory.setUserName("admin");
            factory.setPassword("admin");
            factory.setTrustStore(trustStore);
            factory.setTrustStorePassword(trustPwd);
            factory.setTrustStoreType("JKS");
            return factory;
        }
    }

    消费消息监听类:

    package com.demo.activemq.consumer;
    
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    import javax.jms.JMSConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.TextMessage;
    
    @Component
    public class Consumer {
    
        @JMSConnectionFactory(value = "clientSSLConnFactory")
        @JmsListener(destination = "${myqueue-name}")
        public void receiveMsg(TextMessage textMessage) throws JMSException {
            System.out.println("get msg:" + textMessage.getText());
        }
    
    }

     消息已出队: 

    以上则是Activemq配置mysql持久化并使用ssl协议生产和消费消息所有步骤,由于对springboot不是特别熟悉,可能存在代码不够好的情况。

    如有较好的解决方式,麻烦大家告诉一下。

  • 相关阅读:
    一、区块链,这次不容错过
    二、常用固件升级
    2.监控利器nagios手把手企业级实战第一部
    四、NOSQL之Redis持久化缓存服务基础实战第三部
    三、NOSQL之Memcached缓存服务实战精讲第二部
    linux重装docker-compose后无法执行docker-compose命令
    mongodb启用auth,使用密码登录
    Vue的三个点es6知识,扩展运算符
    关于同一台服务器上两个PHP项目相互访问超时的问题
    ffmpeg生成视频封面图
  • 原文地址:https://www.cnblogs.com/davenzeng/p/11647635.html
Copyright © 2011-2022 走看看