zoukankan      html  css  js  c++  java
  • Springboot+WebSocket+Kafka(写着玩的)

    闹着玩的来源:前台发送消息,后台接受处理发给kafka,kafka消费者接到消息传给前台显示。联想到websocket。

    最终效果如图:

    页面解释:

    不填写内容的话,表单值默认为Topic、Greeting、Name

    点击订阅,按钮变黑

    Send Topic 广播 前台显示前缀:T-You Send
    Subscribe Topic 订阅广播 前台显示前缀:A-You Receive、B-You Receive
    Send Queue 广播 前台显示前缀:Q-You Send
    Subscribe Queue 订阅广播 前台显示前缀:C-You Receive、D-You Receive
    Subscribe Point 订阅点对点 前台显示前缀:/user/110/queue/pushInfo,Receive
    Send Kafka 点对点 前台显示前缀:Kafka Send
    Receive Kafka 订阅点对点 前台显示前缀:Kafka Receive

    重要提示:欲接受消息,先点击订阅

    关键代码:

    配置websocket

    package com.example.demo.conf;
    
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.simp.config.MessageBrokerRegistry;
    import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
    import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
    import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
    
    /**
     * @program: boot-kafka
     * @description:
     * @author: 001977
     * @create: 2018-07-11 11:30
     */
    @Configuration
    @EnableWebSocketMessageBroker
    public class WebSocketConfigurationWithSTOMP implements WebSocketMessageBrokerConfigurer {
    
        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            registry.addEndpoint("/clientConnectThis").setAllowedOrigins("*").withSockJS();
        }
    
        @Override
        public void configureMessageBroker(MessageBrokerRegistry config) {
            // P2P should conf a /user  ;   broadcast should conf a /topic
            config.enableSimpleBroker("/topic", "/queue", "/user");
            config.setApplicationDestinationPrefixes("/app");   // Client to Server
            config.setUserDestinationPrefix("/user");           // Server to Client
        }
    }

    controller

    package com.example.demo.controller;
    
    import com.example.demo.entity.Welcome;
    import com.example.demo.service.KafkaService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.MessageMapping;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.messaging.simp.SimpMessagingTemplate;
    import org.springframework.messaging.simp.annotation.SendToUser;
    import org.springframework.messaging.simp.annotation.SubscribeMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.servlet.ModelAndView;
    
    /**
     * @program: boot-kafka
     * @description:
     * @author: 001977
     * @create: 2018-07-11 11:00
     */
    @RestController
    public class SimpleController {
    
        @Autowired
        private KafkaService kafkaService;
    
        @Autowired
        private SimpMessagingTemplate simpMessagingTemplate;
    
        @RequestMapping("/")
        public ModelAndView stomp(){
            return new ModelAndView("stomp_home");
        }
    
        @SubscribeMapping("/firstConnection")
        public Welcome thanks(){
            return new Welcome("...", "Thank You!");
        }
    
        @MessageMapping("/sendMessageTopic")
        @SendTo("/topic/webSocketTopic")
        public Welcome sendToTopic(@RequestBody Welcome welcome){
            System.out.println("Send-Topic-Msg:" + welcome);
            return welcome;
        }
    
        @MessageMapping("/sendMessageQueue")
        @SendToUser("/queue/webSocketQueue")
        public Welcome sendToQueue(@RequestBody Welcome welcome){
            System.out.println("Send-Queue-Msg:" + welcome);
            return welcome;
        }
    
        /**
         * P2P,后台模拟推送给前台,需打开@Scheduled注释
         */
        //@Scheduled(fixedRate = 1000L)
        public void send(){
            Welcome welcome = new Welcome("110","Hello!");
            simpMessagingTemplate.convertAndSendToUser("110", "/queue/pushInfo", welcome);
            System.err.println(welcome);
        }
    
        @MessageMapping("/sendKafka")
        public Welcome sendToKafka(@RequestBody Welcome welcome){
            boolean b = kafkaService.send(welcome);
            if (b)
                System.out.println("Send-Kafka-Msg:" + welcome);
            return welcome;
        }
    
    }

    前端JS

    var socket = new SockJS('/clientConnectThis');
    var stompClient = Stomp.over(socket);
    stompClient.connect({},
        function connectCallback(frame) {   // success
            connectResult("Connect Success");
            stompClient.subscribe('/app/firstConnection', function (response) {
                var returnData = JSON.parse(response.body);
                receiveText("/app/firstConnection,Test Receive:" + returnData.greeting);
            });
        },
        function errorCallBack(error) {     // failed
            connectResult("Connect Break");
        }
    );
    
    //发送消息
    function sendTopic() {
        var topic = $('#topic').val();
        var message = $('#message').val();
        var name = $('#name').val();
        if(topic == "")
            topic = "Topic";
        if(message == "")
            message = "Greeting";
        if(name == "")
            name = "Name";
        var messageJson = JSON.stringify({"topic":topic,"greeting": message,"name":name});
        stompClient.send("/app/sendMessageTopic", {}, messageJson);
        sendText("T-You Send:" + messageJson);
    }
    function sendQueue() {
        var topic = $('#topic').val();
        var message = $('#message').val();
        var name = $('#name').val();
        if(topic == "")
            topic = "Topic";
        if(message == "")
            message = "Greeting";
        if(name == "")
            name = "Name";
        var messageJson = JSON.stringify({"topic":topic,"greeting": message,"name":name});
        stompClient.send("/app/sendMessageQueue", {}, messageJson);
        sendText("Q-You Send:" + messageJson);
    }
    
    //订阅消息
    function subscribeTopic(t) {
        $(t).css({
            "backgroundColor": "#000"
        });
        stompClient.subscribe('/topic/webSocketTopic', function (response) {
            var returnData = JSON.parse(response.body);
            receiveText("A-You Receive:(name=" + returnData.name + ",greeting=" + returnData.greeting + ",topic=" + returnData.topic + ")")
        });
        stompClient.subscribe('/topic/webSocketTopic', function (response) {
            var returnData = JSON.parse(response.body);
            receiveText("B-You Receive:(name=" + returnData.name + ",greeting=" + returnData.greeting + ",topic=" + returnData.topic + ")")
        });
    }
    
    //订阅消息
    function subscribeQueue(t) {
        $(t).css({
            "backgroundColor": "#000"
        });
        stompClient.subscribe('/user/queue/webSocketQueue', function (response) {
            var returnData = JSON.parse(response.body);
            receiveText("C-You Receive:(name=" + returnData.name + ",greeting=" + returnData.greeting + ",topic=" + returnData.topic + ")")
        });
        stompClient.subscribe('/user/queue/webSocketQueue', function (response) {
            var returnData = JSON.parse(response.body);
            receiveText("D-You Receive:(name=" + returnData.name + ",greeting=" + returnData.greeting + ",topic=" + returnData.topic + ")")
        });
    }
    
    function subscribePoint(t) {
        $(t).css({
            "backgroundColor": "#000"
        });
        // /user/{userId}/**
        stompClient.subscribe('/user/110/queue/pushInfo', function (response) {
            var returnData = JSON.parse(response.body);
            receiveText("/user/110/queue/pushInfo,Receive:" + returnData.greeting);
        });
    }
    
    function sendKafka() {
        var topic = $('#topic').val();
        var message = $('#message').val();
        var name = $('#name').val();
        if(topic == "")
            topic = "Topic";
        if(message == "")
            message = "Greeting";
        if(name == "")
            name = "Name";
        var messageJson = JSON.stringify({"topic":topic,"greeting": message,"name":name});
        stompClient.send("/app/sendKafka", {}, messageJson);
        sendText("Kafka Send:" + messageJson);
    }
    
    function kafkaReceive(t) {
        $(t).css({
            "backgroundColor": "#000"
        });
        stompClient.subscribe('/user/kafka-user-id/queue/kafkaMsg', function (response) {
            var returnData = JSON.parse(response.body);
            receiveText("Kafka Receive:(name=" + returnData.name + ",greeting=" + returnData.greeting + ",topic=" + returnData.topic + ")")
        });
    }
    
    
    function sendText(v) {
        $('.container .right').append($('<div class="common-message send-text">'+ v +'</div>'));
    }
    
    function receiveText(v) {
        $('.container .right').append($('<div class="common-message receive-text">'+ v +'</div>'));
    }
    
    function connectResult(v) {
        $('.container').append($('<div class="connect-text">'+ v +'</div>'))
    }

    其余的见GitHub

  • 相关阅读:
    转载:C#制作PDF
    搜索研究
    HDU 4029 Distinct Submatrix [后缀数组]
    HDU 4336 Card Collector [状态压缩概率DP]
    ZOJ 3329 One Person Game [数学期望]
    POJ 2096 Collecting Bugs[数学期望]
    HDU 4338 Simple Path [双联通分量+RMQ(LCA)]
    POJ 1222 EXTENDED LIGHTS OUT [高斯消元]
    HDU 2258 Continuous Same Game (1)[模拟]
    HDU 4339 Query [树状数组]
  • 原文地址:https://www.cnblogs.com/LUA123/p/9298952.html
Copyright © 2011-2022 走看看