zoukankan      html  css  js  c++  java
  • Spring Cloud Stream

    Spring Cloud Stream

    基本概念

    Source:来源(近义词:Producer、Publisher)

    Sink:接收器(近义词:Consumer、Subscriber)

    Processor:对于上流而言是Sink,对于下游而言是Source

    Binder:kafka

    消息大致分为两个部分:

    • 消息头(Headers)

    • 消息体(Body/Payload)

    启动Zookeeper

    启动kafka

    Producer实现

    1. 定义发送通道

       public interface Source {
       
           /**
            * 需求通道
            */
           String OUT_PUT_DEMAND = "out_put_demand";
       
           /**
            * 任务通道
            */
           String OUT_PUT_TASK = "out_put_task";
       
           /**
            * 工作日志通道
            */
           String OUT_PUT_WORK_LOG = "out_put_workLog";
       
           /**
            * 组织结构信息通道
            */
           String OUT_PUT_ORG = "out_put_org";
       
           /**
            * 代码质量通道
            */
           String OUT_PUT_QUALITY = "out_put_quality";
       
       
           @Output(Source.OUT_PUT_DEMAND)
           MessageChannel demand();
       
           @Output(Source.OUT_PUT_TASK)
           MessageChannel task();
       
           @Output(Source.OUT_PUT_WORK_LOG)
           MessageChannel workLog();
       
           @Output(Source.OUT_PUT_ORG)
           MessageChannel org();
       
           @Output(Source.OUT_PUT_QUALITY)
           MessageChannel quality();
       }

       

    2. 生产类

       public class Producer {
       
           /**
            * 默认发送消息
            *
            * @param message
            * @param channel
            * @return
            */
           public static Boolean send(Object message, MessageChannel channel) {
               return send(message, channel, 5000L);
          }
       
           /**
            * 带超时时间
            *
            * @param message
            * @param timeout
            * @param channel
            * @return
            */
           public static Boolean send(Object message, MessageChannel channel, Long timeout) {
               return channel.send(MessageBuilder.withPayload(message).build(), timeout);
          }
       
       }

       

    3. Binding

       @EnableBinding(Source.class)
       public class SourceAutoConfiguration {
       }

       

    4. 策略模式-消息类型

       public enum SendType {
       
           DEMAND_MESSAGE(new DemandMessage()),
           TASK_MESSAGE(new TaskMessage()),
           WORK_LOG_MESSAGE(new WorkLogMessage()),
           CODE_QUALITY_MESSAGE(new CodeQualityMessage());
       
           private MessageSend messageSend;
           SendType(MessageSend messageSend){
               this.messageSend = messageSend;
          }
       
           public MessageSend get(){
               return  this.messageSend;
          }
       
       }

       

    5. 消息发送接口

       public interface MessageSend {
       
           public  Boolean send(Object message);
       
       }

       

    6. 接口实现

       public class DemandMessage implements MessageSend {
       
           private static final Source SOURCE = SpringContextHelper.getBean(Source.class);
       
           @Override
           public Boolean send(Object message) {
               message = MaskMessage.messageHelper(message);
               if (null == message) {
                   return false;
              }
               return Producer.send(message, SOURCE.demand());
          }
       }

       

    7. 生产消息

       public class ProduceHelper {
       
           /**
            * 需求消息生产
            * @param sendType 发送类型
            * @param message 消息内容
            * @return boolean
            */
           public static Boolean produce(SendType sendType, Demand message) {
               return sendType.get().send(message);
          }
       
           /**
            * 任务消息生产
            * @param sendType 发送类型
            * @param message 消息内容
            * @return boolean
            */
           public static Boolean produce(SendType sendType, Task message) {
               return sendType.get().send(message);
          }
       
           /**
            * 工作日志消息生产
            * @param sendType 发送类型
            * @param message 消息内容
            * @return boolean
            */
           public static Boolean produce(SendType sendType, WorkLog message) {
               return sendType.get().send(message);
          }
       
           /**
            * 代码质量消息生产
            * @param sendType 发送类型
            * @param message 消息内容
            * @return boolean
            */
           public static Boolean produce(SendType sendType, CodeQuality message) {
               return sendType.get().send(message);
          }
       
       }

       

    Comsumer实现

    1. 定义接收通道

       public interface Sink {
       
           /**
            * 需求通道
            */
           String IN_PUT_DEMAND = "in_put_demand";
       
           /**
            * 任务通道
            */
           String IN_PUT_TASK = "in_put_task";
       
           /**
            * 工作日志通道
            */
           String IN_PUT_WORK_LOG = "in_put_workLog";
       
           /**
            * 组织结构信息通道
            */
           String IN_PUT_ORG = "in_put_org";
       
       
           /**
            * 代码质量通道
            */
           String IN_PUT_QUALITY = "in_put_quality";
       
       
           @Input(Sink.IN_PUT_DEMAND)
           SubscribableChannel demand();
       
           @Input(Sink.IN_PUT_TASK)
           SubscribableChannel task();
       
           @Input(Sink.IN_PUT_WORK_LOG)
           SubscribableChannel workLog();
       
           @Input(Sink.IN_PUT_ORG)
           SubscribableChannel org();
       
           @Input(Sink.IN_PUT_QUALITY)
           SubscribableChannel quality();
       }

       

    2. 消费类

       public interface Consumer<T> {
           void onMessage(T message);
       }

       

    3. 消息监听

      • @StreamListener方式

         @Slf4j
         @Component
         public class MessageListener {
         
             @Autowired
             private MessageHandler messageHandler;
         
         
             /**
              * 监听需求消息
              *
              * @param message
              */
             @StreamListener(Sink.IN_PUT_DEMAND)
             public void task(Message message) {
         
                 LOGGER.info("监听到任务信息:{}", message.getPayload());
                 //调用demand入库
                 messageHandler.demandSave(message);
         
            }
         
             /**
              * 监听任务消息
              *
              * @param message
              */
             @StreamListener(Sink.IN_PUT_TASK)
             public void bug(Message message) {
         
                 LOGGER.info("监听到缺陷信息:{}", message.getPayload());
                 //任务消息入库
                 messageHandler.taskSave(message);
         
            }
         
             /**
              * 监听工作日志消息
              *
              * @param message
              */
             @StreamListener(Sink.IN_PUT_WORK_LOG)
             public void workLog(Message message) {
         
                 LOGGER.info("监听到工作日志信息:{}", message.getPayload());
                 //工作日志消息入库
                 messageHandler.worklogSave(message);
            }
         
         
             /**
              * 监听组织消息
              *
              * @param message
              */
             @StreamListener(Sink.IN_PUT_ORG)
             public void org(Message message) {
         
                 LOGGER.info("监听到组织信息:{}", message.getPayload());
                 //组织消息入库
                 messageHandler.orgSave(message);
            }
         
             /**
              * 监听质量消息
              *
              * @param message
              */
             @StreamListener(Sink.IN_PUT_QUALITY)
             public void quality(Message message) {
                 LOGGER.info("接收到质量信息:{}", message.getPayload());
                 //质量消息入库
                 messageHandler.codeQualitySave(message);
            }
         }

         

      • @ServiceActivator

         @ServiceActivator(Sink.IN_PUT_DEMAND)
         public void onMessage(String message){
             System.out.printIn("@ServiceActivator:"+message);
         }

         

      • @PostConstruct

         @PostConstruct
         public void init(){
             //实现异步回调
             subscribableChannel.subscribe(new MessageHandler){
                 @Override
                 public void handleMessage(Message<?> message) throws MessagingException{
                     System.out.printIn("@PostConstruct:"+message);
                }
            }
         }

         

    4. 消息处理

       @Slf4j
       @Component
       public class MessageHandler {
       
           @Autowired
           private CodeQualityRepository codeQualityRepository;
       
           @Autowired
           private DemandRepository demandRepository;
       
           @Autowired
           private TaskRepository taskRepository;
       
           @Autowired
           private WorkLogRepository workLogRepository;
       
           @Autowired
           private CompanyRepository companyRepository;
       
           @Autowired
           private OrgInfoRepository orgInfoRepository;
       
       
       
       
           /**
            * 需求消息入库
            */
           public void demandSave(Message message) {
               Demand demand = JSONObject.parseObject(message.getPayload().toString(), Demand.class);
               LOGGER.info("demand {}",demand);
               MongoNameGet.setCompanyId(demand.getCompanyId());
               if (null != demand.getId() && null != demand.getCompanyId()) {
                   demand.setGrabDate(new Date());
                   demandRepository.save(demand);
                   saveCompany(demand.getCompanyId(),"");
                   LOGGER.info("线程名:{}",Thread.currentThread().getName());
                   LOGGER.info("数据存储完毕");
              }
          }
       
           /**
            * 任务消息入库
            */
           public void taskSave(Message message) {
               Task task = JSONObject.parseObject(message.getPayload().toString(), Task.class);
               MongoNameGet.setCompanyId(task.getCompanyId());
               if (null != task.getId() && null != task.getCompanyId() && !StringUtils.isEmpty(task.getDemandId())) {
       
                   task.setGrabDate(new Date());
       
                   //查询部门id和组id 补充数据
                   Optional<Demand> demand =  demandRepository.findById(task.getDemandId());
                   if(demand.isPresent()){
                       task.setDepartId(demand.get().getDepartId());
                       task.setTeamId(demand.get().getTeamId());
                  }
       
                   taskRepository.save(task);
                   saveCompany(task.getCompanyId(),"");
                   LOGGER.info("数据存储完毕");
              }
          }
       
       
           /**
            * 工作日志消息入库
            */
           public void worklogSave(Message message) {
               WorkLog workLog = JSONObject.parseObject(message.getPayload().toString(), WorkLog.class);
               MongoNameGet.setCompanyId(workLog.getCompanyId());
               if (null != workLog.getId() && null != workLog.getCompanyId() && !StringUtils.isEmpty(workLog.getDemandId())) {
                   workLog.setGrabDate(new Date());
       
                   //查询部门id和组id 补充数据
                   Optional<Demand> demand =  demandRepository.findById(workLog.getDemandId());
                   if(demand.isPresent()){
                       workLog.setDepartId(demand.get().getDepartId());
                       workLog.setTeamId(demand.get().getTeamId());
                  }
       
                   workLogRepository.save(workLog);
                   saveCompany(workLog.getCompanyId(),"");
                   LOGGER.info("数据存储完毕");
       
              }
          }
       
       
           /**
            * 质量消息入库
            */
           public void codeQualitySave(Message message) {
               CodeQuality codeQuality = JSONObject.parseObject(message.getPayload().toString(), CodeQuality.class);
               MongoNameGet.setCompanyId(codeQuality.getCompanyId());
               if (null != codeQuality.getId() && null != codeQuality.getCompanyId()) {
       
                   codeQuality.setGrabDate(new Date());
                   codeQualityRepository.save(codeQuality);
                   saveCompany(codeQuality.getCompanyId(),"");
                   LOGGER.info("数据存储完毕");
              }
          }
       
           /**
            * 部门信息存储
            * @param message
            */
           public void orgSave(Message message) {
               OrgInfo orgInfo = JSONObject.parseObject(message.getPayload().toString(), OrgInfo.class);
               if (orgInfo.getId()!=0) {
       
                   orgInfoRepository.save(orgInfo);
                   LOGGER.info("数据存储完毕");
              }
          }
       
           /**
            * 保存企业id
            * @param companyId
            * @param companyName
            */
           private void saveCompany(String companyId,String companyName){
               if(!StringUtils.isEmpty(companyId)){
                   Company c = Company.builder().id(companyId).companyName(companyName).build();
                   companyRepository.save(c);
              }
       
          }
       }
    5.  

  • 相关阅读:
    Windows7记事本的五大秘密
    深刻认识clientX,offsetX,screenX
    win7几个小技巧
    怎样开启Win7快速启动栏
    offsetLeft,Left,clientLeft的区别
    更改Windows7下图标查看方式
    让IE6、IE7、IE8支持CSS3的圆角、阴影样式
    windows7桌面右下角显示不止一个时间以及显示日期
    Win7开关机关闭Update方法
    jquery outerHeight方法 outerWidth方法
  • 原文地址:https://www.cnblogs.com/kinglead/p/10979914.html
Copyright © 2011-2022 走看看