zoukankan      html  css  js  c++  java
  • 用Jetty 9.1运行Java WebSockets微服务

      Jetty 9.1的发布将Java WebSockets (JSR-356) 带入了非Java EE环境,从而开启了微服务时代。我们可以将Jetty的容器包含在java应用程序中(注意,不是Java代码运行在容器中,而是相反),这种微服务轻量概念开始得到提倡推广,为模块化开启新的探索方向。

      该案例目标是要建设一个从客户端程序接受消息并广播到当前连接的所有其他客户端WebSocket服务器。假设有一个消息模型:

    package com.example.services;

    public class Message {
        private String username;
        private String message;

        public Message() {
        }

        public Message( final String username, final String message ) {
            this.username = username;
            this.message = message;
        }

        public String getMessage() {
            return message;
        }

        public String getUsername() {
            return username;
        }

        public void setMessage( final String message ) {
            this.message = message;
        }

        public void setUsername( final String username ) {
            this.username = username;
        }
    }

    为了分离服务器端和客户端,JSR-356规定了两个元注解@ServerEndpoint 和@ClientEndpoit

    客户端代码:

    @ClientEndpoint
    public class BroadcastClientEndpoint {
        private static final Logger log = Logger.getLogger( 
            BroadcastClientEndpoint.class.getName() );

        @OnOpen
        public void onOpen( final Session session ) throws IOException, EncodeException  {
            session.getBasicRemote().sendObject( new Message( "Client", "Hello!" ) );
        }

        @OnMessage
        public void onMessage( final Message message ) {
            log.info( String.format( "Received message '%s' from '%s'",
                message.getMessage(), message.getUsername() ) );
        }
    }

    @OnOpen 是当客户端连接到服务器开始调用,@OnMessage是每次服务器向客户端发送消息时调用。

    消息传递使用Json,这里使用JSR-353规范的Json类将对象进行序列化。我们需要在Message里面加上一下Json反序列化,也就是将Json转为Message对象:

    public class Message {
        public static class MessageDecoder implements Decoder.Text< Message > {
            private JsonReaderFactory factory = Json.createReaderFactory( Collections.< String, Object >emptyMap() );

            @Override
            public void init( final EndpointConfig config ) {
            }

            @Override
            public Message decode( final String str ) throws DecodeException {
                final Message message = new Message();

                try( final JsonReader reader = factory.createReader( new StringReader( str ) ) ) {
                    final JsonObject json = reader.readObject();
                    message.setUsername( json.getString( "username" ) );
                    message.setMessage( json.getString( "message" ) );
                }

                return message;
            }

            @Override
            public boolean willDecode( final String str ) {
                return true;
            }

            @Override
            public void destroy() {
            }
        }
    }

    我们需要告诉客户端,我们有一个Json编码器和解码器,在BroadcastClientEndpoint类上加入:

    @ClientEndpoint( encoders = { MessageEncoder.class }, decoders = { MessageDecoder.class } )
    public class BroadcastClientEndpoint {
    }

    下面是调用运行代码:

    public class ClientStarter {
        public static void main( final String[] args ) throws Exception {
            final String client = UUID.randomUUID().toString().substring( 0, 8 );

            final WebSocketContainer container = ContainerProvider.getWebSocketContainer();    
            final String uri = "ws://localhost:8080/broadcast"; 

            try( Session session = container.connectToServer( BroadcastClientEndpoint.class, URI.create( uri ) ) ) {
                for( int i = 1; i <= 10; ++i ) {
                    session.getBasicRemote().sendObject( new Message( client, "Message #" + i ) );
                    Thread.sleep( 1000 );
                }
            }

            // Application doesn't exit if container's threads are still running
            ( ( ClientContainer )container ).stop();
        }
    }

    这是连接URL ws://localhost:8080/broadcast,随机挑选一些客户端名称(从UUID),每1秒的延迟产生10条信息,(只是为了确保我们有时间去接收他们都回来了)。

    下面是服务器端的代码:

    @ServerEndpoint( 
        value = "/broadcast", 
        encoders = { MessageEncoder.class }, 
        decoders = { MessageDecoder.class } 

    public class BroadcastServerEndpoint {
        private static final Set< Session > sessions = 
            Collections.synchronizedSet( new HashSet< Session >() );

        @OnOpen
        public void onOpen( final Session session ) {
            sessions.add( session );
        }

        @OnClose
        public void onClose( final Session session ) {
            sessions.remove( session );
        }

        @OnMessage
        public void onMessage( final Message message, final Session client ) 
                throws IOException, EncodeException {
            for( final Session session: sessions ) {
                session.getBasicRemote().sendObject( message );
            }
        }
    }

    为了使这个服务器端点能够运行,我们将其注册入Jetty服务器,Jetty9.能够在嵌入下运行:

    public class ServerStarter  {
        public static void main( String[] args ) throws Exception {
            Server server = new Server( 8080 );

            // Create the 'root' Spring application context
            final ServletHolder servletHolder = new ServletHolder( new DefaultServlet() );
            final ServletContextHandler context = new ServletContextHandler();

            context.setContextPath( "/" );
            context.addServlet( servletHolder, "/*" );
            context.addEventListener( new ContextLoaderListener() );   
            context.setInitParameter( "contextClass", AnnotationConfigWebApplicationContext.class.getName() );
            context.setInitParameter( "contextConfigLocation", AppConfig.class.getName() );

            server.setHandler( context );
            WebSocketServerContainerInitializer.configureContext( context );       

            server.start();
            server.join(); 
        }
    }

    最重要的是WebSocketServerContainerInitializer.configureContext:,它是创建一个Websockets的容器,目前容器内什么也没有,我们没有注册进入我们的服务器端点。

    Spring的AppConfig 能够帮助我们做到这点:

    @Configuration
    public class AppConfig  {
        @Inject private WebApplicationContext context;
        private ServerContainer container;

        public class SpringServerEndpointConfigurator extends ServerEndpointConfig.Configurator {
            @Override
            public < T > T getEndpointInstance( Class< T > endpointClass ) 
                    throws InstantiationException {
                return context.getAutowireCapableBeanFactory().createBean( endpointClass );   
            }
        }

        @Bean
        public ServerEndpointConfig.Configurator configurator() {
            return new SpringServerEndpointConfigurator();
        }

        @PostConstruct
        public void init() throws DeploymentException {
            container = ( ServerContainer )context.getServletContext().
                getAttribute( javax.websocket.server.ServerContainer.class.getName() );

            container.addEndpoint( 
                new AnnotatedServerEndpointConfig( 
                    BroadcastServerEndpoint.class, 
                    BroadcastServerEndpoint.class.getAnnotation( ServerEndpoint.class )  
                ) {
                    @Override
                    public Configurator getConfigurator() {
                        return configurator();
                    }
                }
            );
        }  
    }

    容器通过调用构造函数将创建container,然后每一次新的客户端连接创建一个服务器端点的新实例。

    我们检索的WebSockets容器的方法是Jetty专用规范:查询来自名为“javax.websocket.server.ServerContainer”上下文的属性。

    最后运行:

    mvn clean package
    java -jar targetjetty-web-sockets-jsr356-0.0.1-SNAPSHOT-server.jar // run server
    java -jar target/jetty-web-sockets-jsr356-0.0.1-SNAPSHOT-client.jar // run yet another client

    输出结果部分:

    Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
    INFO: Received message 'Hello!' from 'Client'
    Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
    INFO: Received message 'Message #1' from '392f68ef'
    Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
    INFO: Received message 'Message #2' from '8e3a869d'
    Nov 29, 2013 9:21:29 PM com.example.services.BroadcastClientEndpoint onMessage
    INFO: Received message 'Message #7' from 'ca3a06d0'
    Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
    INFO: Received message 'Message #4' from '6cb82119'
    Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
    INFO: Received message 'Message #2' from '392f68ef'
    Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
    INFO: Received message 'Message #3' from '8e3a869d'
    Nov 29, 2013 9:21:30 PM com.example.services.BroadcastClientEndpoint onMessage
    INFO: Received message 'Message #8' from 'ca3a06d0'
    Nov 29, 2013 9:21:31 PM com.example.services.BroadcastClientEndpoint onMessage
    INFO: Received message 'Message #5' from '6cb82119'
    Nov 29, 2013 9:21:31 PM com.example.services.BroadcastClientEndpoint onMessage

    该项目源码下载: GitHub

  • 相关阅读:
    VUE课程参考---2、VUE基本使用
    VUE课程---1、VUE课程介绍
    JS数组常用方法---3、pop方法使用及原理
    JavaScript中数组元素删除的七大方法汇总
    Stack的三种含义
    JS数组常用方法---6、reverse方法
    数据库水平切分的实现原理解析---分库,分表,主从,集群,负载均衡器
    服务框架HSF分析之一容器启动
    淘宝HSF服务的原理以及简单的实现
    DAS 原文出自【比特网】
  • 原文地址:https://www.cnblogs.com/meetrice/p/3525583.html
Copyright © 2011-2022 走看看