zoukankan      html  css  js  c++  java
  • Spring Integration

    Spring Integration 配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:int-jpa="http://www.springframework.org/schema/integration/jpa"
        xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
        xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
                http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
                http://www.springframework.org/schema/context    http://www.springframework.org/schema/context/spring-context.xsd            
                http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/integration/jpa http://www.springframework.org/schema/integration/jpa/spring-integration-jpa-2.2.xsd
                ">    
    
        <int-jpa:inbound-channel-adapter
            auto-startup="true" entity-manager="em"
            send-timeout="60000" channel="process.channel"
            expect-single-result="true"
            jpa-query="SELECT sysdate FROM dual">
            <int:poller fixed-delay="60000" />
        </int-jpa:inbound-channel-adapter>
            
        <int:channel id="process.channel">
            <int:queue capacity="1"/>        
        </int:channel>
    
        <int:chain input-channel="process.channel">    
            
            <int-jpa:retrieving-outbound-gateway entity-manager="em" jpa-query="SELECT sp FROM SmsMessage sp Where sp.tatus is null order by sp.requestOn,sp.id"/>    
                    
            <int:splitter ref="process.processSplitter" method="split"/>
            
            <int:service-activator ref="process.smsSenderService"
                method="send" />
                
            <int:poller fixed-delay="5000" receive-timeout="-1"/>            
        </int:chain>        
        
        <bean id="process.smsSenderService" class="com.yd.core.service.SmsSenderService" />        
        
        <bean id="process.processSplitter" class="com.yd.core.service.PaymentProcessSplitter" />
    </beans>

    Job Worker

    import org.springframework.context.ApplicationContext;
    import org.springframework.integration.MessageChannel;
    import org.springframework.integration.support.MessageBuilder;
    
    public class JobWorker implements Runnable {
    
        private static final int DEFAULT_WAIT_TIME = 3000;
    
        @Override
        public void run() {
            while (true) {
                try {
                    LoggerUtil.getJobLogger().info("JobWorker, Ready for take job run request.");
    
                    JobRunnerRequest jobRequest = JobManagerService.getJobManager().takeRequest();
                    while (jobRequest == null) {
                        LoggerUtil.getJobLogger().warn("JobWorker, jobRequest is null, will try to get the job requet again.");
                        Thread.sleep(DEFAULT_WAIT_TIME);
                        jobRequest = JobManagerService.getJobManager().takeRequest();
                    }
    
                    LoggerUtil.getJobLogger().info("JobWorker, Received a job run request.");
    
                    MessageChannel channel = findChannel(jobRequest.getJobChannelId());
                    if (channel != null) {
                        channel.send(MessageBuilder.withPayload(jobRequest.getJobMessagePayload()).build());
                        LoggerUtil.getJobLogger().info("JobWorker, Completed to sned message to job channel");
                    }
                }
                catch (Exception ex) {
                    LoggerUtil.getJobLogger().warn("JobWorker, Completed to sned message to job channel");
                }
            }
        }
    
        private MessageChannel findChannel(String jobChannelId) {
            ApplicationContext context = ApplicationContextProvider.getContext();
            if (context == null) {
                LoggerUtil.getJobLogger().error(String.format("JobWorker, Cannot get the application context, to startup job %s", jobChannelId));
                return null;
            }
    
            Object channel = context.getBean(jobChannelId);
            if (channel instanceof MessageChannel) {
                return (MessageChannel) channel;
            }
            else {
                LoggerUtil.getJobLogger().error(String.format("JobWorker, Cannot get the message bean, to startup job %s", jobChannelId));
                return null;
            }
        }
    }

    JobManagerService

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public final class JobManagerService {
        private BlockingQueue<JobRunnerRequest> jobRequestQueue = new LinkedBlockingQueue<JobRunnerRequest>();
        private static volatile  JobManagerService jobManagerInstnce;
        private static Object objSyncLocker = new Object();
    
        private JobManagerService() {
        }
    
        private void startupWorker() {
            new Thread(new JobWorker()).start();
        }
    
        public static JobManagerService getJobManager() {
            if (jobManagerInstnce == null) {
                synchronized (objSyncLocker) {
                    if (jobManagerInstnce == null) {
                        jobManagerInstnce = new JobManagerService();
                        jobManagerInstnce.startupWorker();
                    }
                }
            }
            return jobManagerInstnce;
        }
    
        public void addRequest(JobRunnerRequest request) {
            try {
                jobRequestQueue.put(request);
            }
            catch (InterruptedException e) {
                LoggerUtil.getJobLogger().warn(e.getMessage(), e);
            }
        }
    
        public JobRunnerRequest takeRequest() {
            try {
                return jobRequestQueue.take();
            }
            catch (InterruptedException e) {
                LoggerUtil.getJobLogger().warn(e.getMessage(), e);
                return null;
            }
        }
    }

    ApplicatonContextProvider

    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    
    public class ApplicationContextProvider implements ApplicationContextAware {
    
        private static volatile ApplicationContext ctx;
    
        public static ApplicationContext getContext() {
            return ctx;
        }
    
        private static synchronized void setContext(ApplicationContext applicationContext) {
            ctx = applicationContext;
        }
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext){
            setContext(applicationContext);
        }
    }
  • 相关阅读:
    用ant发布项目版本
    11 款用于优化、分析源代码的Java工具 转载
    第四章 数学运算
    jdbc for mysql demo
    第二章 PHP基础
    第七章 自定义函数
    第五章 数组
    jmock2.5基本教程 转载
    jdbc for mssql2005 demo
    oracle实战第三天事务处理与函数
  • 原文地址:https://www.cnblogs.com/BenWong/p/3790998.html
Copyright © 2011-2022 走看看