zoukankan      html  css  js  c++  java
  • spring websocket源码分析续Handler的使用

    1. handler的定义

    spring websocket支持的消息(WebSocketMessage的实现类)有以下几种: TextMessage, BinaryMessage, PingMessage, PongMessage。

    对消息的处理使用了Handler模式,抽象handler类为AbstractWebSocketHandler.java,其handleMessage方法按消息类型进行分发:

    /**
         * Routes a message to the callback that matches its concrete type.
         *
         * <p>A {@code TextMessage} goes to {@code handleTextMessage}, a
         * {@code BinaryMessage} to {@code handleBinaryMessage} and a
         * {@code PongMessage} to {@code handlePongMessage}.
         *
         * @param session the session handed through to the selected callback
         * @param message the message to dispatch
         * @throws IllegalStateException if the message is none of the three types above
         * @throws Exception anything thrown by the selected callback
         */
        @Override
        public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
            if (message instanceof TextMessage) {
                handleTextMessage(session, (TextMessage) message);
                return;
            }
            if (message instanceof BinaryMessage) {
                handleBinaryMessage(session, (BinaryMessage) message);
                return;
            }
            if (message instanceof PongMessage) {
                handlePongMessage(session, (PongMessage) message);
                return;
            }
            // No callback exists for any other message type.
            throw new IllegalStateException("Unexpected WebSocket message type: " + message);
        }
    
        /**
         * Callback invoked by {@code handleMessage} for a {@code TextMessage}.
         * This implementation is empty; subclasses can override it.
         *
         * @param session the session passed to {@code handleMessage}
         * @param message the text message
         * @throws Exception declared so that overriding methods may throw
         */
        protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        }
    
        /**
         * Callback invoked by {@code handleMessage} for a {@code BinaryMessage}.
         * This implementation is empty; subclasses can override it.
         *
         * @param session the session passed to {@code handleMessage}
         * @param message the binary message
         * @throws Exception declared so that overriding methods may throw
         */
        protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
        }
    
        /**
         * Callback invoked by {@code handleMessage} for a {@code PongMessage}.
         * This implementation is empty; subclasses can override it.
         *
         * @param session the session passed to {@code handleMessage}
         * @param message the pong message
         * @throws Exception declared so that overriding methods may throw
         */
        protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
        }

    具体的handler实现类,以BinaryWebSocketHandler为例(其它略):

    /**
     * Handler variant that does not accept text messages: receiving one
     * closes the session.
     */
    public class BinaryWebSocketHandler extends AbstractWebSocketHandler {

        /**
         * Rejects a text message by closing the session with
         * {@code CloseStatus.NOT_ACCEPTABLE} and the reason
         * "Text messages not supported".
         *
         * <p>An {@link IOException} raised while closing is deliberately dropped.
         *
         * @param session the session to close
         * @param message the text message that was received (not inspected)
         */
        @Override
        protected void handleTextMessage(WebSocketSession session, TextMessage message) {
            final CloseStatus rejection = CloseStatus.NOT_ACCEPTABLE.withReason("Text messages not supported");
            try {
                session.close(rejection);
            }
            catch (IOException ignored) {
                // ignore: the close is best-effort
            }
        }

    }

    2. handler的使用

    StandardWebSocketClient和服务端握手时,会调用doHandshakeInternal方法:

    /**
         * Prepares a {@code StandardWebSocketSession} and a client endpoint
         * configuration, then connects to the server through the
         * {@code webSocketContainer}.
         *
         * <p>The connect call is wrapped in a {@link Callable}. When a task
         * executor is set the callable is submitted to it; otherwise it is run
         * directly via a {@code ListenableFutureTask} before this method returns.
         *
         * @param webSocketHandler the handler wrapped by the {@code StandardWebSocketHandlerAdapter} endpoint
         * @param headers the headers given to the session and to the client configurator
         * @param uri the URI to connect to; also supplies the remote host and the port
         * @param protocols the preferred sub-protocols
         * @param extensions the extensions, converted with {@code adaptExtensions}
         * @param attributes the attributes given to the session
         * @return a future that yields the session once the connect call has returned
         */
        @Override
        protected ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler webSocketHandler,
                HttpHeaders headers, final URI uri, List<String> protocols,
                List<WebSocketExtension> extensions, Map<String, Object> attributes) {

            // The same port value is used for both the local and the remote address.
            final int port = getPort(uri);
            final StandardWebSocketSession session = new StandardWebSocketSession(headers, attributes,
                    new InetSocketAddress(getLocalHost(), port),
                    new InetSocketAddress(uri.getHost(), port));

            final ClientEndpointConfig.Builder configBuilder = ClientEndpointConfig.Builder.create()
                    .configurator(new StandardWebSocketClientConfigurator(headers))
                    .preferredSubprotocols(protocols)
                    .extensions(adaptExtensions(extensions));

            // The adapter is the Endpoint through which the handler is invoked.
            final Endpoint endpoint = new StandardWebSocketHandlerAdapter(webSocketHandler, session);

            final Callable<WebSocketSession> connectTask = new Callable<WebSocketSession>() {
                @Override
                public WebSocketSession call() throws Exception {
                    webSocketContainer.connectToServer(endpoint, configBuilder.build(), uri);
                    return session;
                }
            };

            if (this.taskExecutor == null) {
                // No executor: run the connect task right here.
                ListenableFutureTask<WebSocketSession> inlineTask =
                        new ListenableFutureTask<WebSocketSession>(connectTask);
                inlineTask.run();
                return inlineTask;
            }
            return this.taskExecutor.submitListenable(connectTask);
        }

    上面代码中创建Endpoint的那一行(new StandardWebSocketHandlerAdapter(webSocketHandler, session))使用了适配器StandardWebSocketHandlerAdapter,它封装了对Handler的调用,其onOpen方法如下:

    /**
         * Binds the native session to the wrapped session, registers message
         * handlers for text, binary and pong messages, and then calls
         * {@code afterConnectionEstablished} on the handler.
         *
         * <p>Partial-message handlers are registered when the handler reports
         * {@code supportsPartialMessages()}; otherwise whole-message handlers
         * are registered and each delivery is forwarded with {@code isLast} set
         * to {@code true}. Anything thrown by {@code afterConnectionEstablished}
         * is passed to {@code ExceptionWebSocketHandlerDecorator.tryCloseWithError}.
         *
         * @param session the native session that was opened
         * @param config the endpoint configuration (not used here)
         */
        @Override
        public void onOpen(final javax.websocket.Session session, EndpointConfig config) {

            this.wsSession.initializeNativeSession(session);

            final boolean wholeMessagesOnly = !this.handler.supportsPartialMessages();
            if (wholeMessagesOnly) {
                // Each delivery is a complete message, so it is always the last part.
                session.addMessageHandler(new MessageHandler.Whole<String>() {
                    @Override
                    public void onMessage(String text) {
                        handleTextMessage(session, text, true);
                    }
                });
                session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
                    @Override
                    public void onMessage(ByteBuffer data) {
                        handleBinaryMessage(session, data, true);
                    }
                });
            }
            else {
                // Fragments are forwarded together with the flag marking the final part.
                session.addMessageHandler(new MessageHandler.Partial<String>() {
                    @Override
                    public void onMessage(String text, boolean last) {
                        handleTextMessage(session, text, last);
                    }
                });
                session.addMessageHandler(new MessageHandler.Partial<ByteBuffer>() {
                    @Override
                    public void onMessage(ByteBuffer data, boolean last) {
                        handleBinaryMessage(session, data, last);
                    }
                });
            }

            // The pong handler is registered in both modes.
            session.addMessageHandler(new MessageHandler.Whole<javax.websocket.PongMessage>() {
                @Override
                public void onMessage(javax.websocket.PongMessage pong) {
                    handlePongMessage(session, pong.getApplicationData());
                }
            });

            try {
                this.handler.afterConnectionEstablished(this.wsSession);
            }
            catch (Throwable ex) {
                ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger);
            }
        }

    handleTextMessage, handleBinaryMessage, handlePongMessage的具体实现:

    /**
         * Wraps a text payload in a {@code TextMessage} and hands it to the
         * handler's {@code handleMessage}, using the wrapped session.
         *
         * <p>Anything thrown by the handler is passed to
         * {@code ExceptionWebSocketHandlerDecorator.tryCloseWithError}.
         *
         * @param session the native session (not used here)
         * @param payload the text received
         * @param isLast whether this is the final part of the message
         */
        private void handleTextMessage(javax.websocket.Session session, String payload, boolean isLast) {
            final TextMessage incoming = new TextMessage(payload, isLast);
            try {
                this.handler.handleMessage(this.wsSession, incoming);
            }
            catch (Throwable failure) {
                ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, failure, logger);
            }
        }
    
        /**
         * Wraps a binary payload in a {@code BinaryMessage} and hands it to the
         * handler's {@code handleMessage}, using the wrapped session.
         *
         * <p>Anything thrown by the handler is passed to
         * {@code ExceptionWebSocketHandlerDecorator.tryCloseWithError}.
         *
         * @param session the native session (not used here)
         * @param payload the bytes received
         * @param isLast whether this is the final part of the message
         */
        private void handleBinaryMessage(javax.websocket.Session session, ByteBuffer payload, boolean isLast) {
            final BinaryMessage incoming = new BinaryMessage(payload, isLast);
            try {
                this.handler.handleMessage(this.wsSession, incoming);
            }
            catch (Throwable failure) {
                ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, failure, logger);
            }
        }
    
        private void handlePongMessage(javax.websocket.Session session, ByteBuffer payload) {
            PongMessage pongMessage = new PongMessage(payload);
            try {
                this.handler.handleMessage(this.wsSession, pongMessage);
            }
            catch (Throwable t) {
                ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
            }
  • 相关阅读:
    elasticSearch 查询 bool
    elasticSearch 查询 term
    elasticSearch 查询 match
    python re
    vue day1
    mysql 报错记录
    node.js vue.js 安装
    mysql 插入数据 ,存在跳过
    打印乘法口诀表
    初步使用分支、循环判断数字大小
  • 原文地址:https://www.cnblogs.com/davidwang456/p/5672677.html
Copyright © 2011-2022 走看看