zoukankan      html  css  js  c++  java
  • pushlet单播与多播

       近期要做一个消息推送的功能,在网上找了很多关于pushlet的文章,尽管写得都很具体,可是本人看了以后却总觉得模棱两可,不知道怎样下手。最终参考了这些文章中的一些内容,并结合官网的源码,做了自己的修改。

     第一部分  改动的地方

    首先修改了nl.justobjects.pushlet.core.Session:添加了event字段和getEvent()方法,同时修改了create()方法,修改后如下:

    // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
    // Distributable under LGPL license. See terms of license at gnu.org.
    
    package nl.justobjects.pushlet.core;
    
    import nl.justobjects.pushlet.util.Log;
    import nl.justobjects.pushlet.util.PushletException;
    
    /**
     * Represents client pushlet session state.
     *
     * @author Just van den Broecke - Just Objects &copy;
     * @version $Id: Session.java,v 1.8 2007/11/23 14:33:07 justb Exp $
     */
    public class Session implements Protocol, ConfigDefs {
    	private Controller controller;
    	private Subscriber subscriber;
    	/** Event passed to create(); field added so the creating request's fields stay reachable from the session. */
    	private Event event;
    
    	private String userAgent;
    	/** Lease duration in milliseconds, computed from the SESSION_TIMEOUT_MINS config property. */
    	private long LEASE_TIME_MILLIS = Config
    			.getLongProperty(SESSION_TIMEOUT_MINS) * 60 * 1000;
    	/** Remaining lease time in milliseconds; decreased by age() and reset by kick(). */
    	private volatile long timeToLive = LEASE_TIME_MILLIS;
    
    	/** Comma-separated LISTEN_FORCE_PULL_AGENTS config property, split into its entries. */
    	public static String[] FORCED_PULL_AGENTS = Config.getProperty(
    			LISTEN_FORCE_PULL_AGENTS).split(",");
    
    	private String address = "unknown";
    	private String format = FORMAT_XML;
    
    	private String id;
    
    	/**
    	 * Protected constructor as we create through factory method.
    	 */
    	protected Session() {
    	}
    
    	/* Original create method, before the modification (took only the session id): */
    	// public static Session create(String anId) throws PushletException {
    	// Session session;
    	// try {
    	// session = (Session) Config.getClass(SESSION_CLASS,
    	// "nl.justobjects.pushlet.core.Session").newInstance();
    	// } catch (Throwable t) {
    	// throw new PushletException(
    	// "Cannot instantiate Session from config", t);
    	// }
    	//
    	// // Init session
    	// session.id = anId;
    	// session.controller = Controller.create(session);
    	// session.subscriber = Subscriber.create(session);
    	// return session;
    	// }
    
    	/**
    	 * Create instance through factory method.
    	 * <p/>
    	 * The concrete class is looked up through Config under SESSION_CLASS,
    	 * falling back to nl.justobjects.pushlet.core.Session.
    	 *
    	 * @param anId
    	 *            a session id
    	 * @param anEvent
    	 *            the event to store on the session; returned later by
    	 *            getEvent()
    	 * @return a Session object (or derived)
    	 * @throws PushletException
    	 *             exception, usually misconfiguration
    	 */
    	public static Session create(String anId, Event anEvent)
    			throws PushletException {
    		Session session;
    		try {
    			session = (Session) Config.getClass(SESSION_CLASS,
    					"nl.justobjects.pushlet.core.Session").newInstance();
    		} catch (Throwable t) {
    			throw new PushletException(
    					"Cannot instantiate Session from config", t);
    		}
    
    		// Init session
    		session.id = anId;
    		session.controller = Controller.create(session);
    		session.subscriber = Subscriber.create(session);
    		session.event = anEvent;
    		return session;
    	}
    
    	/**
    	 * Return the event that was passed to create() (accessor added together
    	 * with the event field).
    	 */
    	public Event getEvent() {
    		return event;
    	}
    
    	/**
    	 * Return (remote) Subscriber client's IP address.
    	 */
    	public String getAddress() {
    		return address;
    	}
    
    	/**
    	 * Return command controller.
    	 */
    	public Controller getController() {
    		return controller;
    	}
    
    	/**
    	 * Return Event format to send to client.
    	 */
    	public String getFormat() {
    		return format;
    	}
    
    	/**
    	 * Return (remote) Subscriber client's unique id.
    	 */
    	public String getId() {
    		return id;
    	}
    
    	/**
    	 * Return subscriber.
    	 */
    	public Subscriber getSubscriber() {
    		return subscriber;
    	}
    
    	/**
    	 * Return remote HTTP User-Agent.
    	 */
    	public String getUserAgent() {
    		return userAgent;
    	}
    
    	/**
    	 * Set address.
    	 */
    	protected void setAddress(String anAddress) {
    		address = anAddress;
    	}
    
    	/**
    	 * Set event format to encode.
    	 */
    	protected void setFormat(String aFormat) {
    		format = aFormat;
    	}
    
    	/**
    	 * Set client HTTP UserAgent.
    	 */
    	public void setUserAgent(String aUserAgent) {
    		userAgent = aUserAgent;
    	}
    
    	/**
    	 * Decrease time to live.
    	 *
    	 * @param aDeltaMillis
    	 *            number of milliseconds to subtract from the remaining lease
    	 */
    	public void age(long aDeltaMillis) {
    		timeToLive -= aDeltaMillis;
    	}
    
    	/**
    	 * Has session timed out?
    	 */
    	public boolean isExpired() {
    		return timeToLive <= 0;
    	}
    
    	/**
    	 * Keep alive by resetting TTL.
    	 */
    	public void kick() {
    		timeToLive = LEASE_TIME_MILLIS;
    	}
    
    	/**
    	 * Register this session with the SessionManager.
    	 */
    	public void start() {
    		SessionManager.getInstance().addSession(this);
    	}
    
    	/**
    	 * Stop the subscriber, then remove this session from the SessionManager.
    	 */
    	public void stop() {
    		subscriber.stop();
    		SessionManager.getInstance().removeSession(this);
    	}
    
    	/**
    	 * Info print util; the message is prefixed with this session's toString().
    	 */
    	public void info(String s) {
    		Log.info("S-" + this + ": " + s);
    	}
    
    	/**
    	 * Exceptional print util.
    	 */
    	public void warn(String s) {
    		Log.warn("S-" + this + ": " + s);
    	}
    
    	/**
    	 * Debug print util.
    	 */
    	public void debug(String s) {
    		Log.debug("S-" + this + ": " + s);
    	}
    
    	/**
    	 * Return "address[id]" for this session.
    	 */
    	public String toString() {
    		return getAddress() + "[" + getId() + "]";
    	}
    }
    
    /*
     * $Log: Session.java,v $ Revision 1.8 2007/11/23 14:33:07 justb core classes
     * now configurable through factory
     * 
     * Revision 1.7 2005/02/28 15:58:05 justb added SimpleListener example
     * 
     * Revision 1.6 2005/02/28 12:45:59 justb introduced Command class
     * 
     * Revision 1.5 2005/02/28 09:14:55 justb sessmgr/dispatcher factory/singleton
     * support
     * 
     * Revision 1.4 2005/02/25 15:13:01 justb session id generation more robust
     * 
     * Revision 1.3 2005/02/21 16:59:08 justb SessionManager and session lease
     * introduced
     * 
     * Revision 1.2 2005/02/21 12:32:28 justb fixed publish event in Controller
     * 
     * Revision 1.1 2005/02/21 11:50:46 justb ohase1 of refactoring Subscriber into
     * Session/Controller/Subscriber
     */

    然后改动了nl.justobjects.pushlet.core.SessionManager,改动了createSession()方法:

    // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
    // Distributable under LGPL license. See terms of license at gnu.org.
    
    package nl.justobjects.pushlet.core;
    
    import nl.justobjects.pushlet.util.Log;
    import nl.justobjects.pushlet.util.PushletException;
    import nl.justobjects.pushlet.util.Rand;
    import nl.justobjects.pushlet.util.Sys;
    
    import java.rmi.server.UID;
    import java.util.*;
    import java.lang.reflect.Method;
    import java.lang.reflect.InvocationTargetException;
    
    /**
     * Manages lifecycle of Sessions.
     *
     * @author Just van den Broecke - Just Objects &copy;
     * @version $Id: SessionManager.java,v 1.12 2007/12/04 13:55:53 justb Exp $
     */
    public class SessionManager implements ConfigDefs {
    
    	/**
    	 * Singleton pattern: single instance.
    	 */
    	private static SessionManager instance;
    
    	static {
    		// Singleton + factory pattern: create single instance
    		// from configured class name
    		try {
    			instance = (SessionManager) Config.getClass(SESSION_MANAGER_CLASS,
    					"nl.justobjects.pushlet.core.SessionManager").newInstance();
    			Log.info("SessionManager created className=" + instance.getClass());
    		} catch (Throwable t) {
    			Log.fatal("Cannot instantiate SessionManager from config", t);
    		}
    	}
    
    	/**
    	 * Timer to schedule session leasing TimerTasks.
    	 */
    	private Timer timer;
    	private final long TIMER_INTERVAL_MILLIS = 60000;
    
    	/**
    	 * Map of active sessions, keyed by their id, all access is through mutex.
    	 */
    	private Map sessions = new HashMap(13);
    
    	/**
    	 * Cache of Sessions for iteration and to allow concurrent modification.
    	 */
    	private Session[] sessionCache = new Session[0];
    
    	/**
    	 * State of SessionCache, becomes true whenever sessionCache out of sync
    	 * with sessions Map.
    	 */
    	private boolean sessionCacheDirty = false;
    
    	/**
    	 * Lock for any operation on Sessions (Session Map and/or -cache).
    	 */
    	private final Object mutex = new Object();
    
    	/**
    	 * Singleton pattern: protected constructor needed for derived classes.
    	 */
    	protected SessionManager() {
    	}
    
    	/**
    	 * Visitor pattern implementation for Session iteration.
    	 * <p/>
    	 * This method can be used to iterate over all Sessions in a threadsafe way.
    	 * See Dispatcher.multicast and broadcast methods for examples.
    	 *
    	 * @param visitor
    	 *            the object that should implement method parm
    	 * @param method
    	 *            the method to be called from visitor
    	 * @param args
    	 *            arguments to be passed in visit method, args[0] will always be
    	 *            Session object
    	 */
    	public void apply(Object visitor, Method method, Object[] args) {
    
    		synchronized (mutex) {
    
    			// Refresh Session cache if required
    			// We use a cache for two reasons:
    			// 1. to prevent concurrent modification from within visitor method
    			// 2. some optimization (vs setting up Iterator for each apply()
    			if (sessionCacheDirty) {
    				// Clear out existing cache
    				for (int i = 0; i < sessionCache.length; i++) {
    					sessionCache[i] = null;
    				}
    
    				// Refill cache and update state
    				sessionCache = (Session[]) sessions.values().toArray(
    						sessionCache);
    				sessionCacheDirty = false;
    			}
    
    			// Valid session cache: loop and call supplied Visitor method
    			Session nextSession;
    			for (int i = 0; i < sessionCache.length; i++) {
    				nextSession = sessionCache[i];
    
    				// Session cache may not be entirely filled
    				if (nextSession == null) {
    					break;
    				}
    
    				try {
    					// First argument is always a Session object
    					args[0] = nextSession;
    
    					// Use Java reflection to call the method passed by the
    					// Visitor
    					method.invoke(visitor, args);
    				} catch (IllegalAccessException e) {
    					Log.warn("apply: illegal method access: ", e);
    				} catch (InvocationTargetException e) {
    					Log.warn("apply: method invoke: ", e);
    				}
    			}
    		}
    	}
    
    	/**
    	 * Create new Session (but add later).
    	 */
    
    	/**
    	 * public Session createSession(Event anEvent) throws PushletException { 
    	 * return Session.create(createSessionId());}
    	 * 
    	 * 
    	 * 这是原来的createSession方法,能够看到。尽管有anEvent參数··可是却没有在createSession的时候使用
    	 */
    
    	public Session createSession(Event anEvent) throws PushletException {
    		// Trivial
    		return Session.create(createSessionId(), anEvent);
    	}
    
    	/**
    	 * Singleton pattern: get single instance.
    	 */
    	public static SessionManager getInstance() {
    		return instance;
    	}
    
    	/**
    	 * Get Session by session id.
    	 */
    	public Session getSession(String anId) {
    		synchronized (mutex) {
    			return (Session) sessions.get(anId);
    		}
    	}
    
    	/**
    	 * Get copy of listening Sessions.
    	 */
    	public Session[] getSessions() {
    		synchronized (mutex) {
    			return (Session[]) sessions.values().toArray(new Session[0]);
    		}
    	}
    
    	/**
    	 * Get number of listening Sessions.
    	 */
    	public int getSessionCount() {
    		synchronized (mutex) {
    			return sessions.size();
    		}
    	}
    
    	/**
    	 * Get status info.
    	 */
    	public String getStatus() {
    		Session[] sessions = getSessions();
    		StringBuffer statusBuffer = new StringBuffer();
    		statusBuffer.append("SessionMgr: " + sessions.length + " sessions \n");
    		for (int i = 0; i < sessions.length; i++) {
    			statusBuffer.append(sessions[i] + "\n");
    		}
    		return statusBuffer.toString();
    	}
    
    	/**
    	 * Is Session present?

    . */ public boolean hasSession(String anId) { synchronized (mutex) { return sessions.containsKey(anId); } } /** * Add session. */ public void addSession(Session session) { synchronized (mutex) { sessions.put(session.getId(), session); sessionCacheDirty = true; } // log(session.getId() + " at " + session.getAddress() + " adding "); info(session.getId() + " at " + session.getAddress() + " added "); } /** * Register session for removal. */ public Session removeSession(Session aSession) { synchronized (mutex) { Session session = (Session) sessions.remove(aSession.getId()); if (session != null) { info(session.getId() + " at " + session.getAddress() + " removed "); } sessionCacheDirty = true; return session; } } /** * Starts us. */ public void start() throws PushletException { if (timer != null) { stop(); } timer = new Timer(false); timer.schedule(new AgingTimerTask(), TIMER_INTERVAL_MILLIS, TIMER_INTERVAL_MILLIS); info("started; interval=" + TIMER_INTERVAL_MILLIS + "ms"); } /** * Stopis us. */ public void stop() { if (timer != null) { timer.cancel(); timer = null; } synchronized (mutex) { sessions.clear(); } info("stopped"); } /** * Create unique Session id. */ protected String createSessionId() { // Use UUID if specified in config (thanks Uli Romahn) if (Config.hasProperty(SESSION_ID_GENERATION) && Config.getProperty(SESSION_ID_GENERATION).equals( SESSION_ID_GENERATION_UUID)) { // We want to be Java 1.4 compatible so use UID class (1.5+ we may // use java.util.UUID). return new UID().toString(); } // Other cases use random name // Create a unique session id // In 99.9999 % of the cases this should be generated at once // We need the mutext to prevent the chance of creating // same-valued ids (thanks Uli Romahn) synchronized (mutex) { String id; while (true) { id = Rand.randomName(Config.getIntProperty(SESSION_ID_SIZE)); if (!hasSession(id)) { // Created unique session id break; } } return id; } } /** * Util: stdout printing. 
*/ protected void info(String s) { Log.info("SessionManager: " + new Date() + " " + s); } /** * Util: stdout printing. */ protected void warn(String s) { Log.warn("SessionManager: " + s); } /** * Util: stdout printing. */ protected void debug(String s) { Log.debug("SessionManager: " + s); } /** * Manages Session timeouts. */ private class AgingTimerTask extends TimerTask { private long lastRun = Sys.now(); private long delta; private Method visitMethod; public AgingTimerTask() throws PushletException { try { // Setup Visitor Methods for callback from SessionManager Class[] argsClasses = { Session.class }; visitMethod = this.getClass().getMethod("visit", argsClasses); } catch (NoSuchMethodException e) { throw new PushletException("Failed to setup AgingTimerTask", e); } } /** * Clock tick callback from Timer. */ public void run() { long now = Sys.now(); delta = now - lastRun; lastRun = now; debug("AgingTimerTask: tick"); // Use Visitor pattern to loop through Session objects (see visit() // below) getInstance().apply(this, visitMethod, new Object[1]); } /** * Callback from SessionManager during apply() */ public void visit(Session aSession) { try { // Age the lease aSession.age(delta); debug("AgingTimerTask: visit: " + aSession); // Stop session if lease expired if (aSession.isExpired()) { info("AgingTimerTask: Session expired: " + aSession); aSession.stop(); } } catch (Throwable t) { warn("AgingTimerTask: Error in timer task : " + t); } } } } /* * $Log: SessionManager.java,v $ Revision 1.12 2007/12/04 13:55:53 justb * reimplement SessionManager concurrency (prev version was not thread-safe!) 
* * Revision 1.11 2007/11/23 14:33:07 justb core classes now configurable through * factory * * Revision 1.10 2007/11/10 14:47:45 justb make session key generation * configurable (can use uuid) * * Revision 1.9 2007/11/10 14:17:18 justb minor cosmetic changes just commit now * * Revision 1.8 2007/07/02 08:12:16 justb redo to original version of session * cache (with break, but nullify array first) * * Revision 1.7 2007/07/02 07:33:02 justb small fix in sessionmgr for holes in * sessioncache array (continue i.s.o. break) * * Revision 1.6 2006/11/18 12:13:47 justb made SessionManager constructor * protected to allow constructing derived classes * * Revision 1.5 2005/02/28 15:58:05 justb added SimpleListener example * * Revision 1.4 2005/02/28 12:45:59 justb introduced Command class * * Revision 1.3 2005/02/28 09:14:55 justb sessmgr/dispatcher factory/singleton * support * * Revision 1.2 2005/02/25 15:13:01 justb session id generation more robust * * Revision 1.1 2005/02/21 16:59:09 justb SessionManager and session lease * introduced */



    接着··就大胆的改动了nl.justobjects.pushlet.core.EventPullSource,这里我改动了

    abstract protected Event pullEvent();

        改为了


    abstract protected void pullEvent();

      
    83    public void run() {
    84        Log.debug(getClass().getName() + ": starting...");
    85        alive = true;
    86        while (alive) {
    87            try {
    88
    89                Thread.sleep(getSleepTime());
    90
    91                // Stopped during sleep: end loop.
    92                if (!alive) {
    93                    break;
    94                }
    95
    96                // If passivated wait until we get
    97                // get notify()-ied. If there are no subscribers
    98                // it wasts CPU to remain producing events...
    99                synchronized (this) {
    00                    while (!active) {
    01                        Log.debug(getClass().getName() + ": waiting...");
    02                        wait();
    03                    }
    04                }
    05
    06            } catch (InterruptedException e) {
    07                break;
    08            }
    09
    10            try {
    11                // Derived class should produce an event.
    12                Event event = pullEvent();
    13
    14                // Let the publisher push it to subscribers.
    15                Dispatcher.getInstance().multicast(event);
    16            } catch (Throwable t) {
    17                Log.warn("EventPullSource exception while multicasting ", t);
    18                t.printStackTrace();
    19            }
    20        }
    21        Log.debug(getClass().getName() + ": stopped");
    22    }
    23}

       改为了


    	/**
    	 * Thread body of the pull source (modified version).
    	 * <p/>
    	 * Each round sleeps for getSleepTime(), blocks in wait() while the source
    	 * is not active, then calls pullEvent(). Unlike the original version this
    	 * method does not dispatch anything itself: pullEvent() returns nothing
    	 * and is expected to create and send its own events. The loop ends when
    	 * alive becomes false or the thread is interrupted.
    	 */
    	public void run() {
    		Log.debug(getClass().getName() + ": starting...");
    		alive = true;
    		while (alive) {
    			try {
    
    				Thread.sleep(getSleepTime());
    
    				// Stopped during sleep: end loop.
    				if (!alive) {
    					break;
    				}
    
    				// If passivated wait until we get
    				// get notify()-ied. If there are no subscribers
    				// it wastes CPU to remain producing events...
    				synchronized (this) {
    					while (!active) {
    						Log.debug(getClass().getName() + ": waiting...");
    						wait();
    					}
    				}
    
    			} catch (InterruptedException e) {
    				// Interruption ends the loop.
    				break;
    			}
    
    			try {
    				// Derived class builds the event(s) and dispatches them itself
    				// (unicast or multicast).
    				pullEvent();
    
    				// The multicast that used to happen here was removed on purpose:
    				// Dispatcher.getInstance().multicast(event);
    			} catch (Throwable t) {
    				// Log and keep looping: one failed round must not kill the source.
    				Log.warn("EventPullSource exception while multicasting ", t);
    				t.printStackTrace();
    			}
    		}
    		Log.debug(getClass().getName() + ": stopped");
    	}

    		修改的原因:原来的run线程启动以后会通过pullEvent()来获得event,然后通过 Dispatcher.getInstance().multicast(event); 将事件广播出去。我这里改造以后的思路是:pullEvent()不再返回event,线程中也不再进行广播,全部的操作,包括event的创建以及event是广播还是单播,都在pullEvent()中进行。


    最后就是js的改动了,改动ajax-pushlet-client.js

    给PL添加字段parameters;改动后例如以下:

    var PL = {
    	NV_P_FORMAT: 'p_format=xml-strict',
    	NV_P_MODE: 'p_mode=pull',
    	pushletURL: null,
    	webRoot: null,
    	sessionId: null,
    	STATE_ERROR: -2,
    	STATE_ABORT: -1,
    	STATE_NULL: 1,
    	STATE_READY: 2,
    	STATE_JOINED: 3,
    	STATE_LISTENING: 3,
    	state: 1,
    	// Added field: extra request parameters as a flat list
    	// [name, value, name, value, ...]; read by _doRequest when it builds
    	// the request URL.
    	parameters: [],
    	// ... the remaining members of PL are unchanged and omitted here ...
    }

    _doRequest方法改动:

    _doRequest: function(anEvent, aQuery) {
    // Check if we are not in any error state
    if (PL.state < 0) {
    PL._setStatus('died (' + PL.state + ')');
    return;
    }


    // We may have (async) requests outstanding and thus
    // may have to wait for them to complete and change state.
    var waitForState = false;
    if (anEvent == 'join' || anEvent == 'join-listen') {
    // We can only join after initialization
    waitForState = (PL.state < PL.STATE_READY);
    } else if (anEvent == 'leave') {
    PL.state = PL.STATE_READY;
    } else if (anEvent == 'refresh') {
    // We must be in the listening state
    if (PL.state != PL.STATE_LISTENING) {
    return;
    }
    } else if (anEvent == 'listen') {
    // We must have joined before we can listen
    waitForState = (PL.state < PL.STATE_JOINED);
    } else if (anEvent == 'subscribe' || anEvent == 'unsubscribe') {
    // We must be listeing for subscription mgmnt
    waitForState = (PL.state < PL.STATE_LISTENING);
    } else {
    // All other requests require that we have at least joined
    waitForState = (PL.state < PL.STATE_JOINED);
    }


    // May have to wait for right state to issue request
    if (waitForState == true) {
    PL._setStatus(anEvent + ' , waiting... state=' + PL.state);
    setTimeout(function() {
    PL._doRequest(anEvent, aQuery);
    }, 100);
    return;
    }


    // ASSERTION: PL.state is OK for this request


    // Construct base URL for GET
    var url = PL.pushletURL + '?

    p_event=' + anEvent;


    // Optionally attach query string
    if (aQuery) {
    url = url + '&' + aQuery;
    }


    // Optionally attach session id
    if (PL.sessionId != null) {
    url = url + '&p_id=' + PL.sessionId;
    if (anEvent == 'p_leave') {
    PL.sessionId = null;
    }
    }

    //这里是我改动的地方。我的数组中的偶数是參数名,奇数是參数内容。这里把我的參数拼接到了url中。
        if(PL.parameters.length > 0) {  

        url+="&" + PL.parameters[0] + "=" + PL.parameters[1];
       }  

    PL.debug('_doRequest', url);
    PL._getXML(url, PL._onResponse);


    // uncomment to use synchronous XmlHttpRequest
    //var rsp = PL._getXML(url);
    //PL._onResponse(rsp);  */
    },



    额,改动完了。能够配置pushlet的相关參数,来使用pushlet了。


    在web.xml中配置例如以下參数

    	<servlet>
    		<servlet-name>pushlet</servlet-name>
    		<servlet-class>nl.justobjects.pushlet.servlet.Pushlet</servlet-class>
    		<load-on-startup>3</load-on-startup>
    	</servlet>
    	<servlet-mapping>
    		<servlet-name>pushlet</servlet-name>
    		<url-pattern>/admin/pushlet.srv</url-pattern>
    	</servlet-mapping>

    这是pushlet配置的基本參数,这里我配置的是/admin/pushlet.srv,是由于项目的路径是localhost:8080/项目名 ,而我的页面是在localhost:8080/项目名/admin/以下。所以加了/admin/pushlet.srv。

    假设你的页面就在项目文件夹下。就不用加前面的/admin了。


    在sources.properties中配置例如以下參数

    # 
    # Properties file for EventSource objects to be instantiated.
    #
    # Place this file in the CLASSPATH (e.g. WEB-INF/classes) or directly under WEB-INF.
    #
    # $Id: sources.properties,v 1.2 2007/11/10 14:12:16 justb Exp $
    #
    # Each EventSource is defined as <key>=<classname>
    # 1. <key> should be unique within this file but may be any name
    # 2. <classname> is the full class name
    #
    #
    # Define Pull Sources here. These classes must be derived from
    # nl.justobjects.pushlet.core.EventPullSource
    # Inner classes are separated with a $ sign from the outer class. 
    source1=org.calonlan.soulpower.plug.HwPushlet$MessClazz
    # TO BE DONE IN NEXT VERSION
    # define Push Sources here. These must implement the interface
    # nl.justobjects.pushlet.core.EventSource

    这里把我的pushlet的实现类配置进去了。



    最后就是怎样使用了。


    单播:


    package org.calonlan.soulpower.plug;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    
    import nl.justobjects.pushlet.core.Dispatcher;
    import nl.justobjects.pushlet.core.Event;
    import nl.justobjects.pushlet.core.EventPullSource;
    import nl.justobjects.pushlet.core.Session;
    import nl.justobjects.pushlet.core.SessionManager;
    
    /**
     * Unicast example: tells every connected session whether its user has
     * unprocessed orders.
     */
    public class HwPushlet {
    	private static final String driver = "com.mysql.jdbc.Driver";
    	private static final String dbName = "dlw";
    	private static final String userName = "root";
    	private static final String passWord = "root";
    	private static final String url = "jdbc:mysql://localhost:3306/";
    
    	/**
    	 * Query for unprocessed orders of one user. The user name is bound as a
    	 * parameter because it comes from the client and must never be spliced
    	 * into the SQL text.
    	 */
    	private static final String PENDING_ORDER_SQL =
    			"select * from weiorder where mystate='0' and bname=?";
    
    	/**
    	 * Pull source that, on every tick, sends each session its own
    	 * "/mipc/he" data event.
    	 */
    	static public class MessClazz extends EventPullSource {
    
    		/**
    		 * @return the pause between two rounds: three minutes, in
    		 *         milliseconds
    		 */
    		@Override
    		protected long getSleepTime() {
    			return 1000 * 60 * 3;
    		}
    
    		/**
    		 * For every current session: read the "uid" field of the event the
    		 * session was created with, look up the order state for that user
    		 * and unicast the result to that session only.
    		 */
    		@Override
    		protected void pullEvent() {
    			Session[] sessions = SessionManager.getInstance().getSessions();
    			for (int i = 0; i < sessions.length; i++) {
    				// A session may carry no event; treat that like a missing uid
    				// instead of failing the whole round.
    				Event joinEvent = sessions[i].getEvent();
    				String userId = joinEvent == null ? null : joinEvent
    						.getField("uid");
    
    				Event event = Event.createDataEvent("/mipc/he");
    				event.setField("mess", lookupMessageFlag(userId));
    				Dispatcher.getInstance().unicast(event, sessions[i].getId());
    			}
    		}
    
    	}
    
    	/**
    	 * Compute the "mess" field value for one user.
    	 *
    	 * @param userId
    	 *            user name sent by the client, may be null
    	 * @return "1" if the user has unprocessed orders, "2" if not (or if no
    	 *         user name is known), "" if the lookup failed
    	 */
    	private static String lookupMessageFlag(String userId) {
    		if (userId == null) {
    			// Nothing to look up for an anonymous session.
    			return "2";
    		}
    		try {
    			return hasPendingOrder(userId) ? "1" : "2";
    		} catch (ClassNotFoundException e) {
    			e.printStackTrace();
    		} catch (SQLException e) {
    			e.printStackTrace();
    		}
    		// Lookup failed: report the empty value, so the remaining sessions
    		// are still served.
    		return "";
    	}
    
    	/**
    	 * Check whether the given user has at least one unprocessed order.
    	 * Every JDBC resource that was opened is closed again, also on failure.
    	 *
    	 * @param userId
    	 *            user name to look up, bound as a statement parameter
    	 * @return true if the query returns at least one row
    	 * @throws ClassNotFoundException
    	 *             if the JDBC driver class cannot be loaded
    	 * @throws SQLException
    	 *             if connecting or querying fails
    	 */
    	private static boolean hasPendingOrder(String userId)
    			throws ClassNotFoundException, SQLException {
    		Class.forName(driver);
    		Connection conn = DriverManager.getConnection(url + dbName, userName,
    				passWord);
    		try {
    			PreparedStatement statement = conn
    					.prepareStatement(PENDING_ORDER_SQL);
    			try {
    				statement.setString(1, userId);
    				ResultSet rst = statement.executeQuery();
    				try {
    					return rst.next();
    				} finally {
    					rst.close();
    				}
    			} finally {
    				statement.close();
    			}
    		} finally {
    			conn.close();
    		}
    	}
    
    }
    


    我这里是server的实现。在单播的时候查找前台传过来的uid中是否有消息,有的话返回一个1,没有就返回一个2。最重要的是Dispatcher.getInstance().unicast(event, sessions[i].getId());这一句,通过单播的形式把消息传递给指定session的用户。


    页面js代码

    <script type="text/javascript">
    PL.parameters.push('uid');
    PL.parameters.push('${user.username}');
    PL._init();   
    PL.joinListen('/mipc/he');
    /**
     * Pushlet data callback: when the event's "mess" field is "1", show the
     * "new order" notification and play the alert sound; otherwise do nothing.
     */
    function onData(event) {
    	// Any value other than "1" needs no reaction.
    	if (event.get("mess") != "1") {
    		return;
    	}

    	var notice = {
    		title:'系统消息',
    		msg:'您有新的订单请注意处理',
    		timeout:0,
    		showType:'slide'
    	};
    	$.messager.show(notice);
    	document.getElementById('audio_player').play();

    	// To leave:
    	// PL.leave();
    }
    </script>



    广播:广播时,页面js的请求写法就和官网示例一样了,pullEvent()中也只是简单地进行广播。代码如下:

    package org.calonlan.soulpower.plug;
    
    import nl.justobjects.pushlet.core.Dispatcher;
    import nl.justobjects.pushlet.core.Event;
    import nl.justobjects.pushlet.core.EventPullSource;
    
    /**
     * Broadcast example: periodically multicasts a "/mipc/he" data event.
     */
    public class HwPushlet {
    
    	/**
    	 * Pull source whose pullEvent() builds one event and multicasts it
    	 * through the Dispatcher.
    	 */
    	public static class MessClazz extends EventPullSource {
    
    		/**
    		 * @return the pause between two rounds: three minutes, in
    		 *         milliseconds
    		 */
    		@Override
    		protected long getSleepTime() {
    			final long threeMinutesMillis = 3 * 60 * 1000;
    			return threeMinutesMillis;
    		}
    
    		/**
    		 * Create the data event, set its "mess" field to the empty string
    		 * and multicast it.
    		 */
    		@Override
    		protected void pullEvent() {
    			Event broadcast = Event.createDataEvent("/mipc/he");
    			broadcast.setField("mess", "");
    			Dispatcher.getInstance().multicast(broadcast);
    		}
    
    	}
    
    }
    





    额,时间比较仓促,近期项目太忙了,唉。这里简单地记录一下,也不知道这样的修改方法是不是很落后或者很复杂,还望高手们批评指正。修改后的jar包和js请在http://download.csdn.net/detail/u012613903/9483881下载。



  • 相关阅读:
    VMware安装最新版CentOS7图文教程
    git 本地给远程仓库创建分支 三步法
    git如何利用分支进行多人开发
    题解 洛谷P6478 [NOI Online #2 提高组] 游戏
    题解 CF1146D Frog Jumping
    题解 洛谷P6477 [NOI Online #2 提高组] 子序列问题
    题解 LOJ2472 「九省联考 2018」IIIDX
    题解 CF1340 A,B,C Codeforces Round #637 (Div. 1)
    题解 LOJ3284 「USACO 2020 US Open Platinum」Exercise
    windows上的路由表
  • 原文地址:https://www.cnblogs.com/yjbjingcha/p/8400693.html
Copyright © 2011-2022 走看看