zoukankan      html  css  js  c++  java
  • Springboot + Rabbitmq + WebSocet + vue

    1、pom.xml

    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-amqp</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-websocket</artifactId>
    		</dependency>
    

    2、WebSocketConfig

    启用WebSocket的支持也是很简单,几句代码搞定

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    
    @Configuration
    public class WebSocketConfig {
        /**
         * 注入ServerEndpointExporter,
         * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
         *
         * @return
         */
        @Bean
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    }
    

    3、WebSocketServer

    因为WebSocket是类似客户端服务端的形式(采用ws协议),那么这里的WebSocketServer其实就相当于一个ws协议的Controller
    直接@ServerEndpoint("/websocket")@Component启用即可,然后在里面实现@OnOpen,@onClose,@onMessage等方法

    package com.yanan.base.config.rabbitmq;
    
    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    import javax.websocket.*;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    @ServerEndpoint("/websocket/{userId}") //WebSocket客户端建立连接的地址 @Component @Slf4j public class WebSocketServer { private final Logger log= LoggerFactory.getLogger(WebSocketServer.class); /** * 存活的session集合(使用线程安全的map保存) */ private static Map<String, Session> livingSessions = new ConcurrentHashMap<>(); /** * 建立连接的回调方法 * * @param session 与客户端的WebSocket连接会话 * @param userId 用户名,WebSocket支持路径参数 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { livingSessions.put(session.getId(), session); log.info(userId + "进入连接"); } @OnMessage public void onMessage(String message, Session session, @PathParam("userId") String userId) { log.info(userId + " : " + message); sendMessageToAll(userId + " : " + message); } @OnError public void onError(Session session, Throwable error) { log.info("发生错误"); log.error(error.getStackTrace() + ""); } @OnClose public void onClose(Session session, @PathParam("userId") String userId) { livingSessions.remove(session.getId()); log.info(userId + " 关闭连接"); } /** * 单独发送消息 * * @param session * @param message */ public void sendMessage(Session session, String message) { try { session.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); } } /** * 群发消息 * * @param message */ public void sendMessageToAll(String message) { livingSessions.forEach((sessionId, session) -> { sendMessage(session, message); }); } }

     4、接收rabbitmq消息

          application.yml配置

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: test
        password: test
        virtual-host: /
    

      监听器接收消息队列RabbitmqListener.java

    package com.yanan.base.config.rabbit;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.yanan.base.config.rabbitmq.WebSocketServerEndpoint;
    import lombok.extern.slf4j.Slf4j;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import sun.misc.BASE64Decoder;
    
    import sun.misc.BASE64Decoder;
    import sun.misc.BASE64Encoder;
    
    import javax.crypto.Cipher;
    import javax.crypto.spec.IvParameterSpec;
    import javax.crypto.spec.SecretKeySpec;
    
    @Component
    @Slf4j
    public class RabbitmqListener {
        private final Logger log= LoggerFactory.getLogger(RabbitmqListener.class);
        @Autowired
        private WebSocketServer webSocketServer;
        //  AES加密解密
        private String sKey = "785641234654";
        private String ivParamter = "0392039203";
        //queue指要监听列队的名字
        @RabbitListener(queues = "sdf")
        public void reveiverDirectQueue(String message){
            decrypt(message);
        }
        public JSONObject decrypt(String message) {
            try {
                byte[] raw = sKey.getBytes("ASCII");
                SecretKeySpec skeySpec = new SecretKeySpec(raw, "AES");
                Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
                IvParameterSpec iv = new IvParameterSpec(ivParamter.getBytes());
                cipher.init(Cipher.DECRYPT_MODE, skeySpec, iv);
                byte[] encrypted1 = new BASE64Decoder().decodeBuffer(message);// 先用 base64 解密
                byte[] original = cipher.doFinal(encrypted1);
                String originalString = new String(original, "utf-8");
                JSONObject object = JSON.parseObject(originalString);
    // websocket发送消息 webSocketServer.sendMessageToAll(originalString); log.info("监听消息:" + originalString); return object; } catch (Exception ex) { return null; } } }

    5、vue

    data () {
        return {
          queueReceiveSetting: { // 消息队列配置
            websock: null,
            client: null,
            wsuri: 'ws://localhost:8089/websocket/dfgd'
          }
        }
    }
      methods: {
        initWebSocket () {
          // ws地址
          if (this.queueReceiveSetting.websock) {
            this.queueReceiveSetting.websock.close()
          }
          this.queueReceiveSetting.websock = new WebSocket(this.queueReceiveSetting.wsuri)
          this.queueReceiveSetting.websock.onopen = res => {
            console.log('开启连接')
          }
          this.queueReceiveSetting.websock.onmessage = res => {
            let data = JSON.parse(res.data)
            console.log('接收到的数据:', data)
          }
          this.queueReceiveSetting.websock.onclose = res => {
            console.log('连接关闭')
          }
          this.queueReceiveSetting.websock.onerror = res => {
            console.log('连接出错')
          }
        }
      },
      mounted () {
    this.initWebSocket()
    }

      

  • 相关阅读:
    Codeforces Round #485 (Div. 2) C题求三元组(思维)
    MongoDB设置访问权限、设置用户
    与MySQL交互(felixge/node-mysql)
    centos Supervisor
    Async详解之一:流程控制
    C# Socket连接请求超时机制
    tcp-client-c++
    C#TCPClient应用-一个简单的消息发送和接收
    centos下各种c++库文件的安装
    AngularJS与RequireJS集成方案
  • 原文地址:https://www.cnblogs.com/zqyw/p/11132467.html
Copyright © 2011-2022 走看看