zoukankan      html  css  js  c++  java
  • 【pushlet学习】具体实战

    业务需求
    1. 前端界面需要实时显示空调、照明等设备的状态, 如:空调电压、空调电流、光照强度等,这些量每一个称作一个测点;
    2. 不同的用户登录系统后,用户只能看到自己设备的运行状态,而看不到其他人设备的运行状态;
    3. 由于每个用户的设备类型、种类、个数等都不相同,因此每个用户需要查询测点也不相同;
    4. 当多个用户同时登陆系统时,其实就是在多个浏览器上打开多个浏览界面,去查看自己设备运行状态,
    即:多个浏览器上的多个界面对后台请求的测点是不同的,例如:
    用户1:<测点1,测点2,测点3,测点4,.....>;
    用户2:<测点21,测点22,测点23,测点24,.....>;
    用户3:<测点31,测点32,测点33,测点34,.....>;
    用户n:<测点n1,测点n2,测点n3,测点n4,.....>;


    功能需求:

    采用传统的“请求/响应”方式,很难达到前台界面实时显示最新数据,为达到实时显示最新数据,我们采用一种“服务器推”的技术comet,而pushlet是“服务器推”技术的一种实现,这里我们采用pushlet技术来实现上述业务需求。
    1. 假设后台的测点数据是实时变化的,且保存在HashMap中,即:HashMap<测点名,测点值>;当测点值发生变化时,后台就会实时更新Hashmap中对应的测点值,即Hashmap中保存的始终是实时数据;
    2. 每隔固定的分钟数(如:5分钟),后台就会向前台推送最新数据,前台界面实时更新显示;

    具体实现:

    1. 前台界面打开时,会将测点名称集合以及主题名传递到后台,格式形如:{[测点1,测点2,测点3,....],subject};并在前台开启对此主题的监听;(注:主题名是动态随机生成的,每个界面的主题名都保证不相同)
    2. 后台解析从前台传递来的{测点名称集合+主题名},并根据主题名创建响应的事件源(EventPollSource);需要说明的是:每个事件源(主题名)对应一个Thread,即前端打开m个界面,后台就会有m个对应的Thread,每个Thread处理一个事件源。
    3. 当前端浏览器界面关闭时,后台能检测到关闭的界面对应的事件源(Thread),并将对应的Thread释放掉。



    示例程序

    环境如下:



    TestServlet.java

    主要功能:
    1. 获取前台传递过来的测点名称数组ArrayList<Object> keyList主题名称aSubject
    2. 根据测点名称数组主题名称开启一个新Thread,在Thread中处理业务;
    PushThread pushThread = new PushThread(aSubject, keyList);
    3. 每个session(或界面)对应一个Thread;

    1. package com.guoguo;
    2. import java.io.BufferedInputStream;
    3. import java.io.IOException;
    4. import java.io.InputStream;
    5. import java.util.ArrayList;
    6. import javax.servlet.ServletException;
    7. import javax.servlet.http.HttpServlet;
    8. import javax.servlet.http.HttpServletRequest;
    9. import javax.servlet.http.HttpServletResponse;
    10. import org.stringtree.json.JSONReader;
    11. import org.stringtree.json.JSONValidatingReader;
    12. public class TestServlet extends HttpServlet {
    13. private static final long serialVersionUID = 1L;
    14. public TestServlet() {
    15. super();
    16. }
    17. /**
    18. * get/post方法的处理函数
    19. */
    20. protected void service(HttpServletRequest request,
    21. HttpServletResponse response) throws ServletException, IOException {
    22. // 读取请求报文数据
    23. request.setCharacterEncoding("UTF-8");
    24. // 获取请求的数据
    25. StringBuffer reqData = new StringBuffer();
    26. InputStream in = request.getInputStream();
    27. BufferedInputStream buf = new BufferedInputStream(in);
    28. byte[] buffer = new byte[1024];
    29. int iRead;
    30. while ((iRead = buf.read(buffer)) != -1) {
    31. reqData.append(new String(buffer, 0, iRead, "UTF-8"));
    32. }
    33. // 获取请求的测点名称数组
    34. JSONReader r = new JSONValidatingReader();
    35. @SuppressWarnings("unchecked")
    36. ArrayList<Object> keyList = (ArrayList<Object>) r.read(reqData
    37. .toString());
    38. // 获取订阅主题名称
    39. String aSubject = request.getParameter("subject");
    40. System.out.println("请求的测点:" + reqData.toString() + " , 主题名:" + aSubject);
    41. // 启动一个线程,实现创建Pushlet事件、做业务、向前台推送数据等功能
    42. PushThread pushThread = new PushThread(aSubject, keyList);
    43. pushThread.start();
    44. }
    45. }

    PushThread.java

    主要功能:
    1. 这是一个Thread类,该类有两个属性:主题String aSubject和关键字列表 ArrayList<Object> keyList;
    2. 线程首先根据主题名创建事件源:Event event = Event.createDataEvent(aSubject);
    3. 线程运行的时候,会监测会话状态以及事件订阅情况,并基于此判断线程是否需要结束。
    若session关闭或浏览器关闭,则线程退出;
    若有会话订阅该事件源,线程则进行业务处理,处理过程如下:
    1. 获取各测点的值;
    2. 将各测点的值组装成字符串;
    3. 将该字符串设置为事件源的属性。
    4.然后以广播的形式将事件发送出去,Dispatcher.getInstance().multicast(event);
    5. 经测试:Thread的run()函数执行结束后,Thread就自动退出了,系统会释放该Thread的资源。

    1. package com.guoguo;
    2. import java.text.SimpleDateFormat;
    3. import java.util.ArrayList;
    4. import java.util.Date;
    5. import java.util.HashMap;
    6. import java.util.Random;
    7. import org.stringtree.json.JSONValidatingWriter;
    8. import nl.justobjects.pushlet.core.Dispatcher;
    9. import nl.justobjects.pushlet.core.Event;
    10. import nl.justobjects.pushlet.core.Session;
    11. import nl.justobjects.pushlet.core.SessionManager;
    12. public class PushThread extends Thread {
    13. // 主题
    14. public String aSubject; // 客户端传递过来
    15. // 关键字列表
    16. public ArrayList<Object> keyList; // 客户端传递过来
    17. /**
    18. * 构造函数
    19. *
    20. * @param aSubject
    21. * @param keyList
    22. */
    23. public PushThread(String aSubject, ArrayList<Object> keyList) {
    24. this.aSubject = aSubject;
    25. this.keyList = keyList;
    26. }
    27. @Override
    28. public void run() {
    29. Event event = Event.createDataEvent(aSubject);
    30. int i = 0;
    31. while (true) {
    32. try {
    33. Thread.sleep(5000);
    34. } catch (InterruptedException e) {
    35. // 线程阻塞,结束线程
    36. System.out.println("=========>sleep异常 --->" + "线程"
    37. + Thread.currentThread().getId() + "关闭");
    38. break;
    39. }
    40. System.out.println(" -----Thread ID: "
    41. + Thread.currentThread().getId());
    42. // 判断当前连接的会话个数,没有会话,则线程退出
    43. Session[] sessions = SessionManager.getInstance().getSessions();
    44. // 当前无会话,结束线程
    45. if (0 == sessions.length) {
    46. System.out.println("=========>无sessions --->" + "线程"
    47. + Thread.currentThread().getId() + "关闭");
    48. break;
    49. }
    50. // 判断当前会话中是否存在订阅该主题的订阅者,不存在则结束线程
    51. boolean if_exist_subscriber = true;
    52. // 遍历所有session
    53. for (int j = 0; j < sessions.length; j++) {
    54. System.out
    55. .println(sessions[j].getSubscriber().match(event) == null ? "Session"
    56. + j + ": 未订阅该事件 "
    57. : "Session" + j + ":订阅了该事件 ");
    58. if (null != sessions[j].getSubscriber().match(event)) {
    59. if_exist_subscriber = false;
    60. }
    61. }
    62. if (if_exist_subscriber) {
    63. System.out.println("=========>无"+aSubject+"订阅者 --->" + "线程" + Thread.currentThread().getId() + "关闭");
    64. break;
    65. }
    66. // 模拟业务处理:获取各测点的值
    67. HashMap<Object, Object> ret_value = new HashMap<Object, Object>();
    68. for (Object keyStr : keyList) {
    69. SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss");// 设置日期格式
    70. String currTm = df.format(new Date());
    71. ret_value.put(keyStr, currTm);
    72. // ret_value.put(keyStr, (10*(new Random().nextFloat())));
    73. }
    74. // 将返回值封装为json数据形式
    75. String ret_string = "[";
    76. ret_string += new JSONValidatingWriter().write(ret_value);
    77. ret_string += "]";
    78. event.setField("key1", ret_string);
    79. // 推送消息
    80. Dispatcher.getInstance().multicast(event); // 向所有和event名称匹配的事件推送
    81. }
    82. }
    83. }



    sources.properties 本测试程序不需要在该文件中配置任何东西

    1. #
    2. # Properties file for EventSource objects to be instantiated.
    3. #
    4. # Place this file in the CLASSPATH (e.g. WEB-INF/classes) or directly under WEB-INF.
    5. #
    6. # $Id: sources.properties,v 1.2 2007/11/10 14:12:16 justb Exp $
    7. #
    8. # Each EventSource is defined as <key>=<classname>
    9. # 1. <key> should be unique within this file but may be any name
    10. # 2. <classname> is the full class name
    11. #
    12. #
    13. # Define Pull Sources here. These classes must be derived from
    14. # nl.justobjects.pushlet.core.EventPullSource
    15. # Inner classes are separated with a $ sign from the outer class.
    16. source1=nl.justobjects.pushlet.test.TestEventPullSources$TemperatureEventPullSource
    17. source2=nl.justobjects.pushlet.test.TestEventPullSources$SystemStatusEventPullSource
    18. source3=nl.justobjects.pushlet.test.TestEventPullSources$PushletStatusEventPullSource
    19. source4=nl.justobjects.pushlet.test.TestEventPullSources$AEXStocksEventPullSource
    20. source5=nl.justobjects.pushlet.test.TestEventPullSources$WebPresentationEventPullSource
    21. source6=nl.justobjects.pushlet.test.TestEventPullSources$PingEventPullSource
    22. source7=nl.justobjects.pushlet.test.TestEventPullSources$MyEventPullSource
    23. source8=nl.justobjects.pushlet.test.TestEventPullSources$SpEventPullSource
    24. # TO BE DONE IN NEXT VERSION
    25. # define Push Sources here. These must implement the interface
    26. # nl.justobjects.pushlet.core.EventSource


    web.xml

    这里主要是配置servlet:TestServlet,其他servlet用不到

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <web-app>
    3. <!-- Define the pushlet servlet -->
    4. <servlet>
    5. <servlet-name>pushlet</servlet-name>
    6. <servlet-class>nl.justobjects.pushlet.servlet.Pushlet</servlet-class>
    7. <load-on-startup>1</load-on-startup>
    8. </servlet>
    9. <servlet-mapping>
    10. <servlet-name>pushlet</servlet-name>
    11. <url-pattern>/pushlet.srv</url-pattern>
    12. </servlet-mapping>
    13. <servlet>
    14. <display-name>ChatServlet</display-name>
    15. <servlet-name>ChatServlet</servlet-name>
    16. <servlet-class>com.guoguo.ChatServlet</servlet-class>
    17. </servlet>
    18. <servlet-mapping>
    19. <servlet-name>ChatServlet</servlet-name>
    20. <url-pattern>/ChatServlet</url-pattern>
    21. </servlet-mapping>
    22. <servlet>
    23. <display-name>TestServlet</display-name>
    24. <servlet-name>TestServlet</servlet-name>
    25. <servlet-class>com.guoguo.TestServlet</servlet-class>
    26. </servlet>
    27. <servlet-mapping>
    28. <servlet-name>TestServlet</servlet-name>
    29. <url-pattern>/TestServlet</url-pattern>
    30. </servlet-mapping>
    31. </web-app>



    receive.jsp

    前台界面,主要功能如下:
    1. 负责生成随机主题名测点集合
    2. 页面初始化时,将主题名测点集合发送到后台,并开起pushlet监听;
    PL._init();
    PL.joinListen(aSubject);
    3. 编写onData()函数,用于处理“服务器推”送到前台的数据,并显示在页面上;
    4. 同时编写了“取消订阅”按钮,点击时,前台主动“取消订阅”,后台服务器接收到后,就会释放与此对应的Thread。

    1. <%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
    2. <%
    3. String path = request.getContextPath();
    4. String basePath = request.getScheme() + "://"
    5. + request.getServerName() + ":" + request.getServerPort()
    6. + path + "/";
    7. %>
    8. <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
    9. <html>
    10. <head>
    11. <base href="<%=basePath%>">
    12. <meta http-equiv="pragma" content="no-cache">
    13. <meta http-equiv="cache-control" content="no-cache">
    14. <meta http-equiv="expires" content="0">
    15. <meta http-equiv="keywords" content="keyword1,keyword2,keyword3">
    16. <meta http-equiv="description" content="This is my page">
    17. <script type="text/javascript"
    18. src="<%=basePath%>js/pushlet_js/ajax-pushlet-client.js"></script>
    19. <script type="text/javascript">
    20. var subscriptionId = null;
    21. window.onload = onInit;
    22. window.onbeforeunload = onUnsubscribe;
    23. // 监听后台返回的数据信息,更新页面
    24. function onData(event) {
    25. // 保存订阅编号,用于页面关闭时进行退订
    26. subscriptionId = event.get('p_sid');
    27. // 更新页面
    28. document.dataEventDisplay.event.value = event.get("key1");
    29. // ------实际案例处理(test)---------
    30. /* var respData = decodeURIComponent(event.get("key1"));
    31. var respObj = eval(respData);
    32. //var respActionNum = respObj.length;
    33. var obj = respObj[0];
    34. var str = "";
    35. for(var p in obj){
    36. if(typeof(obj[p]) != "function"){
    37. str += p + "=" + obj[p] + ", " ;
    38. }
    39. }
    40. alert(str); */
    41. }
    42. // 页面关闭时,取消订阅
    43. function onUnsubscribe() {
    44. if (subscriptionId != null) {
    45. PL.unsubscribe(subscriptionId);
    46. }
    47. }
    48. // 页面加载完,初始化请求、监听
    49. function onInit() {
    50. var aSubject = _getRandomString(6); //主题名
    51. var httpRequest = getXMLHttpRequest();
    52. if (httpRequest) {
    53. var reqData = getData();
    54. httpRequest.onreadystatechange = function() {
    55. if (httpRequest.readyState == 4) {
    56. if (httpRequest.status == 200) {
    57. // 请求成功,起pushlet监听
    58. PL._init();
    59. PL.joinListen(aSubject);
    60. } else {
    61. alert("实时请求失败! " + httpRequest.statusText);
    62. }
    63. }
    64. }
    65. url = '<%=request.getContextPath()%>' + '/TestServlet'
    66. + '?subject=' + aSubject;
    67. httpRequest.open("POST", url, true);
    68. httpRequest.send(reqData);
    69. }
    70. }
    71. // 请求关键字
    72. function getData() {
    73. var reqData = "[30200000001010, 30200000001012, 30800000003009, 30800000006009]";
    74. return reqData;
    75. }
    76. // 获取http请求
    77. function getXMLHttpRequest() {
    78. req = false;
    79. //本地XMLHttpRequest对象
    80. if (window.XMLHttpRequest) {
    81. try {
    82. req = new XMLHttpRequest();
    83. } catch (e) {
    84. req = false;
    85. }
    86. //IE/Windows ActiveX版本
    87. } else if (window.ActiveXObject) {
    88. try {
    89. req = new ActiveXObject("Msxml2.XMLHTTP");
    90. } catch (e) {
    91. try {
    92. req = new ActiveXObject("Microsoft.XMLHTTP");
    93. } catch (e) {
    94. req = false;
    95. }
    96. }
    97. }
    98. return req;
    99. }
    100. // 获取长度为len的随机字符串
    101. function _getRandomString(len) {
    102. var len = len || 32;
    103. var chars = 'ABCDEFGHJKMNPQRSTWXYZabcdefhijkmnprstwxyz';
    104. var maxPos = chars.length;
    105. var pwd = '';
    106. for (i = 0; i < len; i++) {
    107. pwd += chars.charAt(Math.floor(Math.random() * maxPos));
    108. }
    109. return pwd;
    110. }
    111. </script>
    112. </head>
    113. <body>
    114. <form name="dataEventDisplay">
    115. <table border="2" bordercolor="white" cellpadding="0" cellspacing="0">
    116. <tr>
    117. <td><textarea cols="60" rows="10" name="event">没有消息 </textarea></td>
    118. </tr>
    119. </table>
    120. </form>
    121. <button onclick="onUnsubscribe()">取消订阅</button>
    122. </body>
    123. </html>


    测试结果分析:

    运行程序,浏览器中输入:http://localhost:8080/pushletTest/receive.jsp
    如下是运行界面:(只开启了一个session,即一个界面)
    这是前台显示的界面:这些数据是服务器实时推送过来的。
    当在前端打开多个不同的界面,如3个:


    前台打开了3个界面,后台会自动开启3个对应的Thread,在各个Thread中分别处理每个session对应的业务。

    我们来看一下后台的线程数:

    我们再开启一个界面,现在总共有4个界面开启,见下图:

    看一下对应的后台:
    从上图可以看到,总共有4个线程在运行,对应4个session;在看一下线程数:

    与前一个对比:

    能看到线程由47个变为了48个。

    接下来我们关掉其中的3个界面,只留下一个界面,按照之前的分析,应该只剩下1个Thread、1个session,
    且后台的线程数应该由48个变为45个,下面我们看下截图:(现在只有一个界面留下,其他3个都关闭了):

    后台的javaw.exe的线程数还是48个;
    此时前台浏览器关闭了,但是Thread并没有立即关闭;
    等待一会,大约半分钟(等待时间不会太长),再次出现下面的界面:


    看一下后台:


    从上面的分析可以看到,前台界面关闭后,稍后延时一会,后台会将其对应的Thread释放掉。
    (多次测试后,发现关闭浏览器界面后,很快(大约2~3)后台线程j就释放掉了)
    如果不关闭界面,而是直接点击“取消订阅”按键,会发现后台会立即将其对应的Thread释放掉。(这个已测试过);
    所以,为了在关闭浏览器界面时,立即释放掉后台对应的Thread,我们可以在页面关闭时,自动执行“取消订阅”函数,
    这样,在每次关闭浏览器界面时,后台检测到就会立即释放对应的线程。

    还有一点需要注意,Thread释放了,但是我们发现session的个数并没有立即减少;
    继续等待,大约几分钟过后,session的个数会减少到当前打开的界面个数。
    这个还没有研究出为什么,需要继续研究。






    附件列表

    • 相关阅读:
      Celery框架
      Tensorflow安装记录
      OpenFace的一些了解
      CentOS+Uwsgi+Nginx发布Flask开发的WebAPI
      Always On 集群监听创建失败问题
      SQL Server 安装好后 Always On群组配置
      Sql server 2016 Always On 搭建Windows集群配置
      汕头市队赛 SRM13 T3
      bzoj 1314: River过河 树套树+单调队列
      hdu 6119 …&&百度之星 T6
    • 原文地址:https://www.cnblogs.com/ssslinppp/p/4480889.html
    Copyright © 2011-2022 走看看