Spring Integration 配置
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring Integration wiring for the SMS sending flow.

  Flow, as configured below:
    1. A JPA inbound channel adapter runs a query every 60 seconds and sends
       the result to "process.channel".
    2. "process.channel" is a queue channel that holds at most one message.
    3. A chain polls "process.channel" every 5 seconds, selects the SmsMessage
       rows whose status is null, splits the result with
       process.processSplitter#split and hands each part to
       process.smsSenderService#send.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jpa="http://www.springframework.org/schema/integration/jpa"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
           http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/integration/jpa http://www.springframework.org/schema/integration/jpa/spring-integration-jpa-2.2.xsd">
    <!--
      The integration namespace is mapped to exactly one XSD above. It used to
      be listed twice (versioned 2.2 and unversioned); the 2.2 entry is kept
      so that it matches the JPA schema version.
    -->

    <!--
      Periodic trigger. "SELECT sysdate FROM dual" is native SQL rather than
      JPQL, so it is declared with native-query instead of jpa-query.
      NOTE(review): the query result looks like it is only used to kick off
      the chain below; confirm that nothing downstream reads the date payload.
    -->
    <int-jpa:inbound-channel-adapter
            auto-startup="true"
            entity-manager="em"
            send-timeout="60000"
            channel="process.channel"
            expect-single-result="true"
            native-query="SELECT sysdate FROM dual">
        <int:poller fixed-delay="60000" />
    </int-jpa:inbound-channel-adapter>

    <!-- Queue with room for a single pending trigger message. -->
    <int:channel id="process.channel">
        <int:queue capacity="1"/>
    </int:channel>

    <!--
      Processing chain: load the unsent messages, split them, send each one.
      NOTE(review): the where clause now reads "sp.status"; it was spelled
      "sp.tatus". Verify the property name against the SmsMessage entity.
    -->
    <int:chain input-channel="process.channel">
        <int-jpa:retrieving-outbound-gateway
                entity-manager="em"
                jpa-query="SELECT sp FROM SmsMessage sp WHERE sp.status IS NULL ORDER BY sp.requestOn, sp.id"/>
        <int:splitter ref="process.processSplitter" method="split"/>
        <int:service-activator ref="process.smsSenderService" method="send" />
        <int:poller fixed-delay="5000" receive-timeout="-1"/>
    </int:chain>

    <bean id="process.smsSenderService" class="com.yd.core.service.SmsSenderService" />
    <bean id="process.processSplitter" class="com.yd.core.service.PaymentProcessSplitter" />
</beans>
Job Worker
import org.springframework.context.ApplicationContext;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.support.MessageBuilder;
/**
 * Worker loop that takes job run requests from {@link JobManagerService} and
 * forwards each request's payload to the message channel named by the request.
 *
 * <p>The loop runs until the executing thread is interrupted. A failure while
 * handling one request is logged and does not stop the loop.
 */
public class JobWorker implements Runnable {

    /** Pause in milliseconds before asking again after {@code takeRequest()} returned {@code null}. */
    private static final int DEFAULT_WAIT_TIME = 3000;

    /**
     * Repeatedly takes a request, resolves its channel and sends the payload.
     * Returns only when the current thread has been interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                LoggerUtil.getJobLogger().info("JobWorker, Ready for take job run request.");
                JobRunnerRequest jobRequest = JobManagerService.getJobManager().takeRequest();
                while (jobRequest == null) {
                    LoggerUtil.getJobLogger().warn("JobWorker, jobRequest is null, will try to get the job requet again.");
                    Thread.sleep(DEFAULT_WAIT_TIME);
                    jobRequest = JobManagerService.getJobManager().takeRequest();
                }
                LoggerUtil.getJobLogger().info("JobWorker, Received a job run request.");
                MessageChannel channel = findChannel(jobRequest.getJobChannelId());
                if (channel != null) {
                    // send() reports a rejected message through its return value,
                    // so success is only logged when it returns true.
                    boolean sent = channel.send(MessageBuilder.withPayload(jobRequest.getJobMessagePayload()).build());
                    if (sent) {
                        LoggerUtil.getJobLogger().info("JobWorker, Completed to sned message to job channel");
                    }
                    else {
                        LoggerUtil.getJobLogger().warn(String.format("JobWorker, The job channel %s did not accept the message", jobRequest.getJobChannelId()));
                    }
                }
            }
            catch (InterruptedException ex) {
                // Restore the flag so the loop condition (and any caller) can see
                // the interruption, then stop instead of spinning forever.
                Thread.currentThread().interrupt();
                LoggerUtil.getJobLogger().warn("JobWorker, Interrupted while waiting for a job run request, worker is stopping.", ex);
                return;
            }
            catch (Exception ex) {
                // Keep the worker alive, but report the real failure with its cause.
                LoggerUtil.getJobLogger().warn("JobWorker, Failed to send message to job channel", ex);
            }
        }
    }

    /**
     * Looks up the message channel bean registered under the given ID.
     *
     * @param jobChannelId bean name of the target channel
     * @return the channel, or {@code null} when the application context is not
     *         available, no bean with that name exists, or the bean is not a
     *         {@link MessageChannel}; each of these cases is logged as an error
     */
    private MessageChannel findChannel(String jobChannelId) {
        ApplicationContext context = ApplicationContextProvider.getContext();
        if (context == null) {
            LoggerUtil.getJobLogger().error(String.format("JobWorker, Cannot get the application context, to startup job %s", jobChannelId));
            return null;
        }
        // getBean() throws for an unknown name; check first so the problem is
        // reported together with the channel ID instead of as a generic failure.
        if (jobChannelId == null || !context.containsBean(jobChannelId)) {
            LoggerUtil.getJobLogger().error(String.format("JobWorker, Cannot get the message bean, to startup job %s", jobChannelId));
            return null;
        }
        Object channel = context.getBean(jobChannelId);
        if (channel instanceof MessageChannel) {
            return (MessageChannel) channel;
        }
        LoggerUtil.getJobLogger().error(String.format("JobWorker, Cannot get the message bean, to startup job %s", jobChannelId));
        return null;
    }
}
JobManagerService
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Process-wide holder of the queue of pending job run requests.
 *
 * <p>The single instance is created lazily by {@link #getJobManager()}, which
 * also starts one {@link JobWorker} thread at creation time.
 */
public final class JobManagerService {

    /** Guards lazy creation of the singleton; final so the monitor can never change. */
    private static final Object objSyncLocker = new Object();

    /** Lazily created singleton; volatile so the unsynchronized null check is safe. */
    private static volatile JobManagerService jobManagerInstnce;

    /** Pending requests, unbounded, handed to the worker in insertion order. */
    private final BlockingQueue<JobRunnerRequest> jobRequestQueue = new LinkedBlockingQueue<JobRunnerRequest>();

    private JobManagerService() {
    }

    /** Starts the thread that runs a {@link JobWorker}. */
    private void startupWorker() {
        // Named thread so it can be identified in thread dumps.
        new Thread(new JobWorker(), "JobWorker").start();
    }

    /**
     * Returns the singleton, creating it and starting its worker thread on the
     * first call (double-checked locking on a volatile field).
     *
     * @return the shared job manager, never {@code null}
     */
    public static JobManagerService getJobManager() {
        if (jobManagerInstnce == null) {
            synchronized (objSyncLocker) {
                if (jobManagerInstnce == null) {
                    jobManagerInstnce = new JobManagerService();
                    jobManagerInstnce.startupWorker();
                }
            }
        }
        return jobManagerInstnce;
    }

    /**
     * Appends a request to the queue.
     *
     * <p>If the calling thread is interrupted, the request is not queued, a
     * warning is logged and the thread's interrupt status is restored.
     *
     * @param request the request to queue; the underlying queue rejects {@code null}
     */
    public void addRequest(JobRunnerRequest request) {
        try {
            jobRequestQueue.put(request);
        }
        catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again so the
            // caller can still observe the interruption.
            Thread.currentThread().interrupt();
            LoggerUtil.getJobLogger().warn(e.getMessage(), e);
        }
    }

    /**
     * Removes and returns the next request, blocking until one is available.
     *
     * @return the next request, or {@code null} if the calling thread was
     *         interrupted while waiting (its interrupt status is restored)
     */
    public JobRunnerRequest takeRequest() {
        try {
            return jobRequestQueue.take();
        }
        catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            LoggerUtil.getJobLogger().warn(e.getMessage(), e);
            return null;
        }
    }
}
ApplicationContextProvider
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
/**
 * Stores the {@link ApplicationContext} passed to
 * {@link #setApplicationContext(ApplicationContext)} in a static field so that
 * code without injection support can read it through {@link #getContext()}.
 */
public class ApplicationContextProvider implements ApplicationContextAware {

    /** Last context handed to this class; {@code null} until one has been set. */
    private static volatile ApplicationContext sharedContext;

    /**
     * Records the given context for later static access.
     *
     * @param applicationContext the context to remember
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        // Same class-level monitor a static synchronized method would use.
        synchronized (ApplicationContextProvider.class) {
            sharedContext = applicationContext;
        }
    }

    /**
     * @return the stored context, or {@code null} if none has been set yet
     */
    public static ApplicationContext getContext() {
        return sharedContext;
    }
}