RabbitMq 基于 cloud 配置使用结构图
RabbitMq 基于 cloud 配置使用流程
1.在application.yml中配置RabbitMq
输出/输入>>>连接配置
spring:
rabbitmq:
addresses: 127.0.0.1
username: rabbitMq
password: admin
输出>推送使用配置
cloud:
stream:
bindings:
userOutput:
destination: UserSubscription
group: user-service
binders: #配置绑定器
defaultRabbit:
type: rabbit
输入>监听使用配置
cloud:
stream:
bindings:
input: #内置的获取消息的通道 , 从topcheer-default中获取消息
destination: topcheer-default
output: #指定消息发送的目的地,在rabbitMq中,发送到一个topcheer-default的exchange中
destination: topcheer-default
userInput:
destination: UserSubscription
group: user-service
# producer:
# partition-key-expression: payload #分区关键字 对象中的id,对象
# partition-count: 2 #分区大小
binders: #配置绑定器
defaultRabbit:
type: rabbit
2.Mq推送、监听自定义通道
输出>推送通道
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * @Author: 马家立 * @Date: 2020/10/13 11:56 * @Description: TODO Mq 输出>推送自定义消息通道 */ public interface MyProcessor { // 用户推送消息通道 String USEROUTPUT="userOutput"; @Output(USEROUTPUT) MessageChannel userOutput(); }
输入>监听通道
import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.MessageChannel; /** * @Author: 马家立 * @Date: 2020/10/13 11:56 * @Description: TODO Mq 输入>监听自定义消息通道 */ public interface MyProcessor { // 用户监听消息通道 String USERINPUT="userInput"; @Input(USERINPUT) MessageChannel userInput(); }
3.Mq推送、监听使用
输出>推送使用
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Map; /** * @Author: 马家立 * @Date: 2020/10/16 11:38 * @Description: TODO MQ推送 */ @Component @EnableBinding(MyProcessor.class) public class MessageSender { /** * 用户推送消息通道 */ @Resource @Qualifier(value=MyProcessor.USEROUTPUT) private MessageChannel userOutput; /** * @Author: 马家立 * @Date: 2020/10/16 11:38 * @Description: TODO MQ推送 */ public void send(Map<String,Object> map) { map.put("id","123"); map.put("param","张三"); userOutput.send(MessageBuilder.withPayload(map).build()); } }
输入>监听使用
import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @Author: 马家立 * @Date: 2020/10/16 11:43 * @Description: TODO 消息监听器 */ @Component @EnableBinding(MyProcessor.class) public class MessageListener { /** * @Author: 马家立 * @Date: 2020/10/16 11:43 * @Description: TODO MQ监听消费 */ @StreamListener(MyProcessor.USERINPUT) public void userInput(Map<String,Object> map){ String id = (String) map.get("id"); String param = (String) map.get("param"); /** * @Description: TODO 以下做消费处理 * ............................. */ } }