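This article shows how to use Flex data push to implement message subscription on the client. It builds on the earlier Flex + BlazeDS + Spring integration and uses Spring to simplify the configuration. Prerequisites:
1. Complete the Flex + BlazeDS + Spring integration.
2. In the .flexProperties file in the project root, set serverContextRoot to the project name; otherwise the server never receives the client's subscription requests. (I lost a lot of time on this, so take care.)
Modifying the configuration files
1. Edit WEB-INF/flex/services-config.xml and add the following: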
<channel-definition id="my-streaming-amf" class="mx.messaging.channels.StreamingAMFChannel">
    <endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/streamingamf" class="flex.messaging.endpoints.StreamingAMFEndpoint"/>
    <properties>
        <idle-timeout-minutes>0</idle-timeout-minutes>
        <max-streaming-clients>10</max-streaming-clients>
        <server-to-client-heartbeat-millis>5000</server-to-client-heartbeat-millis>
        <user-agent-settings>
            <user-agent match-on="MSIE" kickstart-bytes="2048" max-streaming-connections-per-session="1"/>
            <user-agent match-on="Firefox" kickstart-bytes="2048" max-streaming-connections-per-session="1"/>
        </user-agent-settings>
    </properties>
</channel-definition>
2. Edit the Spring configuration file applicationContext.xml and replace <flex:message-broker/> with the following:
<!-- Setting allow-subtopics to true lets subscribers subscribe to messages on a specific subtopic -->
<flex:message-destination id="data_push" allow-subtopics="true" subtopic-separator="."/>
<flex:message-broker>
    <flex:message-service default-channels="my-streaming-amf,my-polling-amf"/>
</flex:message-broker>
<bean id="messageTemplate" class="org.springframework.flex.messaging.MessageTemplate"/>
The channels "my-streaming-amf" and "my-polling-amf" must be defined in services-config.xml; messageTemplate is the bean the code uses to push messages.
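To see why the context root matters, it helps to look at what the endpoint URL template turns into. The sketch below is plain Java written for illustration, not BlazeDS code; the class name EndpointUrlSketch and the token values are assumptions. As far as I understand, {context.root} is filled in from the serverContextRoot setting when the client is compiled, while {server.name} and {server.port} come from the URL the SWF was loaded from.

```java
import java.util.Map;

/** Illustrative only: mimics how the tokens in the endpoint URL could be filled in. */
class EndpointUrlSketch {

    /** Replaces every {token} in the template with its value from the map. */
    static String resolve(String template, Map<String, String> values) {
        String result = template;
        for (Map.Entry<String, String> e : values.entrySet()) {
            result = result.replace("{" + e.getKey() + "}", e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        String template =
            "http://{server.name}:{server.port}/{context.root}/messagebroker/streamingamf";
        // Assumed values for a local deployment of a project named FlexDemo
        Map<String, String> values = Map.of(
            "server.name", "localhost",
            "server.port", "8080",
            "context.root", "FlexDemo");
        System.out.println(resolve(template, values));
    }
}
```

If the context root does not match the deployed project name, the resulting URL points at a path the server does not serve, which would explain subscription requests never arriving.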
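Pushing data from the server
1. Add a service that pushes data, as shown below. Pay attention to the parameters set on msg: the client, the server, and the configuration files must all agree on them.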
package demo.flex.service;

import javax.annotation.Resource;

import org.springframework.flex.messaging.AsyncMessageCreator;
import org.springframework.flex.messaging.MessageTemplate;
import org.springframework.stereotype.Service;

import flex.messaging.messages.AsyncMessage;
import flex.messaging.util.UUIDUtils;

@Service
public class DataPushService {
    @Resource
    private MessageTemplate messageTemplate;

    public void push(String topic, Object data) {
        messageTemplate.send(new CustomAsyncMessageCreator(topic, data));
    }

    class CustomAsyncMessageCreator implements AsyncMessageCreator {
        private String topic;
        private Object data;

        public CustomAsyncMessageCreator(String topic, Object data) {
            this.topic = topic;
            this.data = data;
        }

        @Override
        public AsyncMessage createMessage() {
            AsyncMessage msg = new AsyncMessage();
            msg.setClientId(UUIDUtils.createUUID());
            msg.setMessageId(UUIDUtils.createUUID());
            msg.setTimestamp(System.currentTimeMillis());
            msg.setDestination("data_push"); // must match the message-destination id in the configuration file
            msg.setHeader("DSSubtopic", topic); // the subtopic that subscribers filter on
            msg.setBody(data);
            return msg;
        }
    }
}
2. Add a Controller that starts and stops the push; while running, it pushes the current system time once per second.
package demo.flex.mvc;

import java.text.SimpleDateFormat;
import java.util.Date;

import javax.annotation.Resource;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import demo.flex.service.DataPushService;

@Controller
public class DataPush {
    private static FeedThread thread;

    @Resource
    private DataPushService dataPushService;

    @RequestMapping("push")
    public String push(String cmd) throws Exception {
        if ("start".equals(cmd)) {
            start();
        } else {
            stop();
        }
        return "push"; // view name, resolved by the configured view resolver
    }

    private synchronized void start() {
        if (thread == null) {
            thread = new FeedThread();
            thread.start();
        }
        System.out.println("Starting data push...");
    }

    private synchronized void stop() {
        // Guard against a stop request that arrives before any start
        if (thread != null) {
            thread.running = false;
            thread = null;
        }
        System.out.println("Stopping data push...");
    }

    public class FeedThread extends Thread {
        // volatile so the write in stop() is visible to the running thread
        public volatile boolean running = true;
        private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        public void run() {
            while (running) {
                String time = sdf.format(new Date());
                System.out.println(">>>>>>>>>>>" + time);
                // "time" is the subtopic
                dataPushService.push("time", time);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and end the loop
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
3. For DataPush to work, add the servlet mapping below to web.xml and the MVC configuration after it to applicationContext.xml:
<!-- mvc mapping -->
<servlet-mapping>
    <servlet-name>Spring MVC Dispatcher Servlet</servlet-name>
    <url-pattern>/mvc/*</url-pattern>
</servlet-mapping>
<mvc:annotation-driven />
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
    <property name="prefix" value="/WEB-INF/jsp/"/>
    <property name="suffix" value=".jsp"/>
    <property name="viewClass" value="org.springframework.web.servlet.view.JstlView"/>
</bean>
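The start/stop logic in the controller manages a thread by hand. As an alternative, here is a minimal sketch of the same once-per-interval feed built on ScheduledExecutorService from the JDK. It is not part of the original project: the class name ScheduledFeedSketch is made up, and the BiConsumer is a stand-in for DataPushService.push so that the example runs without Spring or BlazeDS.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Sketch of a start/stop feed; the BiConsumer stands in for DataPushService.push. */
class ScheduledFeedSketch {
    // DateTimeFormatter is thread-safe, unlike SimpleDateFormat
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private final BiConsumer<String, Object> pusher;
    private final long periodMillis;
    private ScheduledFuture<?> task; // guarded by "this"

    ScheduledFeedSketch(BiConsumer<String, Object> pusher, long periodMillis) {
        this.pusher = pusher;
        this.periodMillis = periodMillis;
    }

    /** Starts the feed; returns false if it was already running. */
    synchronized boolean start() {
        if (task != null) {
            return false;
        }
        // Note: if the pusher throws, the executor cancels all later runs
        task = scheduler.scheduleAtFixedRate(
            () -> pusher.accept("time", LocalDateTime.now().format(FORMAT)),
            0, periodMillis, TimeUnit.MILLISECONDS);
        return true;
    }

    /** Stops the feed; returns false (and does nothing) if it was not running. */
    synchronized boolean stop() {
        if (task == null) {
            return false;
        }
        task.cancel(false);
        task = null;
        return true;
    }

    /** Releases the scheduler thread, for example when the web application shuts down. */
    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledFeedSketch feed = new ScheduledFeedSketch(
            (topic, data) -> System.out.println(topic + " -> " + data), 1000);
        feed.start();
        Thread.sleep(3500); // let a few ticks through
        feed.stop();
        feed.shutdown();
    }
}
```

In the controller, one would presumably pass dataPushService::push as the pusher. Calling stop before start is a harmless no-op here.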
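Subscribing on the client
Add the following to FlexDemo.mxml on the client. It subscribes to messages with the subtopic time and displays the server's system time.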
<s:Group horizontalCenter="0">
    <s:Label text="System time:"/>
    <s:TextInput id="timeText" text="systime" editable="false" x="80" y="0" width="200"/>
    <s:Button label="Show time" x="60" y="40" click="showTimeHandler(event)"/>
    <s:Button label="Hide time" x="155" y="40" click="hideTimeHandler(event)"/>
</s:Group>
import flash.events.MouseEvent;
import mx.messaging.ChannelSet;
import mx.messaging.Consumer;
import mx.messaging.events.MessageEvent;

private var consumer:Consumer;

protected function showTimeHandler(event:MouseEvent):void
{
    if (consumer == null) {
        consumer = new Consumer();
        // The consumer settings must match the server-side configuration
        consumer.destination = "data_push";
        consumer.channelSet = new ChannelSet(["my-streaming-amf","my-polling-amf"]);
        // Receive only messages published under the subtopic "time"
        consumer.subtopic = "time";
        // Call messageHandler whenever the server sends a message
        consumer.addEventListener(MessageEvent.MESSAGE, messageHandler);
        // Subscribe
        consumer.subscribe();
    }
}

protected function hideTimeHandler(event:MouseEvent):void
{
    if (consumer != null) {
        // Unsubscribe
        consumer.unsubscribe();
        consumer.removeEventListener(MessageEvent.MESSAGE, messageHandler);
        consumer = null;
        timeText.text = "systime";
    }
}

private function messageHandler(event:MessageEvent):void
{
    var time:String = event.message.body as String;
    timeText.text = time;
}
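Subscription can also be done with MultiTopicConsumer, which can subscribe to several subtopics at once with a single message handler: call addSubscription(topic) to subscribe to a subtopic and removeSubscription(topic) to unsubscribe. A Consumer can subscribe to only one subtopic.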
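To make the subtopic idea concrete, here is a small in-memory sketch of routing by subtopic. It is plain Java and only an illustration of the concept, not how BlazeDS implements it; the class SubtopicRouterSketch and the subtopics "weather" and "stock" are invented for the example. One handler subscribed to two subtopics corresponds roughly to the multi-topic case, and a handler subscribed to one corresponds to a plain Consumer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** In-memory stand-in for one destination with subtopics; not BlazeDS code. */
class SubtopicRouterSketch {
    // subtopic -> handlers subscribed to it
    private final Map<String, List<Consumer<Object>>> subscribers = new HashMap<>();

    /** Registers a handler for one subtopic (comparable to addSubscription). */
    void subscribe(String subtopic, Consumer<Object> handler) {
        subscribers.computeIfAbsent(subtopic, k -> new ArrayList<>()).add(handler);
    }

    /** Removes a handler from one subtopic (comparable to removeSubscription). */
    void unsubscribe(String subtopic, Consumer<Object> handler) {
        List<Consumer<Object>> handlers = subscribers.get(subtopic);
        if (handlers != null) {
            handlers.remove(handler);
        }
    }

    /** Delivers body to the handlers of exactly this subtopic; returns how many got it. */
    int push(String subtopic, Object body) {
        // Snapshot so that a handler may unsubscribe while being called
        List<Consumer<Object>> snapshot =
            new ArrayList<>(subscribers.getOrDefault(subtopic, List.of()));
        for (Consumer<Object> handler : snapshot) {
            handler.accept(body);
        }
        return snapshot.size();
    }

    public static void main(String[] args) {
        SubtopicRouterSketch router = new SubtopicRouterSketch();
        List<Object> received = new ArrayList<>();
        Consumer<Object> handler = received::add;

        router.subscribe("time", handler);
        router.subscribe("weather", handler); // made-up second subtopic

        router.push("time", "2013-06-05 10:00:00");
        router.push("stock", "dropped: nobody subscribed to this subtopic");
        router.push("weather", "sunny");

        System.out.println(received);
    }
}
```

The handler sees the "time" and "weather" messages and never the "stock" one, which mirrors how the DSSubtopic header on the server and the subtopic on the client have to agree.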
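All the code is now written, so start the project and test it.
1. Subscribe: open http://localhost:8080/FlexDemo/FlexDemo.html, click "Show time" (显示时间) to subscribe to time messages, and click "Hide time" (隐藏时间) to unsubscribe.
2. Push: open http://localhost:8080/FlexDemo/mvc/push?cmd=start to start pushing, and http://localhost:8080/FlexDemo/mvc/push?cmd=stop to stop.
In my tests, both channels, my-streaming-amf and my-polling-amf, delivered pushed messages to subscribers. With my-streaming-amf, the client keeps a single request open after subscribing until it unsubscribes. With my-polling-amf, the client sends a new request at the interval set in the configuration file and fetches the messages it is subscribed to.
Finally, the complete code can be downloaded from: http://download.csdn.net/detail/sjepy/5522399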
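The difference between the two channels can be pictured with a toy model. The sketch below is plain Java for illustration only and says nothing about BlazeDS internals; the class DeliverySketch and its methods are invented, and it models a single polling client for simplicity. A streaming client is handed each message the moment it is pushed, while a polling client only sees messages when it next asks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of streaming versus polling delivery; not how BlazeDS is implemented. */
class DeliverySketch {
    private final List<Consumer<String>> streamingClients = new ArrayList<>();
    private final List<String> pendingForPolling = new ArrayList<>();

    /** A streaming client keeps one connection open and receives each message at once. */
    void openStream(Consumer<String> client) {
        streamingClients.add(client);
    }

    /** A polling client has no open connection, so it collects what has queued up. */
    List<String> poll() {
        List<String> batch = new ArrayList<>(pendingForPolling);
        pendingForPolling.clear();
        return batch;
    }

    /** Server side: hand the message to streams right away, queue it for the poller. */
    void push(String message) {
        for (Consumer<String> client : streamingClients) {
            client.accept(message);
        }
        pendingForPolling.add(message);
    }

    public static void main(String[] args) {
        DeliverySketch server = new DeliverySketch();
        List<String> streamed = new ArrayList<>();
        server.openStream(streamed::add);

        server.push("10:00:00");
        server.push("10:00:01");

        System.out.println("streamed so far: " + streamed);
        System.out.println("polled batch:    " + server.poll());
        System.out.println("next poll:       " + server.poll());
    }
}
```

The streaming list fills as messages are pushed, whereas the polling side receives both messages in one batch on its next request and an empty batch after that.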