zoukankan      html  css  js  c++  java
  • kafka-node+socket.io 测试配置

    1.安装需要插件

       npm install express 

       npm install  socket.io 

       npm install  kafka-node

    2.kafkatest.js文件

    // --- Dependencies ---------------------------------------------------
    var express = require('express');
    var http = require('http');
    var socketIo = require('socket.io');
    var kafka = require('kafka-node');

    // --- HTTP + socket.io wiring ----------------------------------------
    // A single HTTP server carries both the Express app and socket.io.
    var app = express();
    var server = http.createServer(app);
    var io = socketIo(server);

    // Not referenced anywhere else in the visible script.
    var users = [];

    // Serve every file under this script's directory as static content.
    // NOTE(review): this mount also exposes the script itself and
    // node_modules over HTTP; counter.html relies on it to load the
    // socket.io client bundle, so narrow it with care.
    app.use('/', express.static(__dirname + '/'));
    
    /**
     * GET /send?msg=<text>
     *
     * Publishes `msg` to partition 0 of the Kafka topic 'test' and echoes it
     * back to the caller as plain text. The HTTP response is written
     * immediately; it does not wait for the broker to acknowledge the message.
     * The outcome of the send is only logged on the server.
     *
     * Responds with 400 when the `msg` query parameter is absent.
     *
     * NOTE(review): kafka.Client is the ZooKeeper-based client ('localhost:2181');
     * newer kafka-node releases appear to replace it with KafkaClient - confirm
     * against the installed kafka-node version.
     */
    app.get('/send', function (req, res) {
        var msg = req.query.msg;
        if (msg === undefined) {
            res.status(400).send('Missing required query parameter: msg');
            return;
        }

        // A dedicated client/producer pair is opened for this request and
        // released once the send has completed or failed, so connections do
        // not pile up across requests.
        var client = new kafka.Client('localhost:2181');
        var producer = new kafka.Producer(client);
        // Declared with `var`: previously this was an implicit global shared
        // by all in-flight requests.
        var payloads = [
            { topic: 'test', messages: msg, partition: 0 }
        ];
        var released = false;

        // Close the client at most once, whichever callback gets there first.
        function release() {
            if (released) {
                return;
            }
            released = true;
            client.close();
        }

        producer.on('ready', function () {
            if (released) {
                return;
            }
            producer.send(payloads, function (err, data) {
                if (err) {
                    console.error('kafka send failed', err);
                } else {
                    console.log(data);
                }
                release();
            });
        });
        producer.on('error', function (err) {
            console.error('kafka producer error', err);
            release();
        });

        // Plain text: `msg` is caller-controlled and must not be rendered as HTML.
        res.set('Content-Type', 'text/plain; charset=utf-8');
        res.send('输入消息=' + msg);
    });
    // Accept HTTP and socket.io connections on port 8080.
    server.listen(8080);

    /**
     * Attaches a Kafka consumer to partition 0 of topic 'test' and broadcasts
     * every received message to all connected socket.io clients as a
     * 'server_counter' event.
     */
    function startConsumer() {
        var topicName = 'test';
        var zkClient = new kafka.Client('localhost:2181');
        var subscriptions = [{ topic: topicName, partition: 0 }];
        var consumerOptions = { autoCommit: false };

        var consumer = new kafka.Consumer(zkClient, subscriptions, consumerOptions);
        var offsetApi = new kafka.Offset(zkClient);

        // Log each message and fan it out to every connected browser.
        consumer.on('message', function (message) {
            console.log(message);
            io.sockets.emit('server_counter', message);
        });

        consumer.on('error', function (err) {
            console.log('error', err);
        });

        // When the requested offset is no longer valid, look up the offsets
        // that are available for the partition and resume from the smallest.
        consumer.on('offsetOutOfRange', function (partitionInfo) {
            // presumably caps how many offsets are returned - confirm against
            // the kafka-node Offset documentation
            partitionInfo.maxNum = 2;
            offsetApi.fetch([partitionInfo], function (fetchErr, offsets) {
                if (fetchErr) {
                    return console.error(fetchErr);
                }
                var available = offsets[partitionInfo.topic][partitionInfo.partition];
                var smallest = Math.min.apply(null, available);
                consumer.setOffset(partitionInfo.topic, partitionInfo.partition, smallest);
            });
        });
    }

    // Start consuming one second after the script loads.
    setTimeout(startConsumer, 1000);

    3.counter.html

    <!DOCTYPE html>
    <html>

    <head>
        <title>socket</title>
    </head>

    <body>

    <!-- Container that receives one child <div> per message pushed by the server. -->
    <div style="margin: 0 auto;" id='msg'>

    </div>

    <!-- Scripts live inside <body>, after the container they write into. -->
    <script type="text/javascript" src="./node_modules/socket.io-client/dist/socket.io.js"></script>
    <script type="text/javascript">
        // Connect to the socket.io server.
        var socket = io.connect('localhost:8080');

        // Handle messages pushed by the server: append each payload, serialized
        // as JSON, to the #msg container.
        socket.on('server_counter', function (msg) {
            var div = document.createElement('div');
            // textContent, not innerHTML: the payload carries text supplied by
            // users via /send and must never be parsed as markup.
            div.textContent = JSON.stringify(msg);
            document.getElementById('msg').appendChild(div);
        });
    </script>
    </body>

    </html>

    4.效果展示

     启动 kafkatest.js

     打开http://localhost:8080/counter.html

     打开http://localhost:8080/send?msg=谢大神你好

      

  • 相关阅读:
    OI竞赛常见错误总结
    lis最长上升子序列o(nlogn)优化
    链表及其简单应用
    栈及其简单应用
    哈希表Hash:概念与基本操作
    队列及其简单应用
    poj1418 Viva Confetti 判断圆是否可见
    poj1981 Circle and Points 单位圆覆盖问题
    poj2187 Beauty Contest(旋转卡壳)
    poj2932 Coneology (扫描线)
  • 原文地址:https://www.cnblogs.com/linsu/p/10196865.html
Copyright © 2011-2022 走看看