引用: https://www.jianshu.com/p/69657b7dbd62
MQTT ACL
1、ACL 访问控制列表
a、用户控制
b、发布控制
c、订阅控制
var mosca = require("mosca"); var auth = new mosca.Authorizer(); // 创建ACL对象 var server = new mosca.Server({ http: { port: 3000, bundle: true, static: './' } }); server.on('ready', function(){ console.log('mqtt server started'); server.authenticate = auth.authenticate; //创建ACL 用户列表 server.authorizePublish = auth.authorizePublish;//创建ACL 发布列表 server.authorizeSubscribe = auth.authorizeSubscribe;//创建ACL 订阅列表 // 添加用户,并且只能发布presence和订阅presence消息 auth.addUser('yy','123','presence', 'presence',function(error){ if(error){ console.log('auth add user error:' + error); }else{ console.log("auth add user yy success"); } }) }); server.on('published', function(packet, client){ console.log('Published: ', packet.payload); }) server.on('subscribed', function(topic, client){ console.log('subscribed: ', topic); }); server.on('unSubscribed', function(topic, client){ console.log('unSubscribed: ', topic); }) server.on('clientConnected', function(client){ console.log('client connected: ', client.id); }); server.on('clientDisConnected', function(client){ console.log('client disConnected: ' + client.id + " userNumber:" + usermap.keys.length); });
客户端代码
var mqtt = require('mqtt'); var client = mqtt.connect('mqtt://127.0.0.1:1883',{ username: "yy", password: '123' }); client.on('connect', function () { client.subscribe('presence'); client.publish('presence', 'Hello mqtt'); }); client.on('message', function (topic, message) { // message is Buffer console.log(message.toString()); client.end(); });
//控制台,只有订阅和发布presence 的消息,没有node的消息 subscribed: presence Published: {"clientId":"mqttjs_97f6e502","topic":"presence"} Published: <Buffer 48 65 6c 6c 6f 20 6d 71 74 74> Published: {"clientId":"mqttjs_97f6e502","topic":"presence"} Published: mqttjs_97f6e502
自定义访问控制列表
var mosca = require("mosca"); // 权限控制更加灵活 var users = [{ userId: 1, username:'yy1', password:'123', publishTopics:['abc', 'abc/e'], subscribeTopics:['abc', 'text'] }]; var usermap = new Map(); var authenticate = function(client, username, password, callback){ console.log("client: " + client + " username: " + username + " password:" + password ); var user = users.find(function(data){ console.log(data.toString()); if(username == data.username && password == data.password) { return data; } }) if(user){ console.log("用户验证成功"); usermap.set(client.id, { userId: user.userId, publishTopics: user.publishTopics, subscribeTopics: user.subscribeTopics }); callback(null, true); }else{ console.log("用户验证成功"); callback(null, false); } } var authorizePublish = function(client, topic, payload, callback){ console.log("authorizePublish: " + client + " topic: " + topic + " payload:" + payload ); var user = usermap.get(client.id); if(!user){ console.log('canot find user'); return; } if(user.publishTopics.indexOf(topic) < 0){ console.log('没有找到该主题: ' + topic); callback(null, false); }else{ console.log('找到该主题: ' + topic); callback(null, true); } } var authorizeSubscribe = function(client, topic, callback){ console.log("authorizeSubscribe: " + client + " topic: " + topic ); var user = usermap.get(client.id); if(!user){ console.log('canot find user'); return; } if(user.subscribeTopics.indexOf(topic) < 0){ console.log('订阅: 没有找到该主题: ' + topic); callback(null, false); }else{ console.log('订阅: 找到该主题: ' + topic); callback(null, true); } } var settings = { http: { bundle:true } } var server = new mosca.Server(settings); server.on('ready', function(){ console.log('mqtt server started'); // 自定义权限列表 server.authenticate = authenticate; server.authorizePublish = authorizePublish; server.authorizeSubscribe = authorizeSubscribe; console.log('auth ready'); }); server.on('published', function(packet, client){ console.log('YYPublished: ', packet.payload); }) server.on('subscribed', function(topic, client){ console.log('subscribed: ', topic); }); server.on('unSubscribed', function(topic, client){ console.log('unSubscribed: ', topic); }) server.on('clientConnected', function(client){ console.log('client connected: ', client.id); }); server.on('clientDisConnected', function(client){ usermap.delete(client.id); console.log('client disConnected: ' + client.id + " userNumber:" + usermap.keys.length); });
client.js
var mqtt =require('mqtt'); var client = mqtt.connect('mqtt://localhost:1883',{ username:'yy1', password:'123' }) client.on('connect', function () { client.subscribe('abc'); // 主题 消息 内功 client.publish('abc', 'Hello node1/node2/node3'); client.subscribe('text111'); }) client.on('message', function (topic, message) { // message is Buffer console.log('receive message: ' + topic); console.log(message.toString()) client.end() }) client.on('reconnect', function(){ console.log('reconnect'); }) client.on('offline', function(){ console.log('offline'); })
本质上就是自己实现授权部分的接口。具体的接口实现可以参考mosca的author。
Authorizer.js
Authorizer.prototype._authenticate = function(client, user, pass, cb) { var missingUser = !user || !pass || !this.users[user]; if (missingUser) { cb(null, false); return; } user = user.toString(); client.user = user; user = this.users[user]; hasher({ password: pass.toString(), salt: user.salt }, function(err, pass, salt, hash) { if (err) { cb(err); return; } var success = (user.hash === hash); cb(null, success); }); }; /** * An utility function to add an user. * * @api public * @param {String} user The username * @param {String} pass The password * @param {String} authorizePublish The authorizePublish pattern * (optional) * @param {String} authorizeSubscribe The authorizeSubscribe pattern * (optional) * @param {Function} cb The callback that will be called after the * insertion. */ Authorizer.prototype.addUser = function(user, pass, authorizePublish, authorizeSubscribe, cb) { var that = this; if (typeof authorizePublish === "function") { cb = authorizePublish; authorizePublish = null; authorizeSubscribe = null; } else if (typeof authorizeSubscribe == "function") { cb = authorizeSubscribe; authorizeSubscribe = null; } if (!authorizePublish) { authorizePublish = defaultGlob; } if (!authorizeSubscribe) { authorizeSubscribe = defaultGlob; } hasher({ password: pass.toString() }, function(err, pass, salt, hash) { if (!err) { that.users[user] = { salt: salt, hash: hash, authorizePublish: authorizePublish, authorizeSubscribe: authorizeSubscribe }; } cb(err); }); return this; }; /** * An utility function to delete a user. * * @api public * @param {String} user The username * @param {String} pass The password * @param {Function} cb The callback that will be called after the * deletion. */ Authorizer.prototype.rmUser = function(user, cb) { delete this.users[user]; cb(); return this; };