zoukankan      html  css  js  c++  java
  • 阿里RocketMq(TCP模式)

    针对公司业务逻辑,向阿里云MQ发送指定数据,消费端根据数据来做具体的业务,分两个项目,一个生产端(Producer)、一个消费端(Consumer)

    生产端通过定时任务执行sql向阿里云MQ发送数据,消费端消费指定Topic上的数据

    1:定时任务列表:

    2:生产端表结构:

    aliasName:定时任务别名;

    cronExpression:定时任务轮询规则;

    jobGroup:定时任务分组;

    jobName:定时任务名称;

    jobTrigger:定时任务触发器;

    packageUrl:定时任务扫描具体封装类;

    excuteSql:扫描类中执行的获取数据的脚本;

    lastPramaryKey:最后一次获取数据时最大的主键;

    topic:阿里云MQ的topic;

    producerId:生产端的Id;

    accessKey、securityKey:账号跟秘钥

    dataBaseType:操作数据库类型(公司数据库类型比较多,执行脚本时,需要根据类型来指定具体的Service)

    3:Java端核心代码,定时任务扫描如下配置的任务类来向阿里云MQ发送数据

    public class SendPrimaryKeyListToMqTask implements Job{
        
        private final Logger logger = LoggerFactory.getLogger(SendPrimaryKeyListToMqTask.class);
        
        public void execute(JobExecutionContext context) throws JobExecutionException{
            JobDataMap data = context.getJobDetail().getJobDataMap();
            ScheduleJob scheduleJob = (ScheduleJob)data.get("jobParam");
        
            //最后一次获取数据时最大的主键
            int lastPramaryKey = scheduleJob.getLastPramaryKey();
            
            //执行sql
            String excuteSql = scheduleJob.getExcuteSql();
            excuteSql = excuteSql.replace("lastPramaryKey", String.valueOf(lastPramaryKey));
            
            //操作数据库类型(数据库配置)
            int dataBaseType = scheduleJob.getDataBaseType();
            
            //从游戏库获取数据
            LinkedList<ExcuteResultData> resultData = new LinkedList<ExcuteResultData>();
            if( dataBaseType == 1 ){
                GameService gameService = (GameService)SpringBeanFactory.getBean(GameService.class);
                resultData = gameService.getExcuteResultData(excuteSql);
            //从网站库获取数据
            }else if( dataBaseType == 2 ){
                 SiteService siteService = (SiteService)SpringBeanFactory.getBean(SiteService.class);
                 resultData = siteService.getExcuteResultData(excuteSql);
            }
        
            if ( resultData.size() > 0 ){
                scheduleJob.setPrimaryKeyList(resultData);
                QuartzService quartzService = (QuartzService)SpringBeanFactory.getBean(QuartzService.class);
                //将数据集中最大的主键更新
                scheduleJob.setLastPramaryKey(resultData.getLast().getPrimaryKey());
                quartzService.updateLastPramaryKey(scheduleJob);
        
                String topic = scheduleJob.getTopic();
                String producerId = scheduleJob.getProducerId();
                String ak = scheduleJob.getAccessKey();
                String sk = scheduleJob.getSecurityKey();
                
                //添加日志
                ScheduleJobLog scjLog = new ScheduleJobLog();
                scjLog.setDataSize(resultData.size());
                scjLog.setJobName(scheduleJob.getJobName());
                scjLog.setTopic(topic);
                int scjLogId = quartzService.addMqScheduleJobLog(scjLog);
                //消费端根据此日志主键更新日志状态
                scheduleJob.setScjLogId(scjLogId);
                
                Properties properties = new Properties();
                properties.put("ProducerId", producerId);
                properties.put("AccessKey", ak);
                properties.put("SecretKey", sk);
                Producer producer = ONSFactory.createProducer(properties);
                producer.start();
                
                Message msg = new Message(topic, "PRIMARY_KEY_" + String.valueOf(scjLogId), ObjectsTranscoder.serialize(scheduleJob));
                msg.setKey("PRIMARY_KEY_" + String.valueOf(scjLogId));
                SendResult sendResult = producer.send(msg);
                if ( ( sendResult != null ) && ( sendResult.getMessageId() != null ) ){
                    scjLog.setMessageId(sendResult.getMessageId());
                    scjLog.setStatus(2);
                    quartzService.updateMqScheduleJobLog(scjLog);
                }
                
                producer.shutdown();
        
                logger.debug("=====>任务名称:" + scheduleJob.getJobName());
                logger.debug("=====>发送条数:" + resultData.size());
                logger.debug("=====>发送主键内容:" + resultData.toString());
    
            }
        }
    }

    4:消费端表结构:

    5:消费端Java核心代码(通过监听器来做):

    import java.util.List;
    import java.util.Properties;
    import javax.servlet.ServletContextEvent;
    import javax.servlet.ServletContextListener;
    import org.springframework.web.context.WebApplicationContext;
    import org.springframework.web.context.support.WebApplicationContextUtils;
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.odao.common.utils.ObjectsTranscoder;
    import com.odao.entity.ScheduleJob;
    import com.odao.entity.ScheduleJobLog;
    import com.odao.service.consumer.ConsumerService;
    import com.odao.service.message.MessageService;
    
    /**
     * 阿里云游戏、网站 主键数据集消费监听器
     */
    public class ConsumePrimaryKeyFromMqListener implements ServletContextListener {
        
        @Override
        public void contextInitialized(ServletContextEvent sce) {
            WebApplicationContext appctx = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext());
            MessageService messageService = (MessageService) appctx.getBean(MessageService.class);
            List<ScheduleJob> consumeList = messageService.getScheduleJobList();
            for(final ScheduleJob sjc : consumeList){
                
                String topic = sjc.getTopic();
                String consumerId= sjc.getConsumerId();
                String ak = sjc.getAccessKey();
                String sk = sjc.getSecurityKey();
                
                Properties properties = new Properties();
                properties.put(PropertyKeyConst.ConsumerId,consumerId);
                properties.put(PropertyKeyConst.AccessKey,ak);
                properties.put(PropertyKeyConst.SecretKey,sk);
                //properties.put(PropertyKeyConst.ONSAddr,"http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");
                
                Consumer consumer = ONSFactory.createConsumer(properties);
                
                consumer.subscribe(topic, "*", new MessageListener() {
                    @Override
                    public Action consume(Message message, ConsumeContext context) {
                        ScheduleJob scheduleJob = (ScheduleJob) ObjectsTranscoder.deserialize(message.getBody());
                        if( scheduleJob !=null ){
                            //更新消息状态为3:消费消息成功
                            ScheduleJobLog scjLog = new ScheduleJobLog();
                            scjLog.setStatus(3);
                            scjLog.setMqScheduleJobLogId(scheduleJob.getScjLogId());
                            messageService.updateMqScheduleJobLog(scjLog);
                            try {
                                ConsumerService consumerService = (ConsumerService) Class.forName(sjc.getImplementClass()).newInstance();
                                boolean isSuccess = consumerService.consume(scheduleJob.getPrimaryKeyList());
                                if(isSuccess){
                                    //更新消息状态为4:业务逻辑处理成功
                                    scjLog.setStatus(4);
                                    messageService.updateMqScheduleJobLog(scjLog);
                                }
                            } catch (InstantiationException e) {
                                e.printStackTrace();
                            } catch (IllegalAccessException e) {
                                e.printStackTrace();
                            } catch (ClassNotFoundException e) {
                                e.printStackTrace();
                            }
                        }
                        return Action.CommitMessage;
                    }
                });
                
                consumer.start();
            }
        }
        
        @Override
        public void contextDestroyed(ServletContextEvent sce) {
            
        }
    }
  • 相关阅读:
    sklearn 数据预处理1: StandardScaler
    Gitlab利用Webhook实现Push代码后的Jenkins自动构建
    Shell脚本-自动化部署WEB
    Jenkins可用环境变量以及使用方法
    Docker常用命令
    nginx中root和alias的区别
    gitlab+jenkins=自动化构建
    Spring Boot2.0:使用Docker部署Spring Boot
    Maven内置属性、POM属性
    navicat连接mysql出现Client does not support authentication protocol requested by server解决方案
  • 原文地址:https://www.cnblogs.com/wangfajun/p/5923186.html
Copyright © 2011-2022 走看看