zoukankan      html  css  js  c++  java
  • Flex数据推送

    文介绍怎样使用Flex数据推送实现前台消息订阅,是在前面Flex+BlazeDS+Spring整合基础上进行的,利用Spring来简化配置。环境准备:

                    1.完成Flex+BlazeDS+Spring整合

                    2.修改项目根目录下.flexProperties文件中serverContextRoot为项目名,否则后台接收不到前台订阅信息(此处浪费了我很多时间,一定注意)

            修改配置文件

            1.修改WEB-INF/flex/services-config.xml,添加如下代码:

    1. <channel-definitionid="my-streaming-amf"class="mx.messaging.channels.StreamingAMFChannel"> 
    2.     <endpointurl="http://{server.name}:{server.port}/{context.root}/messagebroker/streamingamf"class="flex.messaging.endpoints.StreamingAMFEndpoint"/> 
    3.     <properties> 
    4.         <idle-timeout-minutes>0</idle-timeout-minutes> 
    5.         <max-streaming-clients>10</max-streaming-clients> 
    6.         <server-to-client-heartbeat-millis>5000</server-to-client-heartbeat-millis> 
    7.         <user-agent-settings> 
    8.             <user-agentmatch-on="MSIE"kickstart-bytes="2048"max-streaming-connections-per-session="1"/> 
    9.             <user-agentmatch-on="Firefox"kickstart-bytes="2048"max-streaming-connections-per-session="1"/> 
    10.         </user-agent-settings> 
    11.     </properties> 
    12. </channel-definition> 
    <channel-definition id="my-streaming-amf" class="mx.messaging.channels.StreamingAMFChannel">
    	<endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/streamingamf" class="flex.messaging.endpoints.StreamingAMFEndpoint"/>
    	<properties>
    		<idle-timeout-minutes>0</idle-timeout-minutes>
    		<max-streaming-clients>10</max-streaming-clients>
    		<server-to-client-heartbeat-millis>5000</server-to-client-heartbeat-millis>
    		<user-agent-settings>
    			<user-agent match-on="MSIE" kickstart-bytes="2048" max-streaming-connections-per-session="1"/>
    			<user-agent match-on="Firefox" kickstart-bytes="2048" max-streaming-connections-per-session="1"/>
    		</user-agent-settings>
    	</properties>
    </channel-definition>

            2.修改Spring配置文件applicationContext.xml,把<flex:message-broker/>替换为如下代码:

    1.     <!-- allow-subtopics设为true,允许订阅者订阅指定主题消息 --> 
    2.     <flex:message-destinationid="data_push"allow-subtopics="true"subtopic-separator="."/> 
    3. <flex:message-broker> 
    4.     <flex:message-servicedefault-channels="my-streaming-amf,my-polling-amf"/> 
    5. </flex:message-broker> 
    6.      
    7.     <beanid="messageTemplate"class="org.springframework.flex.messaging.MessageTemplate"/> 
     	<!-- allow-subtopics设为true,允许订阅者订阅指定主题消息 -->
     	<flex:message-destination id="data_push" allow-subtopics="true" subtopic-separator="."/>
    	<flex:message-broker>
    		<flex:message-service default-channels="my-streaming-amf,my-polling-amf"/>
    	</flex:message-broker>
     	
     	<bean id="messageTemplate" class="org.springframework.flex.messaging.MessageTemplate" />

    其中"my-streaming-amf,my-polling-amf"要在services-config.xml中配置,messageTemplate在代码中实现推送功能。

            后台推送数据

            1.添加推送数据Service,代码如下,注意msg设置的各参数,前后台及配置文件都要对应

    1. package demo.flex.service; 
    2.  
    3. import javax.annotation.Resource; 
    4.  
    5. import org.springframework.flex.messaging.AsyncMessageCreator; 
    6. import org.springframework.flex.messaging.MessageTemplate; 
    7. import org.springframework.stereotype.Service; 
    8.  
    9. import flex.messaging.messages.AsyncMessage; 
    10. import flex.messaging.util.UUIDUtils; 
    11.  
    12. @Service 
    13. publicclass DataPushService { 
    14.  
    15.     @Resource 
    16.     private MessageTemplate messageTemplate; 
    17.      
    18.     publicvoid push(String topic, Object data) { 
    19.         messageTemplate.send(new CustomAsyncMessageCreator(topic, data)); 
    20.     } 
    21.      
    22.     class CustomAsyncMessageCreator implements AsyncMessageCreator { 
    23.          
    24.         private String topic; 
    25.         private Object data; 
    26.          
    27.         public CustomAsyncMessageCreator(String topic, Object data) { 
    28.             this.topic = topic; 
    29.             this.data = data; 
    30.         } 
    31.          
    32.         @Override 
    33.         public AsyncMessage createMessage() { 
    34.             AsyncMessage msg = new AsyncMessage(); 
    35.              
    36.             msg.setClientId(UUIDUtils.createUUID()); 
    37.             msg.setMessageId(UUIDUtils.createUUID()); 
    38.             msg.setTimestamp(System.currentTimeMillis()); 
    39.             msg.setDestination("data_push");    // 必须和配置文件中message-destination相同 
    40.             msg.setHeader("DSSubtopic", topic); // 设置订阅主题 
    41.             msg.setBody(data); 
    42.              
    43.             return msg; 
    44.         } 
    45.     } 
    package demo.flex.service;
    
    import javax.annotation.Resource;
    
    import org.springframework.flex.messaging.AsyncMessageCreator;
    import org.springframework.flex.messaging.MessageTemplate;
    import org.springframework.stereotype.Service;
    
    import flex.messaging.messages.AsyncMessage;
    import flex.messaging.util.UUIDUtils;
    
    @Service
    public class DataPushService {
    
    	@Resource
    	private MessageTemplate messageTemplate;
    	
    	public void push(String topic, Object data) {
    		messageTemplate.send(new CustomAsyncMessageCreator(topic, data));
    	}
    	
    	class CustomAsyncMessageCreator implements AsyncMessageCreator {
    		
    		private String topic;
    		private Object data;
    		
    		public CustomAsyncMessageCreator(String topic, Object data) {
    			this.topic = topic;
    			this.data = data;
    		}
    		
    		@Override
    		public AsyncMessage createMessage() {
    			AsyncMessage msg = new AsyncMessage();
    			
    			msg.setClientId(UUIDUtils.createUUID());
    			msg.setMessageId(UUIDUtils.createUUID());
    			msg.setTimestamp(System.currentTimeMillis());
    			msg.setDestination("data_push");	// 必须和配置文件中message-destination相同
    			msg.setHeader("DSSubtopic", topic);	// 设置订阅主题
    			msg.setBody(data);
    			
    			return msg;
    		}
    	}
    }
    

            2.添加控制数据开始推送及结束推送Controller,每隔一秒推送当前系统时间

    1. package demo.flex.mvc; 
    2.  
    3. import java.text.SimpleDateFormat; 
    4. import java.util.Date; 
    5.  
    6. import javax.annotation.Resource; 
    7.  
    8. import org.springframework.stereotype.Controller; 
    9. import org.springframework.web.bind.annotation.RequestMapping; 
    10.  
    11. import demo.flex.service.DataPushService; 
    12.  
    13. @Controller 
    14. publicclass DataPush { 
    15.  
    16.     privatestatic FeedThread thread; 
    17.      
    18.     @Resource 
    19.     private DataPushService dataPushService; 
    20.      
    21.     @RequestMapping("push"
    22.     public String push(String cmd) throws Exception { 
    23.          
    24.         if ("start".equals(cmd)) { 
    25.             start(); 
    26.         } 
    27.         else
    28.             stop(); 
    29.         } 
    30.          
    31.         return"push"
    32.     } 
    33.      
    34.     privatevoid start() { 
    35.         if (thread == null) { 
    36.             thread = new FeedThread(); 
    37.             thread.start(); 
    38.         } 
    39.         System.out.println("开始数据推送..."); 
    40.     } 
    41.  
    42.     privatevoid stop() { 
    43.         thread.running = false
    44.         thread = null
    45.         System.out.println("结束数据推送..."); 
    46.     } 
    47.      
    48.     publicclass FeedThread extends Thread { 
    49.         publicboolean running = true
    50.         private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
    51.  
    52.         publicvoid run() { 
    53.  
    54.             while (running) { 
    55.                 String time = sdf.format(new Date()); 
    56.                 System.out.println(">>>>>>>>>>>" + time); 
    57.                  
    58.                 // time为订阅主题 
    59.                 dataPushService.push("time", time); 
    60.                  
    61.                 try
    62.                     Thread.sleep(1000); 
    63.                 } catch (InterruptedException e) { 
    64.                 } 
    65.             } 
    66.         } 
    67.     } 
    package demo.flex.mvc;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import javax.annotation.Resource;
    
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    
    import demo.flex.service.DataPushService;
    
    @Controller
    public class DataPush {
    
    	private static FeedThread thread;
    	
    	@Resource
    	private DataPushService dataPushService;
    	
    	@RequestMapping("push")
    	public String push(String cmd) throws Exception {
    		
    		if ("start".equals(cmd)) {
    			start();
    		}
    		else {
    			stop();
    		}
    		
    		return "push";
    	}
    	
    	private void start() {
    		if (thread == null) {
    			thread = new FeedThread();
    			thread.start();
    		}
    		System.out.println("开始数据推送...");
    	}
    
    	private void stop() {
    		thread.running = false;
    		thread = null;
    		System.out.println("结束数据推送...");
    	}
    	
    	public class FeedThread extends Thread {
    		public boolean running = true;
    		private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    		public void run() {
    
    			while (running) {
    				String time = sdf.format(new Date());
    				System.out.println(">>>>>>>>>>>" + time);
    				
    				// time为订阅主题
    				dataPushService.push("time", time);
    				
    				try {
    					Thread.sleep(1000);
    				} catch (InterruptedException e) {
    				}
    			}
    		}
    	}
    }
    

            3.为使DataPush能够正常运行,修改web.xml及applicationContext.xml配置,添加如下代码:

    1. <!-- mvc mapping--> 
    2. <servlet-mapping> 
    3.     <servlet-name>Spring MVC Dispatcher Servlet</servlet-name> 
    4.     <url-pattern>/mvc/*</url-pattern> 
    5. </servlet-mapping> 
    	<!-- mvc mapping-->
    	<servlet-mapping>
    		<servlet-name>Spring MVC Dispatcher Servlet</servlet-name>
    		<url-pattern>/mvc/*</url-pattern>
    	</servlet-mapping>
    1. <mvc:annotation-driven/> 
    2. lt;bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">   
    3. <propertyname="prefix"value="/WEB-INF/jsp/"/>   
    4. <propertyname="suffix"value=".jsp"/>  
    5. <propertyname="viewClass"value="org.springframework.web.servlet.view.JstlView"/> 
    6. lt;/bean>  
     	<mvc:annotation-driven />
    	<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">  
    		<property name="prefix" value="/WEB-INF/jsp/"/>  
    		<property name="suffix" value=".jsp"/> 
    		<property name="viewClass" value="org.springframework.web.servlet.view.JstlView" />
    	</bean> 

            前台订阅功能

            前台FlexDemo.mxml中添加如下代码,订阅主题为time的消息,用于显示后台系统时间

    1. <s:GrouphorizontalCenter="0"> 
    2.     <s:Labeltext="系统时间:"/> 
    3.     <s:TextInputid="timeText"text="systime"editable="false"x="80"y="0"width="200"/> 
    4.     <s:Buttonlabel="显示时间"x="60"y="40"click="showTimeHandler(event)"/> 
    5.     <s:Buttonlabel="隐藏时间"x="155"y="40"click="hideTimeHandler(event)"/> 
    6. </s:Group>         
    		<s:Group horizontalCenter="0">
    			<s:Label text="系统时间:"/>
    			<s:TextInput id="timeText" text="systime" editable="false" x="80" y="0" width="200"/>
    			<s:Button label="显示时间" x="60" y="40" click="showTimeHandler(event)"/>
    			<s:Button label="隐藏时间" x="155" y="40" click="hideTimeHandler(event)"/>
    		</s:Group>		
    1. privatevar consumer:Consumer; 
    2.  
    3. protectedfunction showTimeHandler(event:MouseEvent):void 
    4.     if (consumer == null) { 
    5.         consumer = new Consumer();  
    6.      
    7.         // consumer设置的参数要与后台配置对应 
    8.         consumer.destination = "data_push";  
    9.         consumer.channelSet = new ChannelSet(["my-streaming-amf","my-polling-amf"]); 
    10.          
    11.         // 只能收到主题为time的订阅消息 
    12.         consumer.subtopic = "time";  
    13.          
    14.         //添加message的监听,当后台有消息发送时,调用messageHandler  
    15.         consumer.addEventListener(MessageEvent.MESSAGE, messageHandler);  
    16.          
    17.         // 订阅 
    18.         consumer.subscribe(); 
    19.     } 
    20.  
    21. protectedfunction hideTimeHandler(event:MouseEvent):void 
    22.     if (consumer != null) { 
    23.          
    24.         // 取消订阅 
    25.         consumer.unsubscribe(); 
    26.         consumer.removeEventListener(MessageEvent.MESSAGE, messageHandler);  
    27.         consumer = null
    28.         timeText.text = "systime";  
    29.     } 
    30.  
    31. privatefunction messageHandler(event:MessageEvent):void  
    32. {  
    33.     var time:String = event.message.body as String;  
    34.     timeText.text = time;  
    			private var consumer:Consumer;
    			
    			protected function showTimeHandler(event:MouseEvent):void
    			{
    				if (consumer == null) {
    					consumer = new Consumer(); 
    				
    					// consumer设置的参数要与后台配置对应
    					consumer.destination = "data_push"; 
    					consumer.channelSet = new ChannelSet(["my-streaming-amf","my-polling-amf"]);
    					
    					// 只能收到主题为time的订阅消息
    					consumer.subtopic = "time"; 
    					
    					//添加message的监听,当后台有消息发送时,调用messageHandler 
    					consumer.addEventListener(MessageEvent.MESSAGE, messageHandler); 
    					
    					// 订阅
    					consumer.subscribe();
    				}
    			}
    			
    			protected function hideTimeHandler(event:MouseEvent):void
    			{
    				if (consumer != null) {
    					
    					// 取消订阅
    					consumer.unsubscribe();
    					consumer.removeEventListener(MessageEvent.MESSAGE, messageHandler); 
    					consumer = null;
    					timeText.text = "systime"; 
    				}
    			}
    
    			private function messageHandler(event:MessageEvent):void 
    			{ 
    				var time:String = event.message.body as String; 
    				timeText.text = time; 
    			}

    消息订阅也可以采用MultiTopicProducer,能够在单个消息处理程序中同时订阅主题,通过addSubscription(topic)方法订阅主题,removeSubscription(topic)取消订阅。Consumer只能订阅单个主题。

            所有代码已写完,启动项目进行测试!

            1.订阅消息:打开链接http://localhost:8080/FlexDemo/FlexDemo.html,点击显示时间订阅time消息,点击隐藏时间取消订阅

            2.推送消息:打开链接http://localhost:8080/FlexDemo/mvc/push?cmd=start推送消息,http://localhost:8080/FlexDemo/mvc/push?cmd=stop停止推送

            经过测试,my-streaming-amf、my-polling-amf两个通道都可以实现消息推送订阅,但my-streaming-amf发送订阅请求后会一直维护这一个请求直到取消订阅,my-polling-amf会根据配置文件中设定的时间间隔,每过一段时间发送一次订阅请求,获得后台订阅的信息。

            最后附完整代码下载地址:http://download.csdn.net/detail/sjepy/5522399

  • 相关阅读:
    vue-fullcalendar插件
    iframe 父框架调用子框架的函数
    关于调试的一点感想
    hdfs 删除和新增节点
    hadoop yarn 实战错误汇总
    Ganglia 安装 No package 'ck' found
    storm on yarn(CDH5) 部署笔记
    spark on yarn 安装笔记
    storm on yarn安装时 提交到yarn失败 failed
    yarn storm spark
  • 原文地址:https://www.cnblogs.com/regalys168/p/3627732.html
Copyright © 2011-2022 走看看