1、实现基础组件实现关键点
基础组件封装设计-迅速消息发送
基础组件封装设计-确认消息发送
基础组件封装设计-延迟消息发送
2、基础组件需要实现的功能
迅速、延迟、可靠
消息异步化序列化
链接池化、高性能
完备的补偿机制
3、创建工程

rabbit-common : 公共模块
rabbit-api: 提供给第三方使用
rabbit-core-producer: 用于发送消息(核心)
rabbit-task: 用于做可靠性处理
首先创建rabbit-parent工程
pom.xml 如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<modules>
<module>rabbit-common</module>
<module>rabbit-api</module>
<module>rabbit-core-producer</module>
<module>rabbit-task</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.16.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>rabbit-parent</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>rabbit-parent</name>
<description>Demo project for Spring Boot</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>8</java.version>
<fasterxml.uuid.version>3.1.4</fasterxml.uuid.version>
<org.codehaus.jackson.version>1.9.13</org.codehaus.jackson.version>
<druid.version>1.0.24</druid.version>
<!-- 当当网:分布式定时任务-->
<elastic-job.version>2.1.4</elastic-job.version>
<guava.version>20.0</guava.version>
<commons-langs3.version>3.3.1</commons-langs3.version>
<commons-io.version>2.4</commons-io.version>
<commons-collections.version>3.2.2</commons-collections.version>
<curator.version>2.11.0</curator.version>
<fastjson.version>1.1.26</fastjson.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- 对json格式的支持-->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${org.codehaus.jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.uuid</groupId>
<artifactId>java-uuid-generator</artifactId>
<version>${fasterxml.uuid.version}</version>
</dependency>
</dependencies>
</project>
二、rabbit-api: 提供给第三方使用
1、创建Message类
@Data
public class Message implements Serializable {
private static final long serialVersionUID = -6142443586615984421L;
/**
* 消息的唯一ID
*/
private String messageId;
/**
* 消息的主题
*/
private String topic;
/**
* 消息的路由规则
*/
private String routingKey = "";
/**
* 消息的附加属性
*/
private Map<String,Object> attributes = new HashMap<>();
/**
* 延迟消息的参数配置
*/
private int delayMills;
/**
* 消息类型.默认为Confirm消息
*/
private String messageType = MessageType.CONFIRM;
public Message(){
}
public Message(String messageId, String topic, String routingKey, Map<String, Object> attributes, int delayMills, String messageType) {
this.messageId = messageId;
this.topic = topic;
this.routingKey = routingKey;
this.attributes = attributes;
this.delayMills = delayMills;
this.messageType = messageType;
}
}
2、创建消息类型类
public final class MessageType {
/**
* 迅速消息: 不需要保障消息的可靠性,也不需要做confirm确认
*/
public final static String RAPID = "0";
/**
* 确认消息: 不需要保障消息的可靠性,但是会做消息confirm确认
*/
public final static String CONFIRM = "1";
/**
* 可靠性消息: 一定消息保障消息100%投递,不允许有任何的消息丢失
* PS: 保障数据库和所发的消息的原子性操作(最终一致的)
*/
public final static String RELIANT = "2";
}
3、构建消息类,采用构造者模式
public class MessageBuilder {
/**
* 消息的唯一ID
*/
private String messageId;
/**
* 消息的主题
*/
private String topic;
/**
* 消息的路由规则
*/
private String routingKey = "";
/**
* 消息的附加属性
*/
private Map<String,Object> attributes = new HashMap<>();
/**
* 延迟消息的参数配置
*/
private int delayMills;
/**
* 消息类型.默认为Confirm消息
*/
private String messageType = MessageType.CONFIRM;
private MessageBuilder(){
}
public static MessageBuilder create(){
return new MessageBuilder();
}
public MessageBuilder withMessageId(String messageId){
this.messageId = messageId;
return this;
}
public MessageBuilder withTopic(String topic){
this.topic = topic;
return this;
}
public MessageBuilder withRoutingKey(String routingKey){
this.routingKey = routingKey;
return this;
}
public MessageBuilder withAttributes(Map<String,Object> attributes){
this.attributes = attributes;
return this;
}
public MessageBuilder withAttribute(String key, Object object){
this.attributes.put(key,object);
return this;
}
public MessageBuilder withMessageId(int delayMills){
this.delayMills = delayMills;
return this;
}
public MessageBuilder withMessageType(String messageType){
this.messageType = messageType;
return this;
}
public Message build(){
if(messageId == null){
messageId = UUID.randomUUID().toString();
}
if(topic == null){
throw new MessageRunTimeException("this topic is null");
}
Message message = new Message(messageId,topic,routingKey,attributes,delayMills,messageType);
return message;
}
}
4、创建异常类
MessageException类
public class MessageException extends Exception{
private static final long serialVersionUID = -8283764568495174322L;
public MessageException(){
super();
}
public MessageException(String message){
super(message);
}
public MessageException(String message, Throwable cause){
super(message, cause);
}
public MessageException( Throwable cause){
super( cause);
}
}
创建MessageRunTimeException 类
public class MessageRunTimeException extends RuntimeException{
private static final long serialVersionUID = -2591307228826723236L;
public MessageRunTimeException(){
super();
}
public MessageRunTimeException(String message){
super(message);
}
public MessageRunTimeException(String message, Throwable cause){
super(message, cause);
}
public MessageRunTimeException(Throwable cause){
super( cause);
}
}
5、创建消息生成者接口
public interface MessageProducer {
void send(Message message) throws MessageRunTimeException;
/**
* 消息的发送,附带SendCallback回调执行响应的业务逻辑处理
* @param message
* @throws MessageRunTimeException
*/
void send(Message message, SendCallback sendCallback) throws MessageRunTimeException;
void send(List<Message> messages) throws MessageRunTimeException;
}
发送回调接口。 回调函数处理
public interface SendCallback {
void onSuccess();
void onFailure();
}
6、创建消费者监听消息类
public interface MessageListener {
void onMessage(Message message);
}
三、rabbit-common : 公共模块
1、添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rabbit-parent</artifactId>
<groupId>com.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rabbit-common</artifactId>
<dependencies>
<dependency>
<groupId>com.example</groupId>
<artifactId>rabbit-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>
</project>
2、创建序列化和反序列化接口
public interface Serializer {
byte[] serializeRaw(Object data);
String serialize(Object data);
byte[] deserializeRaw(Object data);
<T> T deserialize(String content);
<T> T deserialize(byte[] content);
}
3、创建接口SerializerFactory
public interface SerializerFactory {
Serializer create();
}
4、序列化反序列化实现
public class JacksonSerializer implements Serializer {
private static final Logger LOGGER = LoggerFactory.getLogger(JacksonSerializer.class);
private static final ObjectMapper mapper = new ObjectMapper();
static {
mapper.disable(SerializationFeature.INDENT_OUTPUT);
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
mapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true);
mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
mapper.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
mapper.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS, true);
mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true);
mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
}
private final JavaType type;
private JacksonSerializer(JavaType type) {
this.type = type;
}
public JacksonSerializer(Type type) {
this.type = mapper.getTypeFactory().constructType(type);
}
public static JacksonSerializer createParametricType(Class<?> cls) {
return new JacksonSerializer(mapper.getTypeFactory().constructType(cls));
}
@Override
public byte[] serializeRaw(Object data) {
try {
return mapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
LOGGER.error("序列化出错", e);
}
return null;
}
@Override
public String serialize(Object data) {
try {
return mapper.writeValueAsString(data);
} catch (JsonProcessingException e) {
LOGGER.error("序列化出错", e);
}
return null;
}
@Override
public byte[] deserializeRaw(Object data) {
return new byte[0];
}
@Override
public <T> T deserialize(String content) {
try {
return mapper.readValue(content, type);
} catch (IOException e) {
LOGGER.error("反序列化出错", e);
}
return null;
}
@Override
public <T> T deserialize(byte[] content) {
try {
return mapper.readValue(content, type);
} catch (IOException e) {
LOGGER.error("反序列化出错", e);
}
return null;
}
}
5、序列化工厂实现
public class JacksonSerializerFactory implements SerializerFactory{
public static final SerializerFactory INSTANCE = new JacksonSerializerFactory();
@Override
public Serializer create() {
return JacksonSerializer.createParametricType(Message.class);
}
}
6、普通消息Convert
public class GenericMessageConverter implements MessageConverter {
private Serializer serializer;
public GenericMessageConverter(Serializer serializer) {
Preconditions.checkNotNull(serializer);
this.serializer = serializer;
}
/**
* 使用序列化: org.springframework.amqp.core.Message 转为Object
*/
@Override
public Object fromMessage(org.springframework.amqp.core.Message message) throws MessageConversionException {
return this.serializer.deserialize(message.getBody());
}
/**
* 使用反序列化 将自己的object转换为org.springframework.amqp.core.Message
*/
@Override
public org.springframework.amqp.core.Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException {
return new org.springframework.amqp.core.Message(this.serializer.serializeRaw(object), messageProperties);
}
}
7、扩展消息Convert,
引用了GenericMessageConverter,可以理解为装饰者模式。比如设置过期时间。
public class RabbitMessageConverter implements MessageConverter {
private GenericMessageConverter delegate;
// private final String delaultExprie = String.valueOf(24 * 60 * 60 * 1000);
public RabbitMessageConverter(GenericMessageConverter genericMessageConverter) {
Preconditions.checkNotNull(genericMessageConverter);
this.delegate = genericMessageConverter;
}
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
// messageProperties.setExpiration(delaultExprie);
com.example.api.Message message = (com.example.api.Message)object;
messageProperties.setDelay(message.getDelayMills());
return this.delegate.toMessage(object, messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
com.example.api.Message msg = (com.example.api.Message) this.delegate.fromMessage(message);
return msg;
}
}
四、rabbit-core-producer
1、增加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rabbit-parent</artifactId>
<groupId>com.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rabbit-core-producer</artifactId>
<dependencies>
<dependency>
<groupId>com.example</groupId>
<artifactId>rabbit-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>7.0.0-beta1</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
2、自动装配
创建META-INF文件夹下创建spring.facories文件

内容为:
# Auto Configure org.springframework.boot.autoconfigure.EnableAutoConfiguration= com.example.producer.autoconfigure.RabbitProducerAutoConfiguration
3、发送消息的实际实现类
@Component
public class ProducerClient implements MessageProducer {
@Autowired
RabbitBroker rabbitBroker;
@Override
public void send(Message message) throws MessageRunTimeException {
Preconditions.checkNotNull(message.getTopic());
String messageType = message.getMessageType();
switch (messageType){
case MessageType.RAPID:
rabbitBroker.rapidSend(message);
break;
case MessageType.CONFIRM:
rabbitBroker.confirmSend(message);
break;
case MessageType.RELIANT:
rabbitBroker.reliantSend(message);
break;
default:
break;
}
}
@Override
public void send(Message message, SendCallback sendCallback) throws MessageRunTimeException {
}
@Override
public void send(List<Message> messages) throws MessageRunTimeException {
}
}
4、创建接口 RabbitBroker
作用: 具体发送不同种类消息的接口
public interface RabbitBroker {
void rapidSend(Message message);
void confirmSend(Message message);
void reliantSend(Message message);
void sendMessages();
}
创建实现类RabbitBrokerImpl
@Slf4j
@Component
public class RabbitBrokerImpl implements RabbitBroker {
@Autowired
private RabbitTemplateContainer rabbitTemplateContainer;
@Autowired
private MessageStoreService messageStoreService;
@Override
public void rapidSend(Message message) {
message.setMessageType(MessageType.RAPID);
sendKernel(message);
}
/**
* 发送消息的核心方法, 使用异步线程池进行发送消息
* @param message
*/
private void sendKernel(Message message) {
AsyncBaseQueue.submit(() -> {
CorrelationData correlationData = new CorrelationData(
String.format("%s#%s", message.getMessageId(), System.currentTimeMillis()));
String topic = message.getTopic();
String routingKey = message.getRoutingKey();
RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);
rabbitTemplate.convertAndSend(topic,routingKey, message, correlationData);
log.info("#RabbitBrokerImpl.sendKernel# 发送消息到RabbitMQ,messageId={}", message.getMessageId());
});
}
@Override
public void confirmSend(Message message) {
message.setMessageType(MessageType.CONFIRM);
sendKernel(message);
}
@Override
public void reliantSend(Message message) {
message.setMessageType(MessageType.RELIANT);
BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId());
if(bm == null) {
//1. 把数据库的消息发送日志先记录好
Date now = new Date();
BrokerMessage brokerMessage = new BrokerMessage();
brokerMessage.setMessageId(message.getMessageId());
brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode());
//tryCount 在最开始发送的时候不需要进行设置
brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
brokerMessage.setCreateTime(now);
brokerMessage.setUpdateTime(now);
brokerMessage.setMessage(message);
messageStoreService.insert(brokerMessage);
}
//2. 执行真正的发送消息逻辑
sendKernel(message);
}
@Override
public void sendMessages() {
}
}
创建异步消息队列(使用线程池)
@Slf4j
public class AsyncBaseQueue {
private static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors();
private static final int QUEUE_SIZE = 10000;
private static ExecutorService senderAsync = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_SIZE),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("rabbitmq_client_async_sender");
return t;
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("async sender is error rejected, runnable:{}, executor:{}", r, executor);
}
}
);
public static void submit(Runnable runnable){
senderAsync.submit(runnable);
}
}
池化RabbitTemplate
/**
* @description: 池化RabbitTemplate
* 每一个topic对应一个RabbitTemplate的好处
* 1、提高发送的效率
* 2、可以根据不同的需求定制不同的RabbitTemplate,比如每一个topic都有自己的routingKey规则
* @author:
* @create: 2020-08-01 17:22
*/
@Slf4j
@Component
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {
private Map<String /* TOPIC */, RabbitTemplate> rabbitTemplateMap = Maps.newConcurrentMap();
private Splitter splitter = Splitter.on("#");
private SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE;
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private MessageStoreService messageStoreService;
public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException {
Preconditions.checkNotNull(message);
String topic = message.getTopic();
RabbitTemplate rabbitTemplate = rabbitTemplateMap.get(topic);
if(rabbitTemplate != null){
return rabbitTemplate;
}
log.info("topic={} is not exists, create", topic);
RabbitTemplate newTemplate = new RabbitTemplate(connectionFactory);
newTemplate.setExchange(topic);
newTemplate.setRoutingKey(message.getRoutingKey());
newTemplate.setRetryTemplate(new RetryTemplate());
// 添加序列化反序列化和converter对象
Serializer serializer = serializerFactory.create();
GenericMessageConverter gmc = new GenericMessageConverter(serializer);
RabbitMessageConverter rmc = new RabbitMessageConverter(gmc);
newTemplate.setMessageConverter(rmc);
String messageType = message.getMessageType();
if(!MessageType.RAPID.equals(messageType)){
newTemplate.setConfirmCallback(this);
}
rabbitTemplateMap.putIfAbsent(topic,newTemplate);
return rabbitTemplateMap.get(topic);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//消息应答
List<String> strings = splitter.splitToList(correlationData.getId());
String messageId = strings.get(0);
long sendTime = Long.parseLong(strings.get(1));
String messageType = strings.get(2);
if(ack){
log.info("发送消息成功,confirm messageId={}, sendTime={}" , messageId, sendTime);
}else {
log.info("发送消息失败,confirm messageId={}, sendTime={}" , messageId, sendTime);
}
if(ack) {
// 当Broker 返回ACK成功时, 就是更新一下日志表里对应的消息发送状态为 SEND_OK
// 如果当前消息类型为reliant 我们就去数据库查找并进行更新
if(MessageType.RELIANT.endsWith(messageType)) {
this.messageStoreService.succuess(messageId);
}
log.info("发送消息成功,confirm messageId={}, sendTime={}" , messageId, sendTime);
} else {
log.info("发送消息失败,confirm messageId={}, sendTime={}" , messageId, sendTime);
}
}
}
5、数据准备
创建数据库和表
createtable.sql
USE broker_message; CREATE TABLE IF NOT EXISTS borker_message ( message_id VARCHAR(128) NOT NULL, message VARCHAR(4000), try_count INT(4) DEFAULT 0, STATUS VARCHAR(10) DEFAULT '', next_retry TIMESTAMP , create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(message_id) ) ENGINE=INNODB DEFAULT CHARSET=utf8;
rabbit-core-producer工程下
创建BorkerMessage类
public class BrokerMessage implements Serializable {
private static final long serialVersionUID = 7447792462810110841L;
private String messageId;
private Message message;
private Integer tryCount = 0;
private String status;
private Date nextRetry;
private Date createTime;
private Date updateTime;
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId == null ? null : messageId.trim();
}
public Message getMessage() {
return message;
}
public void setMessage(Message message) {
this.message = message;
}
public Integer getTryCount() {
return tryCount;
}
public void setTryCount(Integer tryCount) {
this.tryCount = tryCount;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status == null ? null : status.trim();
}
public Date getNextRetry() {
return nextRetry;
}
public void setNextRetry(Date nextRetry) {
this.nextRetry = nextRetry;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
}
定义BrokerMessageMapper 接口
@Mapper
public interface BrokerMessageMapper {
int deleteByPrimaryKey(String messageId);
int insert(BrokerMessage record);
int insertSelective(BrokerMessage record);
BrokerMessage selectByPrimaryKey(String messageId);
int updateByPrimaryKeySelective(BrokerMessage record);
//int updateByPrimaryKeyWithBLOBs(BrokerMessage record);
int updateByPrimaryKey(BrokerMessage record);
void changeBrokerMessageStatus(@Param("brokerMessageId") String brokerMessageId, @Param("brokerMessageStatus") String brokerMessageStatus, @Param("updateTime") Date updateTime);
List<BrokerMessage> queryBrokerMessageStatus4Timeout(@Param("brokerMessageStatus") String brokerMessageStatus);
List<BrokerMessage> queryBrokerMessageStatus(@Param("brokerMessageStatus") String brokerMessageStatus);
int update4TryCount(@Param("brokerMessageId") String brokerMessageId, @Param("updateTime") Date updateTime);
}
增加BrokerMessageMapper.xml文件
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.example.producer.mapper.BrokerMessageMapper" >
<resultMap id="BaseResultMap" type="com.example.producer.entity.BrokerMessage" >
<id column="message_id" property="messageId" jdbcType="VARCHAR" />
<result column="message" property="message" jdbcType="VARCHAR" typeHandler="com.example.common.mybatis.handler.MessageJsonTypeHandler" />
<result column="try_count" property="tryCount" jdbcType="INTEGER" />
<result column="status" property="status" jdbcType="VARCHAR" />
<result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" />
<result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
<result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
</resultMap>
<sql id="Base_Column_List" >
message_id, message, try_count, status, next_retry, create_time, update_time
</sql>
<select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
select
<include refid="Base_Column_List" />
from broker_message
where message_id = #{messageId,jdbcType=VARCHAR}
</select>
<delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
delete from broker_message
where message_id = #{messageId,jdbcType=VARCHAR}
</delete>
<insert id="insert" parameterType="com.example.producer.entity.BrokerMessage" >
insert into broker_message (message_id, message, try_count,
status, next_retry, create_time,
update_time)
values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR, typeHandler=com.example.common.mybatis.handler.MessageJsonTypeHandler}, #{tryCount,jdbcType=INTEGER},
#{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP},
#{updateTime,jdbcType=TIMESTAMP})
</insert>
<insert id="insertSelective" parameterType="com.example.producer.entity.BrokerMessage" >
insert into broker_message
<trim prefix="(" suffix=")" suffixOverrides="," >
<if test="messageId != null" >
message_id,
</if>
<if test="message != null" >
message,
</if>
<if test="tryCount != null" >
try_count,
</if>
<if test="status != null" >
status,
</if>
<if test="nextRetry != null" >
next_retry,
</if>
<if test="createTime != null" >
create_time,
</if>
<if test="updateTime != null" >
update_time,
</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides="," >
<if test="messageId != null" >
#{messageId,jdbcType=VARCHAR},
</if>
<if test="message != null" >
#{message,jdbcType=VARCHAR, typeHandler=com.example.common.mybatis.handler.MessageJsonTypeHandler},
</if>
<if test="tryCount != null" >
#{tryCount,jdbcType=INTEGER},
</if>
<if test="status != null" >
#{status,jdbcType=VARCHAR},
</if>
<if test="nextRetry != null" >
#{nextRetry,jdbcType=TIMESTAMP},
</if>
<if test="createTime != null" >
#{createTime,jdbcType=TIMESTAMP},
</if>
<if test="updateTime != null" >
#{updateTime,jdbcType=TIMESTAMP},
</if>
</trim>
</insert>
<update id="updateByPrimaryKeySelective" parameterType="com.example.producer.entity.BrokerMessage" >
update broker_message
<set >
<if test="message != null" >
message = #{message,jdbcType=VARCHAR, typeHandler=com.example.common.mybatis.handler.MessageJsonTypeHandler},
</if>
<if test="tryCount != null" >
try_count = #{tryCount,jdbcType=INTEGER},
</if>
<if test="status != null" >
status = #{status,jdbcType=VARCHAR},
</if>
<if test="nextRetry != null" >
next_retry = #{nextRetry,jdbcType=TIMESTAMP},
</if>
<if test="createTime != null" >
create_time = #{createTime,jdbcType=TIMESTAMP},
</if>
<if test="updateTime != null" >
update_time = #{updateTime,jdbcType=TIMESTAMP},
</if>
</set>
where message_id = #{messageId,jdbcType=VARCHAR}
</update>
<update id="updateByPrimaryKey" parameterType="com.example.producer.entity.BrokerMessage" >
update broker_message
set message = #{message,jdbcType=VARCHAR, typeHandler=com.example.common.mybatis.handler.MessageJsonTypeHandler},
try_count = #{tryCount,jdbcType=INTEGER},
status = #{status,jdbcType=VARCHAR},
next_retry = #{nextRetry,jdbcType=TIMESTAMP},
create_time = #{createTime,jdbcType=TIMESTAMP},
update_time = #{updateTime,jdbcType=TIMESTAMP}
where message_id = #{messageId,jdbcType=VARCHAR}
</update>
<update id="changeBrokerMessageStatus" >
update broker_message bm
set bm.status = #{brokerMessageStatus,jdbcType=VARCHAR},
bm.update_time = #{updateTime, jdbcType=TIMESTAMP}
where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
</update>
<select id="queryBrokerMessageStatus4Timeout" resultMap="BaseResultMap" >
<![CDATA[
select message_id, message, try_count, status, next_retry, create_time, update_time
from broker_message bm
where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}
and bm.next_retry < sysdate()
]]>
</select>
<select id="queryBrokerMessageStatus" resultMap="BaseResultMap" >
select message_id, message, try_count, status, next_retry, create_time, update_time
from broker_message bm
where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}
</select>
<update id="update4TryCount" >
update broker_message bm
set bm.try_count = bm.try_count + 1,
bm.update_time = #{updateTime,jdbcType=TIMESTAMP}
where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
</update>
</mapper>
增加配置文件rabbit-producer-message.properties
使用了阿里巴巴的druid的数据源
rabbit.producer.druid.type=com.alibaba.druid.pool.DruidDataSource rabbit.producer.druid.jdbc.url=jdbc:mysql://118.xx.xx.101:3306/broker_message?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true&serverTimezone=GMT rabbit.producer.druid.jdbc.driver-class-name=com.mysql.jdbc.Driver rabbit.producer.druid.jdbc.username=root rabbit.producer.druid.jdbc.password=xxx rabbit.producer.druid.jdbc.initialSize=5 rabbit.producer.druid.jdbc.minIdle=1 rabbit.producer.druid.jdbc.maxActive=100 rabbit.producer.druid.jdbc.maxWait=60000 rabbit.producer.druid.jdbc.timeBetweenEvictionRunsMillis=60000 rabbit.producer.druid.jdbc.minEvictableIdleTimeMillis=300000 rabbit.producer.druid.jdbc.validationQuery=SELECT 1 FROM DUAL rabbit.producer.druid.jdbc.testWhileIdle=true rabbit.producer.druid.jdbc.testOnBorrow=false rabbit.producer.druid.jdbc.testOnReturn=false rabbit.producer.druid.jdbc.poolPreparedStatements=true rabbit.producer.druid.jdbc.maxPoolPreparedStatementPerConnectionSize= 20 rabbit.producer.druid.jdbc.filters=stat,wall,log4j rabbit.producer.druid.jdbc.useGlobalDataSourceStat=true
增加数据库配置
1) RabbitProducerDataSourceConfiguration 增加数据源,并读取配置文件
@Configuration
@PropertySource({"classpath:rabbit-producer-message.properties"})
public class RabbitProducerDataSourceConfiguration {
private static Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RabbitProducerDataSourceConfiguration.class);
@Value("${rabbit.producer.druid.type}")
private Class<? extends DataSource> dataSourceType;
@Bean(name = "rabbitProducerDataSource")
@Primary
@ConfigurationProperties(prefix = "rabbit.producer.druid.jdbc")
public DataSource rabbitProducerDataSource() throws SQLException {
DataSource rabbitProducerDataSource = DataSourceBuilder.create().type(dataSourceType).build();
LOGGER.info("============= rabbitProducerDataSource : {} ================", rabbitProducerDataSource);
return rabbitProducerDataSource;
}
public DataSourceProperties primaryDataSourceProperties(){
return new DataSourceProperties();
}
public DataSource primaryDataSource(){
return primaryDataSourceProperties().initializeDataSourceBuilder().build();
}
}
2)执行SQL脚本createtable.sql,如果表不存在,则创建表
@Configuration
public class BrokerMessageConfiguration {
@Autowired
private DataSource rabbitProducerDataSource;
@Value("classpath:createtable.sql")
private Resource schemaScript;
@Bean
public DataSourceInitializer initDataSourceInitializer() {
System.err.println("--------------rabbitProducerDataSource-----------:" + rabbitProducerDataSource);
final DataSourceInitializer initializer = new DataSourceInitializer();
initializer.setDataSource(rabbitProducerDataSource);
initializer.setDatabasePopulator(databasePopulator());
return initializer;
}
private DatabasePopulator databasePopulator() {
final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
populator.addScript(schemaScript);
return populator;
}
}
3) 整合Mybatis
@Configuration
@AutoConfigureAfter(value = {RabbitProducerDataSourceConfiguration.class})
public class RabbitProducerMyBatisConfiguration {
@Resource(name= "rabbitProducerDataSource")
private DataSource rabbitProducerDataSource;
@Bean(name="rabbitProducerSqlSessionFactory")
public SqlSessionFactory rabbitProducerSqlSessionFactory(DataSource rabbitProducerDataSource) {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(rabbitProducerDataSource);
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
try {
bean.setMapperLocations(resolver.getResources("classpath:com/example/producer/mapping/*.xml"));
SqlSessionFactory sqlSessionFactory = bean.getObject();
sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);
return sqlSessionFactory;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Bean(name="rabbitProducerSqlSessionTemplate")
public SqlSessionTemplate rabbitProducerSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
4) 配置扫描
@Configuration
@AutoConfigureAfter(RabbitProducerDataSourceConfiguration.class)
public class RabbitProducerMybatisMapperScanerConfig {
@Bean(name="rabbitProducerMapperScannerConfigurer")
public MapperScannerConfigurer rabbitProducerMapperScannerConfigurer() {
MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
mapperScannerConfigurer.setSqlSessionFactoryBeanName("rabbitProducerSqlSessionFactory");
mapperScannerConfigurer.setBasePackage("com.example.producer.mapper");
return mapperScannerConfigurer;
}
}