1.安装需要插件
npm install express
npm install socket.io
npm install kafka-node
2.kafkatest.js文件
var express = require('express'); var app = express(); var server = require('http').createServer(app); var io = require('socket.io')(server); var kafka = require('kafka-node'); var users = []; app.use('/', express.static(__dirname + '/')); app.get('/send', function (req, res) { var msg=req.query.msg; var Producer = kafka.Producer, client = new kafka.Client('localhost:2181'), producer = new Producer(client); payloads = [ { topic: 'test', messages: msg, partition: 0 }, ]; producer.on('ready', function(){ producer.send(payloads, function(err, data){ console.log(data); // socket.emit('server_counter',data); }); }); producer.on('error', function(err){}) res.send('输入消息='+msg); }) server.listen(8080); setTimeout(function(){ var Consumer = kafka.Consumer; var Offset = kafka.Offset; var topic = 'test'; var client = new kafka.Client('localhost:2181');//'localhost:2181' var topics = [{ topic: topic, partition: 0 }]; var options = { autoCommit: false };//, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024 var consumer = new Consumer(client, topics, options); var offset = new Offset(client); consumer.on('message', function (message) { console.log(message); io.sockets.emit('server_counter',message); }); consumer.on('error', function (err) { console.log('error', err); }); consumer.on('offsetOutOfRange', function (topic) { topic.maxNum = 2; offset.fetch([topic], function (err, offsets) { if (err) { return console.error(err); } var min = Math.min.apply(null, offsets[topic.topic][topic.partition]); consumer.setOffset(topic.topic, topic.partition, min); }); }); },1000);
3.counter.html
<!DOCTYPE html> <html> <head> <title>socket</title> </head> <body> <div style="margin: 0 auto;" id='msg'> </div> </body> <script type="text/javascript" src="./node_modules/socket.io-client/dist/socket.io.js"></script> <script type="text/javascript"> var socket=io.connect('localhost:8080'),//与服务器进行连接 send=document.getElementById('sendBtn'), leave=document.getElementById('leaveBtn'); //接收来自服务端的信息事件 socket.on('server_counter',function(msg){ var div =document.createElement('div'); div.innerHTML=JSON.stringify(msg); document.getElementById('msg').appendChild(div); }) </script> </html>
3.效果展示
启动 kafkatest.js
打开http://localhost:8080/counter.html
打开http://localhost:8080/send?msg=谢大神你好