zoukankan      html  css  js  c++  java
  • 内容平台消息队列异步静态化实现

    一、重构前系统面对的问题

    随着分公司站点的建设,后台稿件的数量越来越多,加上自动签发的需求,
    大量的静态化请求非常容易造成系统宕机,于是引入消息队列做异步处理,平滑请求处理曲线。

    二、静态化消息发送端工程

    首先并不是全部的静态化请求都需要通过消息队列来处理,在项目配置文件中添加了一个static_mq的参数控制是否静态化。

    (1)处理请求,并对静态化相关的数据进行封装
    收到静态化请求后,后台首先进行判断,是否开启消息队列静态化,
    确认后实例化一个MQ的线程,将相关的静态化信息保存到线程信息中,然后将线程提交给线程池调度,如果未开启消息队列,则按照原有的逻辑进行静态化。
    以首页静态化为例:

    String isMQStatic = ConfUtil.getString("static_mq");
    if (isMQStatic != null && "true".equals(isMQStatic.trim())) {
    				// 如果启用了消息队列,则发起一个mq的线程执行
    				IndexStaticPageThread indexStaticPage = new IndexStaticPageThread(site.getId(), jmsTemplate);
    				// 将线程提交到线程池,提高执行的效率
    				StaticThreadPool.threadPool().execute(indexStaticPage);
    			} else {
    				staticPageSvc.index(site);
    			}		
    

    栏目和稿件的类似,但是需要将静态化的信息添加到线程实例中,下面分别是栏目消息封装:

    if (isMQStatic != null && "true".equals(isMQStatic.trim())) {
    				// 增加MQ的消息发送方式 begin
    				Map<Object, Object> paramMap = new HashMap<Object, Object>();
    				paramMap.put("siteId", site.getId());
    				paramMap.put("contextPath", request.getContextPath());
    				paramMap.put("isStaticFirst", staticFirst);
    				paramMap.put("isContainChild", containChild);
    				paramMap.put("startDate", startDate);
    
    				ChannelStaticPageThread channelStaticPage = new ChannelStaticPageThread(channelId, jmsTemplate, paramMap);
    
    				StaticThreadPool.threadPool().execute(channelStaticPage);
    
    				// 增加MQ的消息发送方式 end
    			} else {
    				count = staticPageSvc.channel(staticFirst, site.getId(), channelId, startDate, containChild, request);
    			}
    

      稿件消息封装:

    if (isMQStatic != null && "true".equals(isMQStatic.trim())) {
    				Map<Object, Object> paramMap = new HashMap<Object, Object>();
    				paramMap.put("siteId", siteId);
    				paramMap.put("startDate", startDate);
    				paramMap.put("endDate", endDate);
    				paramMap.put("needRegenerateAll", needRegenerateAll);
    				paramMap.put("subCommand", "pages");
    				paramMap.put("isContainChild", true);
    
    				ChannelStaticPageThread channelStaticPage = new ChannelStaticPageThread(channelId, jmsTemplate, paramMap);
    				StaticThreadPool.threadPool().execute(channelStaticPage);
    			} else {
    				count = staticPageSvc.content(siteId, channelId, startDate, endDate, needRegenerateAll);
    			}
    

      

    (2)发送消息到ActiveMQ中
    消息线程通过传入的静态化信息构造并且实例化以后,在线程的执行体中做具体的消息绑定和发送工作。

    下面以ChannelStaticPageThread为例来看一下具体是如何对消息队列进行封装和发送的:

    public class ChannelStaticPageThread implements Runnable {
    	private static final Logger log = LoggerFactory.getLogger(ChannelStaticPageThread.class);
    
    	private Integer channelId;
    	private JmsTemplate jmsTemplate;
    	private Map<Object, Object> paramMap;
    
    	private boolean needRegenerateAll, isStaticFirst, isContainChild = false;
    
    	// 构造方法
    	public ChannelStaticPageThread(Integer channelId, JmsTemplate jmsTemplate, Map<Object, Object> paramMap) {
    		this.channelId = channelId;
    		this.jmsTemplate = jmsTemplate;
    		this.paramMap = paramMap;
    	}
    
    	public void run() {
    		log.info("thread...send message,channel static,channel_id:" + channelId);
    
    		// 需要根据不同的情形,调用不同的queue
    		String QueueName = "xhCloud.STATIC.AUTO";
    
    		Integer siteId = (Integer) paramMap.get("siteId");
    		String contextPath = (String) paramMap.get("contextPath");
    
    		Date startDate = (Date) paramMap.get("startDate");
    		Date endDate = (Date) paramMap.get("endDate");
    
    		if (paramMap.get("isStaticFirst") != null)
    			isStaticFirst = (Boolean) paramMap.get("isStaticFirst");
    
    		if (paramMap.get("isContainChild") != null)
    			isContainChild = (Boolean) paramMap.get("isContainChild");
    
    		if (paramMap.get("needRegenerateAll") != null)
    			needRegenerateAll = (Boolean) paramMap.get("needRegenerateAll");
    
    		// 用包装过的taskNode发送
    		ChannelTaskEntity taskEntity = new ChannelTaskEntity();
    		taskEntity.setCommand("channel");
    
    		String subCommand = (String) paramMap.get("subCommand");
    
    		if (subCommand != null && subCommand.trim().length() > 0) {
    			taskEntity.setSubCommand(subCommand);
    		} else {
    			taskEntity.setSubCommand("default");
    		}
    
    		taskEntity.setChannelId(channelId);
    		taskEntity.setSiteId(siteId);
    
    		taskEntity.setContextPath(contextPath);
    		taskEntity.setStaticFirst(isStaticFirst);
    		taskEntity.setContainChild(isContainChild);
    		taskEntity.setStartDate(startDate);
    		taskEntity.setEndDate(endDate);
    		taskEntity.setRegenerateAll(needRegenerateAll);
    
    		InstantTaskNode<ChannelTaskEntity> instTaskNode = new InstantTaskNode<ChannelTaskEntity>();
    		instTaskNode.setMainPriority(TaskMainPriority.MEDIUM);
    		instTaskNode.setTaskType(TaskType.INSTANT);
    		instTaskNode.setEntity(taskEntity);
    
    		jmsTemplate.convertAndSend(QueueName, instTaskNode);
    		log.info("send success!channelid:" + channelId + " subCommand:" + subCommand);
    	}
    } 

    这里的JmsTemplate来自于org.springframework.jms.core.JmsTemplate,
    是一个消息传递的标准,具体的分析可以查看博文。

    为了区分消息的优先级,在MQ中创建了若干个不同的队列,
    不同的任务通过不同的消息队列发送;

    jms.queue.static.auto=xhCloud.STATIC.AUTO
    jms.queue.static.medium=xhCloud.STATIC.MEDIUM
    jms.queue.static.high=xhCloud.STATIC.HIGH
    jms.queue.static.critical=xhCloud.STATIC.CRITICAL

    (3)对线程池的处理
    代码中维护了一个用于处理静态化操作的线程池,
    是通过java.util.concurrent.ThreadPoolExecutor来创建的。

    看一下具体的代码实现:

    public class StaticThreadPool {
    	private static int corePoolSize;// 池中所保存的线程数,包括空闲线程
    	private static int maximumPoolSize;// 池中允许的最大线程数
    	private static int keepAliveTime;// 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
    	private static int workQueue; // 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable
    									// 任务。
    	private static ThreadPoolExecutor executor = null;
    
    	private static String resource = "activeMQStaticThreadPool";
    	private static ResourceBundle rb = null;
    
    	public static ThreadPoolExecutor threadPool() {
    		if (rb == null) {
    			rb = ResourceBundle.getBundle(resource);
    			if (rb != null) {
    				corePoolSize = Integer.parseInt(rb.getString("corePoolSize"));
    				maximumPoolSize = Integer.parseInt(rb.getString("maximumPoolSize"));
    				keepAliveTime = Integer.parseInt(rb.getString("keepAliveTime"));
    				workQueue = Integer.parseInt(rb.getString("workQueue"));
    			} else {
    				System.out.println("thread pool property initialized failed!");
    			}
    		}
    		if (executor == null) {
    			System.out.println(" Initialize StaticThreadPool Executor");
    			executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(workQueue), new ThreadPoolExecutor.AbortPolicy());
    		}
    		return executor;
    	}
    }
    

      

    可以看到通过读取配置文件的形式设置了线程池的相关参数,
    下面是具体的配置文件:

    #线程池维护线程的最少数量
    corePoolSize=10
    #线程池维护线程的最大数量
    maximumPoolSize=50
    #线程池维护线程所允许的空闲时间
    keepAliveTime=60000
    #线程池所使用的缓冲队列数
    workQueue=10
    

      

    线程池的任务队列使用了java.util.concurrent.ArrayBlockingQueue,
    这是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

    在代码中添加了一个threadPool()的静态方法,
    通过传入配置文件中的参数,返回ThreadPoolExecutor实例,
    使用了如下的构造方法:

    new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);
    

    下面简单对参数进行说明:

    corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
    runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。 
    maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
    ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
    RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。
    以下是JDK1.5提供的四种策略。
    AbortPolicy:直接抛出异常。
    CallerRunsPolicy:只用调用者所在线程来运行任务。
    DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    DiscardPolicy:不处理,丢弃掉。
    当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
    keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
    TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

    另外这里线程池的创建可以看做是单例的应用。

    (4)编写并提供首页/栏目/详情页等静态化的Dubbo接口

    根据目前系统中对静态化请求类型的划分,对外提供了
    staticIndexPageService、staticChannelPageService、
    staticContentPageService等静态化服务。

    下面是截取provider.xml的配置:

    <!-- 用dubbo协议在20880端口暴露服务 -->
    	<dubbo:protocol name="dubbo" port="${dubbo.port}" />
    	<!-- 使用zookeeper注册中心暴露服务地址  -->
    	<dubbo:registry protocol="zookeeper"
    		address="192.168.75.138:2181,192.168.75.139:2181,192.168.75.140:2181,192.168.75.141:2181,192.168.75.142:2181"/>
    	<!-- 静态化服务 -->
    	<dubbo:service interface="com.bingyue.dubbo.StaticChannelPageService"
    		ref="staticChannelPage" timeout="50000" version="2.2.006" />
    	<bean id="staticChannelPage"
    		class="com.xinhuanet.statictpl.dubbo.impl.StaticChannelPageServiceImpl" />
    
    	<dubbo:service interface="com.bingyue.dubbo.StaticIndexPageService"
    		ref="staticIndexPage" timeout="50000" version="2.2.006" />
    	<bean id="staticIndexPage"
    		class="com.bingyue.dubbo.impl.StaticIndexPageServiceImpl" />
    

      简单看一下Dubbo接口的编写:

    public interface StaticContentPageService {
    	public TaskEntity[] staticizeContentPage(Integer contentId, boolean isCheckedBeforePublished, boolean isBatchChange) throws StaticPageException, UnrecoverableException;
    }
    

      这个Dubbo接口是对原有静态化服务的封装,最终的实现还是通过项目最初的静态化服务。

    三、静态化消息处理端工程

    回顾完了静态化消息发送端的设计和实现,再来看一下消费端工程是如何实现的。
    消费端工程是一个Maven管理的多项目聚合工程,加上后期的功能扩展,形成了现在一个较大的工程:

    (1)父工程下各个Maven项目的作用如下:

    static-beans:
    主要封装了基础的消息实体,主要是TaskEntity等几个抽象类,自定义了程序执行期间相关的异常,依赖static-utils项目。
    下面以TaskEntity为例:

    public abstract class TaskEntity implements Serializable {
    	private final static Logger logger = LoggerFactory
    			.getLogger(TaskEntity.class);
    	private static final long serialVersionUID = 6139912897589483499L;
    	final ObjectMapper mapper = ObjectMapperFactory.get();
    	TaskEntityType entityType;
    	String command; // 静态化执行类型
    	String subCommand; // 静态化的二级执行类型,对于属性页静态化的任务,即为属性的类型
    
    	public String getSubCommand() {
    		return subCommand;
    	}
    	public void setSubCommand(String subCommand) {
    		this.subCommand = subCommand;
    	}
    	public String getCommand() {
    		return command;
    	}
    	public void setCommand(String command) {
    		this.command = command;
    	}
    	public TaskEntityType getEntityType() {
    		return entityType;
    	}
    
    	public void setEntityType(TaskEntityType entityType) {
    		this.entityType = entityType;
    	}
    	public String toJson() {
    		try {
    			return mapper.writeValueAsString(this);
    		} catch (JsonProcessingException e) {
    			logger.error("[{}]:[{}]", e.getClass().getName(), e.getMessage());
    		}
    		return this.getCommand() + "," + this.getSubCommand();
    	}
    }
    

      

    static-cachemanager:
    配置了Infinispan 缓存模式,主要就是包装了org.infinispan.manager.DefaultCacheManager,代码很简单:

    public class OptimizeCacheManager {
    		public static DefaultCacheManager  cacheManager = new DefaultCacheManager();
    }


    关于Infinispan ,简单了解一下就好,这里使用的是Infinispan的本地模式。
    Infinispan 是个开源的数据网格平台。它公开了一个简单的数据结构(一个Cache)来存储对象。
    看一下pom.xml的配置:

    <dependencies>
    <dependency>
    <groupId>org.infinispan</groupId>
    <artifactId>infinispan-cachestore-leveldb</artifactId>
    <version>7.0.2.Final</version>
    </dependency>
    <dependency>
    <groupId>org.infinispan</groupId>
    <artifactId>infinispan-spring</artifactId>
    <version>7.0.2.Final</version>
    </dependency>
    </dependencies>


    static-dubboinvoke:
    测试项目,主要测试Dubbo服务端提供的静态化接口是否有效。

    static-event:
    继承org.springframework.context.ApplicationEvent,实现了一个
    TaskProcessErrorEvent,用来监听消息处理中的错误事件。

    public class TaskProcessErrorEvent<T extends TaskEntity> extends
    		ApplicationEvent {
    	private static final long serialVersionUID = -4883903824786622364L;
    	TaskNode<T> taskNode;
    
    	public TaskProcessErrorEvent(TaskNode<T> source) {
    		super(source);
    		this.taskNode = source;
    	}
    	public TaskNode<T> getTaskNode() {
    		return taskNode;
    	}
    	public void setTaskNode(TaskNode<T> taskNode) {
    		this.taskNode = taskNode;
    	}
    }

    依赖static-beans项目。

    static-event-channelhandler:
    继承ApplicationListener,实现了一个ChannelTaskOccurredEventHandler。

    static-event-handler:
    类似static-event-channelhandler项目。

    static-executor:
    维护了一个任务调度的线程池,
    实现方式和前面发起静态化线程的线程池类似,不同的是
    这里使用Executors获得线程池实例的方法,
    封装了两个线程池,一个是
    java.util.concurrent.ThreadPoolExecutor
    java.util.concurrent.ScheduledThreadPoolExecutor

    ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,
    被用来执行定时任务。
    看一下代码:

    public class OptimizeScheduledExecutor {
    	public static ScheduledThreadPoolExecutor executor  = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
    	public static ThreadPoolExecutor excutorBatch = (ThreadPoolExecutor) Executors.newSingleThreadExecutor();
    	static{
    		OptimizeScheduledExecutor.executor.setMaximumPoolSize(1);
    	}
    }

    static-process:
    定义了消息转换和发送等的接口。

    static-utils:
    项目工具类。

    static-xuan-beans:
    这个项目在代码层面是继承static-beans工程的,

    其中继承抽象类TaskEntity实现了针对首页、栏目和稿件静态化的三个消息实体对象:
    ChannelTaskEntity、IndexTaskEntity和ContentTaskEntity。

    继承抽象包装类TaskEntityWrapper实现了
    ChannelTaskEntityWrapper、ContentTaskEntityWrapper和IndexTaskEntityWrapper。

    继承ApplicationEvent实现了ChannelTaskOccurredEvent。

    其他的mobile等项目是后期扩展添加,不在我们的讨论范围中。

    下面重点看一下static-starter模块,分析一下相关流程,这是消费端项目启动的入口。

    (2)从MQ中接收消息并处理

    下面我们分析static-starter项目的配置和启动。


    (3)相关的配置文件

    ActiveMQ配置文件,application.properties:

    jms.broker.url=failover:(tcp://192.168.xx.xx:61616?wireFormat.maxInactivityDuration=0,tcp://192.168.xx.xx:61617?wireFormat.maxInactivityDuration=0)&maxReconnectDelay=1000
    jms.username=test
    jms.password=test
    jms.queue.static.auto=STATIC.AUTO
    jms.queue.static.medium=STATIC.MEDIUM
    jms.queue.static.high=STATIC.HIGH
    jms.queue.static.critical=STATIC.CRITICAL

    Dubbo消费端配置文件,consumer.xml:

    Redis配置文件,供jesque读取,redis.properties;

    redis.host=192.168.xx.xx
    redis.port=6379
    redis.timeout=5000
    redis.password=****
    redis.namespace=resque
    redis.database=0
    jesque.queue.name=bingyue
    

      

    配置jesque信息,jesque.xml:

    <bean id="config" class="net.greghaines.jesque.Config">
    		<constructor-arg value="${redis.host}" />
    		<constructor-arg value="${redis.port}" />
    		<constructor-arg value="${redis.timeout}" />
    		<constructor-arg value="${redis.password}" />
    		<constructor-arg value="${redis.namespace}" />
    		<constructor-arg value="${redis.database}" />
    	</bean>
    	
    	<bean id="jedisPool" class="redis.clients.jedis.JedisPool">
    		<constructor-arg>
    			<bean class="net.greghaines.jesque.utils.PoolUtils" factory-method="getDefaultPoolConfig" />
    		</constructor-arg>
    		<constructor-arg value="${redis.host}" />
    		<constructor-arg value="${redis.port}" />
    		<constructor-arg value="${redis.timeout}" />
    		<constructor-arg value="${redis.password}" />
    		<constructor-arg value="${redis.database}" />
    	</bean>
    	<bean id="jesqueClient" class="net.greghaines.jesque.client.ClientImpl">
    		<constructor-arg ref="config" />
    		<constructor-arg value="true" />
    	</bean>
    	
    	<bean id="failureDAO" class="net.greghaines.jesque.meta.dao.impl.FailureDAORedisImpl">
    		<constructor-arg ref="config" />
    		<constructor-arg ref="jedisPool" />
    	</bean>
    	
    	<bean id="keysDAO" class="net.greghaines.jesque.meta.dao.impl.KeysDAORedisImpl">
    		<constructor-arg ref="config" />
    		<constructor-arg ref="jedisPool" />
    	</bean>
    	
    	<bean id="queueInfoDAO" class="net.greghaines.jesque.meta.dao.impl.QueueInfoDAORedisImpl">
    		<constructor-arg ref="config" />
    		<constructor-arg ref="jedisPool" />
    	</bean>
    	
    	<bean id="workerInfoDAO" class="net.greghaines.jesque.meta.dao.impl.WorkerInfoDAORedisImpl">
    		<constructor-arg ref="config" />
    		<constructor-arg ref="jedisPool" />
    	</bean>
    
    	<bean id="queueName" class="java.lang.String">  
        <constructor-arg value="${jesque.queue.name}"/>  
    	</bean>   

    配置加载MQ连接池等,mq-config.xml:

    <amq:connectionFactory brokerURL="${jms.broker.url}"
    		id="amqConnectionFactory" userName="${jms.username}" password="${jms.password}">
    		<amq:redeliveryPolicyMap>
    			<amq:redeliveryPolicyMap>
    				<amq:defaultEntry>
    					<!-- default policy, 5 times with 10s delay each -->
    					<amq:redeliveryPolicy maximumRedeliveries="3"
    						initialRedeliveryDelay="1000" />
    				</amq:defaultEntry>
    				<amq:redeliveryPolicyEntries>
    					<!-- three times with exponential back-off, that is, 1s, 2s, 4s, 8s. 
    						"queue" references to the "physicalName" defined in amq:queue -->
    					<amq:redeliveryPolicy queue="${jms.queue.static.auto}"
    						maximumRedeliveries="3" initialRedeliveryDelay="1000"
    						backOffMultiplier="3" useExponentialBackOff="true"
    						maximumRedeliveryDelay="60000" />
    					<!-- another policy mapping -->
    					<amq:redeliveryPolicy queue="${jms.queue.static.high}"
    						maximumRedeliveries="3" initialRedeliveryDelay="2000" />
    				</amq:redeliveryPolicyEntries>
    			</amq:redeliveryPolicyMap>
    		</amq:redeliveryPolicyMap>
    	</amq:connectionFactory>
    
    	<!-- Default Destination Queue Definition 设置预读取条数 -->
    	<bean id="destinationAuto" class="org.apache.activemq.command.ActiveMQQueue">
    		<constructor-arg index="0"
    			value="${jms.queue.static.auto}?consumer.prefetchSize=1" />
    	</bean>
    	<bean id="destinationMedium" class="org.apache.activemq.command.ActiveMQQueue">
    		<constructor-arg index="0"
    			value="${jms.queue.static.medium}?consumer.prefetchSize=1" />
    	</bean>
    	<bean id="destinationHigh" class="org.apache.activemq.command.ActiveMQQueue">
    		<constructor-arg index="0"
    			value="${jms.queue.static.high}?consumer.prefetchSize=1" />
    	</bean>
    	<bean id="destinationCritical" class="org.apache.activemq.command.ActiveMQQueue">
    		<constructor-arg index="0"
    			value="${jms.queue.static.critical}?consumer.prefetchSize=1" />
    	</bean>
    
    	<!-- Text message converter -->
    	<bean id="textMessageConverter"
    		class="test.task.process.jms.internal.TextTaskMessageConverter">
    	</bean>
    
    	<!-- Map message converter -->
    	<bean id="mapMessageConverter"
    		class="test.task.process.jms.internal.MapTaskMessageConverter">
    	</bean>
    
    	<!-- Message Receiver Definition -->
    	<bean id="messageListener"
    		class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    		<constructor-arg>
    			<bean
    				class="test.task.process.handle.internal.DefaultHandleTaskMessageEntry">
    				<property name="taskMessageSender" ref="textTaskMessageSender" />
    			</bean>
    		</constructor-arg>
    		<property name="defaultListenerMethod" value="processTask" />
    		<!-- property name="messageConverter"><null/></property -->
    		<property name="messageConverter" ref="textMessageConverter" />
    	</bean>
    	<bean id="errorHandler"
    		class="test.task.process.handle.internal.DefaultTaskProcessErrorHandler" />
    
    	<bean id="taskMessageListenerContainerAuto"
    		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<property name="sessionAcknowledgeMode" value="2" />
    		<property name="connectionFactory" ref="amqConnectionFactory" />
    		<property name="destination" ref="destinationAuto" />
    		<property name="messageListener" ref="messageListener" />
    		<property name="concurrentConsumers" value="8" />
    		<property name="errorHandler" ref="errorHandler" />
    	</bean>
    
    	<bean id="taskMessageListenerContainerMedium"
    		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<!-- property name="sessionTransacted" value="true"/ -->
    		<property name="sessionAcknowledgeMode" value="2" /><!-- 2-client-ack 
    			0 transaction -->
    		<property name="connectionFactory" ref="amqConnectionFactory" />
    		<property name="destination" ref="destinationMedium" />
    		<property name="messageListener" ref="messageListener" />
    		<property name="concurrentConsumers" value="1" />
    		<property name="errorHandler" ref="errorHandler" />
    		<!-- property name="maxMessagesPerTask" value="1" / -->
    	</bean>
    
    	<bean id="taskMessageListenerContainerHigh"
    		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<!-- property name="sessionTransacted" value="true"/ -->
    		<property name="sessionAcknowledgeMode" value="2" /><!-- 2-client-ack 
    			0 transaction -->
    		<property name="connectionFactory" ref="amqConnectionFactory" />
    		<property name="destination" ref="destinationHigh" />
    		<property name="messageListener" ref="messageListener" />
    		<property name="concurrentConsumers" value="1" />
    		<property name="errorHandler" ref="errorHandler" />
    		<!-- property name="maxMessagesPerTask" value="1" / -->
    	</bean>
    	<bean id="taskMessageListenerContainerCritical"
    		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<!-- property name="sessionTransacted" value="true"/ -->
    		<property name="sessionAcknowledgeMode" value="2" /><!-- 2-client-ack 
    			0 transaction -->
    		<property name="connectionFactory" ref="amqConnectionFactory" />
    		<property name="destination" ref="destinationCritical" />
    		<property name="messageListener" ref="messageListener" />
    		<property name="concurrentConsumers" value="1" />
    		<property name="errorHandler" ref="errorHandler" />
    		<!-- property name="maxMessagesPerTask" value="1" / -->
    	</bean>
    
    	<bean id="textJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    		<property name="connectionFactory" ref="amqConnectionFactory" />
    		<property name="messageConverter" ref="textMessageConverter" />
    	</bean>
    
    	<bean id="mapJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    		<property name="connectionFactory" ref="amqConnectionFactory" />
    		<property name="messageConverter" ref="mapMessageConverter" />
    	</bean>
    
    	<bean id="textTaskMessageSender"
    		class="test.task.process.handle.internal.DefaultTaskMessageSender">
    		<property name="jmsTemplate" ref="textJmsTemplate" />
    	</bean>
    
    	<bean id="mapTaskMessageSender"
    		class="test.task.process.handle.internal.DefaultTaskMessageSender">
    		<property name="jmsTemplate" ref="mapJmsTemplate" />
    	</bean>

    最后是Spring CTX启动加载文件,spring-config.xml:

    <bean
    		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    		<property name="locations">
    			<list>
    				<value>application.properties</value>
    				<value>redis.properties</value>
    			</list>
    		</property>
    	</bean>
    
    	<bean id="priorityChannel"
    		class="test.task.process.bean.internal.DefaultTaskPriorityChannel">
    		<property name="AUTO_CHANNEL" value="${jms.queue.static.auto}" />
    		<property name="MEDIUM_CHANNEL" value="${jms.queue.static.medium}" />
    		<property name="HIGH_CHANNEL" value="${jms.queue.static.high}" />
    		<property name="CRITICAL_CHANNEL" value="${jms.queue.static.critical}" />
    	</bean>
    	<bean class="test.task.process.utils.ApplicationContextRegister" />
    
    	<import resource="process-mapping.xml" />
    	<bean class="test.task.process.utils.ProcessEntityMap">
    		<property name="processMap" ref="processMap" />
    		<property name="processWrapperMap" ref="processWrapperMap" />
    	</bean>
    	<import resource="mq-config.xml" />
    	<import resource="consumer.xml" />
    	<import resource="jesque.xml" />
    
    	<bean id="taskEventHandler" class="test.task.event.listener.TaskProcessErrorEventHandler"></bean>
    	<bean id="channelTaskEventHandler" class="test.task.event.listener.ChannelTaskOccurredEventHandler"></bean>


    (4)启动和部署方式
    最后的打包方式是jar,启动脚本:

    java -cp "libs/*:xinhua-static-starter-0.0.1.jar" Main

    (5)ActiveMQ集群配置
    静态化消息吞吐量较大,单个MQ实例不能满足要求,系统配置了多台MQ的集群,
    消费端使用了MQ内置的failover断线重连机制:

    jms.broker.url=failover:(tcp://192.168.xx.xx:61616?wireFormat.maxInactivityDuration=0,tcp://192.168.xx.xx:61617?wireFormat.maxInactivityDuration=0)&maxReconnectDelay=1000

    采用的是单台物理机上配置多台ActiveMQ实例开放不同端口的伪集群。

    (6)配置Dubbo消费端,调用系统的静态化Dubbo服务

    四、使用消息队列异步后对系统的优化

    对静态化请求的解耦和异步处理,很好的平滑了高并发场景下短时间内大量的服务请求,比如稿件模板修改以后的批量静态化,

    经常有上万个静态化的请求,提高了系统的可用性和稳定性,对一些对优先级要求比较高的请求,通过加入到不同的队列中来处理,

    兼顾了及时性和异步性。

  • 相关阅读:
    WPF XAML之bing使用StringFormat
    C#程序以管理员权限运行
    注册表REG文件编写大全
    linux 的基本操作(编写shell 脚本)
    linux的基本操作(正则表达式)
    linux的基本操作(shell 脚本的基础知识)
    linux的基本操作(RPM包或者安装源码包)
    linux的基本操作(文件压缩与打包)
    linux的基本操作(文本编辑工具vim)
    linux的基本操作(磁盘管理)
  • 原文地址:https://www.cnblogs.com/binyue/p/3751903.html
Copyright © 2011-2022 走看看