从GitHub 找了源码包
WebSocketDemo_CSharpMVC-master.zip
源码是一个 webchat 的缩减版,代码简洁。有一点需要注意:发送和接收都是异步的,需要深刻理解 async/await。里边建好了"用户名 → WebSocket"字典,方便服务器把数据返回给对应的客户端。同时还加了离线消息,这个功能在本场景下感觉用处不大,还需进一步贯通代码。
前端我是用在一个动态加载的 line chart 上:打开页面时先加载 24 小时内的数据,并根据返回的数据,以用户名为 key、设备记录 List<string> 为 value 存入设备字典。设备有新数据到来时,先在设备字典中找到该设备,再由设备找到用户名;知道了用户名,就可以
查询用户字典中的连接对象,通过该对象 destSocket 就可以把即时消息发送给当前用户。
用websocket await destSocket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
逻辑就这些, 从前端chart 到后台还是调试了一段时间
@{
    ViewBag.Title = "Index";
    Layout = "~/Views/Shared/_LayoutList.cshtml";
}
@*
    Real-time line-chart view.
    - The search form queries /AutoNetwork/ChartsLocations/GetSerachData and redraws the chart.
    - After the chart is created, a WebSocket is opened to /RealTimeData/RealTimeData; the first
      request ("realtimedata") loads the initial data set, and every later server push is appended
      to the matching line (or added as a new line when the terminal is not on the chart yet).
*@
<!-- Fuzzy-search area -->
<div class="layui-row">
    <form class="layui-form layui-col-md12 ok-search">
        <div class="layui-input-inline">
            <input class="layui-input" placeholder="请输入Imei" name="SourceDataIMEI" id="SourceDataIMEI" autocomplete="off">
        </div>
        <div class="layui-input-inline">
            <select id="caijidianselect" placeholder="请选择采集点">
                <option value='' disabled selected style='display:none;'>请选择采集点</option>
            </select>
        </div>
        <button class='layui-btn ' lay-submit='' lay-filter='search'>
            <i class='layui-icon'></i>查询
        </button>
        <button type='reset' id='reset' class='layui-btn layui-btn-primary'>重置</button>
        <label id="labelmess"></label>
    </form>
</div>
<script src="~/Content/Scripts/js/jquery.min.js"></script>
<script src="~/Content/js/moment.min.js"></script>
<script src="~/Content/js/Chart.js"></script>
<div class="layui-row " width="1000">
    <canvas id="myChart" width="500" height="250"></canvas>
    <br />
</div>
<script>
    layui.use(["form"], function () {
        let form = layui.form;
        let $ = layui.$;

        // Populate the collection-point <select>.
        initCaijiName();

        form.on('submit(search)', function (data) {
            console.log(data);
            $.ajax({
                url: '/AutoNetwork/ChartsLocations/GetSerachData',
                type: 'GET',
                data: {
                    keyword: $('#SourceDataIMEI').val(),
                    // NOTE(review): no #StartEndDate element exists in the markup of this view,
                    // so this value is presumably undefined unless the layout provides it.
                    StartEndDate: $('#StartEndDate').val(),
                    caijidian: $('#caijidianselect option:selected').val()
                },
                success: function (data1) {
                    console.log(data1);
                    // A search always replaces whatever is currently drawn.
                    clearDatasets();
                    if (data1 == "") {
                        alert("没有相关数据");
                        return;
                    }
                    renderTerminals(data1.Terminal_lst || []);
                },
                error: function (err) {
                    console.log('异常');
                }
            });
            // Prevent the form from doing a normal page submit.
            return false;
        });
    });

    // ---------------------------------------------------------------- chart area

    var timeFormat = 'YYYY/MM/DD HH:mm:ss';

    // Returns a Date offset from now. The argument is added as HOURS ('h'),
    // despite the parameter name.
    function newDate(days) {
        return moment().add(days, 'h').toDate();
    }

    // Same as newDate, but formatted with timeFormat.
    function newDateString(days) {
        return moment().add(days, 'h').format(timeFormat);
    }

    var color = Chart.helpers.color;

    var config = {
        type: 'line',
        data: {
            datasets: []
        },
        options: {
            title: {
                text: 'Chart.js Time Scale'
            },
            scales: {
                xAxes: [{
                    type: 'time',
                    time: {
                        parser: timeFormat,
                        tooltipFormat: 'll h:mm:ss a'
                    },
                    scaleLabel: {
                        display: true,
                        labelString: 'Date'
                    }
                }],
                yAxes: [{
                    scaleLabel: {
                        display: true,
                        labelString: 'value'
                    }
                }]
            }
        }
    };

    // Palette. colorNames is derived from this object so that every name is guaranteed
    // to resolve to a colour (previously two names had no entry and produced undefined).
    window.chartColors = {
        red: 'rgb(255, 99, 132)',
        orange: 'rgb(255, 159, 64)',
        yellow: 'rgb(255, 205, 86)',
        green: 'rgb(75, 192, 192)',
        blue: 'rgb(54, 162, 235)',
        purple: 'rgb(153, 102, 255)',
        grey: 'rgb(201, 203, 207)',
        SaddleBrown: 'rgb(139, 69, 19)',
        DarkCyan: 'rgb(0, 139, 139)'
    };
    var colorNames = Object.keys(window.chartColors);

    // Label used both for display and for locating a terminal's existing line.
    function datasetLabel(term) {
        return '终端:' + term.DataIMEI;
    }

    // Returns the dataset already drawn for the given label, or null.
    function findDataset(label) {
        var datasets = config.data.datasets;
        for (var i = 0; i < datasets.length; i++) {
            if (datasets[i].label == label) {
                return datasets[i];
            }
        }
        return null;
    }

    // Creates an empty dataset for a terminal, cycling through the palette.
    function createDataset(term) {
        var colorName = colorNames[config.data.datasets.length % colorNames.length];
        var newColor = window.chartColors[colorName];
        return {
            label: datasetLabel(term),
            borderColor: newColor,
            backgroundColor: color(newColor).alpha(0).rgbString(),
            data: []
        };
    }

    // Removes every line from the chart (no-op when the chart is already empty).
    function clearDatasets() {
        if (config.data.datasets.length > 0) {
            config.data.datasets.splice(0, config.data.datasets.length);
            if (window.myLine) {
                window.myLine.update();
            }
        }
    }

    // Merges a Terminal_lst payload into the chart: points are appended to the terminal's
    // existing line, and a new line is created for any terminal not drawn yet.
    // Every terminal in the list is processed; the chart is redrawn once at the end.
    function renderTerminals(termlist) {
        for (var i = 0; i < termlist.length; i++) {
            var term = termlist[i];
            var points = term.TimeAndValue_lst || [];
            var dataset = findDataset(datasetLabel(term));
            if (!dataset) {
                dataset = createDataset(term);
                config.data.datasets.push(dataset);
            }
            for (var k = 0; k < points.length; k++) {
                dataset.data.push(points[k]);
            }
        }
        if (window.myLine) {
            window.myLine.update();
        }
    }

    // ---------------------------------------------------------------- WebSocket

    var ws;

    window.onload = function () {
        var ctx = document.getElementById('myChart').getContext('2d');
        window.myLine = new Chart(ctx, config);
        // Connect only after the chart exists, so that onmessage can never run
        // against an undefined window.myLine.
        conn();
    };

    // Opens the WebSocket and wires up its handlers.
    function conn() {
        // Use wss:// when the page itself is served over https, and location.host so that
        // a default (empty) port does not leave a dangling ':' in the URL.
        var scheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
        ws = new WebSocket(scheme + window.location.host
            + '/RealTimeData/RealTimeData?username=' + encodeURIComponent("@ViewBag.userName"));
        console.log('正在连接');

        ws.onopen = function () {
            console.log('已经连接');
            // Ask the server for the initial data set.
            send("realtimedata");
        };

        ws.onmessage = function (evt) {
            var payload = evt.data;
            var labelm = document.getElementById('labelmess');

            if (payload == "") {
                clearDatasets();
                labelm.innerHTML = '24小时内无数据';
                return;
            }

            var termlist;
            try {
                termlist = JSON.parse(payload).Terminal_lst || [];
            } catch (e) {
                // Not a chart payload (e.g. a plain-text message); ignore it.
                console.log('数据解析失败', e);
                return;
            }

            labelm.innerHTML = '';
            // Live data must not destroy the existing chart: merge into it.
            renderTerminals(termlist);
        };

        ws.onerror = function (evt) {
            console.log(JSON.stringify(evt));
        };

        ws.onclose = function () {
            console.log('已经关闭');
        };
    }

    // Closes the WebSocket.
    // NOTE(review): declared at global scope, so this replaces window.close for this page.
    function close() {
        if (ws) {
            ws.close();
        }
    }

    // Sends "<user>|<content>" to the server when the socket is open.
    function send(content) {
        var to = "@ViewBag.userName";
        if (ws && ws.readyState == WebSocket.OPEN) {
            ws.send(to + "|" + content);
        } else {
            console.log('已经关闭');
        }
    }

    // Clears the message area, when the page has one (#msg is not part of this view's markup).
    function btn_clear() {
        var msg = document.getElementById('msg');
        if (msg) {
            msg.innerHTML = '';
        }
    }

    // Loads the collection points and fills the <select>.
    function initCaijiName() {
        $.ajax({
            type: 'GET',
            url: '/AutoNetwork/ChartsLocations/GetLocations',
            dataType: "json",
            success: function (data) {
                if (data.length > 0) {
                    var add = document.getElementById("caijidianselect");
                    for (var i = 0; i < data.length; i++) {
                        var option = document.createElement("option");
                        option.value = data[i].LocationID;
                        option.innerText = data[i].LocationName;
                        add.append(option);
                    }
                    // Re-render the layui select once, after all options are in place.
                    layui.form.render('select');
                }
            }
        });
    }
</script>
@* NOTE(review): what follows appears to be a separate, standalone WebSocket test page that was
   pasted after the chart view; it is a complete HTML document of its own. *@
<!DOCTYPE html>
<html>
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0" />
    <title>WebSocket Test</title>
</head>
<body>
    <div style="margin-top:50px;text-align:center">
        <h1>WebSocket Test</h1>
        <input type="text" id="user" placeholder="用户名称" />
        <input type="button" id="conn" onclick="btn_conn()" value="连接" />
        <input type="button" id="close" onclick="btn_close()" value="关闭" /><br />
        <input type="text" id="content" placeholder="内容" />
        <input type="button" id="send" onclick="btn_send()" value="发送" />
        <input type="button" id="clear" onclick="btn_clear()" value="清空" /><br />
        <input type="text" id="to" /> :目标用户
        <div id="msg"></div>
    </div>
    <script>
        var ws;

        // Appends one paragraph to the message area. textContent is used so that
        // text received from the server is never interpreted as HTML.
        function appendMessage(text) {
            var p = document.createElement('p');
            p.textContent = text;
            document.getElementById('msg').appendChild(p);
        }

        // Connect
        function btn_conn() {
            var scheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
            ws = new WebSocket(scheme + window.location.host + '/RealTimeData/RealTimeData');
            appendMessage('正在连接');
            ws.onopen = function () {
                appendMessage('已经连接');
            };
            ws.onmessage = function (evt) {
                appendMessage(evt.data);
            };
            ws.onerror = function (evt) {
                appendMessage(JSON.stringify(evt));
            };
            ws.onclose = function () {
                appendMessage('已经关闭');
            };
        }

        // Close
        function btn_close() {
            if (ws) {
                ws.close();
            }
        }

        // Send the fixed "realtimedata" request for the current user.
        function btn_send() {
            var to = "@ViewBag.userName";
            var content = "realtimedata";
            var msg = document.getElementById('msg');
            if (ws && ws.readyState == WebSocket.OPEN) {
                ws.send(to + "|" + content);
            } else {
                msg.innerHTML = '<p>连接已经关闭</p>';
            }
        }

        // Clear the message area
        function btn_clear() {
            var msg = document.getElementById('msg');
            msg.innerHTML = '';
        }
    </script>
</body>
</html>
using AutoNetwork.Services;
using AutoNetwork.Web;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Web;
using System.Web.WebSockets;

namespace AutoNetwork.Common
{
    /// <summary>
    /// Server side of the real-time data channel: keeps one WebSocket per user name,
    /// answers "user|realtimedata" requests with the JSON produced by
    /// <c>WebSocketService.GetWebSocketSerachData</c>, queues replies for users that are
    /// not connected, and lets other components push text to a connected user via
    /// <see cref="Send"/>.
    /// </summary>
    public class WebSocket
    {
        /// <summary>Size in bytes of the buffer used for each receive call.</summary>
        private const int ReceiveBufferSize = 2048;

        /// <summary>
        /// Guards every access this class makes to the three static pools.
        /// NOTE(review): the two public pools are also read by code outside this class,
        /// which cannot take this lock; those readers are not synchronised by it.
        /// </summary>
        private static readonly object PoolLock = new object();

        public WebSocket()
        {
        }

        /// <summary>
        /// Connection pool: user name -> that user's current socket.
        /// </summary>
        public static Dictionary<string, System.Net.WebSockets.WebSocket> CONNECT_POOL = new Dictionary<string, System.Net.WebSockets.WebSocket>();

        /// <summary>
        /// Offline message pool: user name -> messages waiting for that user to connect.
        /// </summary>
        private static Dictionary<string, List<MessageInfo>> MESSAGE_POOL = new Dictionary<string, List<MessageInfo>>();

        /// <summary>
        /// Devices currently being watched: user name -> device list returned by that
        /// user's last "realtimedata" query.
        /// </summary>
        public static Dictionary<string, List<string>> Devices_POOL = new Dictionary<string, List<string>>();

        /// <summary>
        /// Accepts the request as a WebSocket for the authenticated user.
        /// Does nothing when there is no user name or the request is not a WebSocket request.
        /// </summary>
        /// <param name="context">Current HTTP context.</param>
        public void Monitor(HttpContextBase context)
        {
            if (context == null || context.User == null || context.User.Identity == null)
            {
                return;
            }

            string userName = context.User.Identity.Name;
            if (string.IsNullOrEmpty(userName))
            {
                return;
            }

            if (context.IsWebSocketRequest)
            {
                context.AcceptWebSocketRequest((c) => ProcessGet(c, userName));
            }
        }

        /// <summary>
        /// Runs the receive loop for one connection: registers the socket, delivers queued
        /// offline messages, then handles incoming messages until the socket leaves the
        /// Open state. The socket is always unregistered when the loop ends.
        /// </summary>
        /// <param name="context">AspNetWebSocket context of the accepted connection.</param>
        /// <param name="userName">User name the connection belongs to.</param>
        /// <returns>A task that completes when the connection has ended.</returns>
        public async Task ProcessGet(AspNetWebSocketContext context, string userName)
        {
            System.Net.WebSockets.WebSocket socket = context.WebSocket;
            try
            {
                // Register (or replace) this user's socket.
                lock (PoolLock)
                {
                    CONNECT_POOL[userName] = socket;
                }

                await DeliverOfflineMessagesAsync(socket, userName);

                string descUser = string.Empty; // destination user of the last "user|content" message

                while (socket.State == WebSocketState.Open)
                {
                    // Receive failures are fatal for the connection and are handled by the outer catch.
                    string userMsg = await ReceiveTextAsync(socket);
                    if (userMsg == null)
                    {
                        break; // closed by the peer
                    }

                    try
                    {
                        string[] msgList = userMsg.Split('|');
                        if (msgList.Length == 2)
                        {
                            if (msgList[0].Trim().Length > 0)
                            {
                                descUser = msgList[0].Trim();
                            }

                            await HandleRequestAsync(userName, descUser, msgList[1].Trim());
                        }
                        else
                        {
                            await BroadcastAsync(userMsg);
                        }
                    }
                    catch (Exception exs)
                    {
                        // A failure while handling one message must not end the connection:
                        // record it and keep listening.
                        System.Diagnostics.Trace.TraceError("WebSocket message handling failed for '" + userName + "': " + exs);
                    }
                }
            }
            catch (Exception ex)
            {
                System.Diagnostics.Trace.TraceError("WebSocket connection failed for '" + userName + "': " + ex);
            }
            finally
            {
                ReleaseConnection(userName, socket);
            }
        }

        /// <summary>
        /// Fire-and-forget wrapper around <see cref="SendToClient"/>; a failed send is
        /// traced instead of being left as an unobserved task exception.
        /// </summary>
        public static void Send(string username, string result)
        {
            SendToClient(username, result).ContinueWith(
                t => System.Diagnostics.Trace.TraceError("WebSocket send to '" + username + "' failed: " + t.Exception),
                TaskContinuationOptions.OnlyOnFaulted);
        }

        /// <summary>
        /// Sends a text message to the given user if that user currently has an open socket;
        /// otherwise does nothing.
        /// </summary>
        /// <param name="username">Destination user name.</param>
        /// <param name="result">Text to send.</param>
        /// <returns>A task that completes when the message has been sent or skipped.</returns>
        public static async Task SendToClient(string username, string result)
        {
            if (string.IsNullOrEmpty(username) || result == null)
            {
                return;
            }

            System.Net.WebSockets.WebSocket destSocket;
            lock (PoolLock)
            {
                // TryGetValue: the user may have disconnected since the caller last checked.
                if (!CONNECT_POOL.TryGetValue(username, out destSocket))
                {
                    return;
                }
            }

            // NOTE(review): a socket allows only one outstanding send at a time; sends to the
            // same user from ProcessGet and from here are not serialised against each other.
            if (destSocket != null && destSocket.State == WebSocketState.Open)
            {
                ArraySegment<byte> buffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(result));
                await destSocket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
            }
        }

        /// <summary>Sends the messages queued for the user, then removes the delivered ones from the queue.</summary>
        private static async Task DeliverOfflineMessagesAsync(System.Net.WebSockets.WebSocket socket, string userName)
        {
            MessageInfo[] pending = null;
            List<MessageInfo> queued;
            lock (PoolLock)
            {
                if (MESSAGE_POOL.TryGetValue(userName, out queued))
                {
                    pending = queued.ToArray();
                }
            }

            if (pending == null)
            {
                return;
            }

            foreach (MessageInfo item in pending)
            {
                string msgTime = item.MsgTime.ToString("yyyy年MM月dd日HH:mm:ss");
                string msgContent = Encoding.UTF8.GetString(item.MsgContent.Array);
                await socket.SendAsync(new ArraySegment<byte>(Encoding.UTF8.GetBytes("时间:" + msgTime + "内容:" + msgContent)), WebSocketMessageType.Text, true, CancellationToken.None);
            }

            lock (PoolLock)
            {
                // Drop only what was delivered; messages queued in the meantime stay.
                if (MESSAGE_POOL.TryGetValue(userName, out queued))
                {
                    queued.RemoveRange(0, Math.Min(pending.Length, queued.Count));
                    if (queued.Count == 0)
                    {
                        MESSAGE_POOL.Remove(userName);
                    }
                }
            }
        }

        /// <summary>
        /// Reads one complete message (all frames up to EndOfMessage) and decodes it as UTF-8.
        /// Returns null when the socket is no longer open.
        /// </summary>
        private static async Task<string> ReceiveTextAsync(System.Net.WebSockets.WebSocket socket)
        {
            ArraySegment<byte> chunk = new ArraySegment<byte>(new byte[ReceiveBufferSize]);
            using (MemoryStream message = new MemoryStream())
            {
                WebSocketReceiveResult result;
                do
                {
                    result = await socket.ReceiveAsync(chunk, CancellationToken.None);
                    if (result.MessageType == WebSocketMessageType.Close || socket.State != WebSocketState.Open)
                    {
                        return null;
                    }

                    message.Write(chunk.Array, chunk.Offset, result.Count);
                }
                while (!result.EndOfMessage);

                // Decode once, after reassembly, so a multi-byte character split across
                // two frames is not corrupted.
                return Encoding.UTF8.GetString(message.ToArray());
            }
        }

        /// <summary>
        /// Handles a "user|content" message: builds the reply (the query JSON for
        /// "realtimedata", otherwise an empty string), then sends it to the destination user
        /// or queues it as an offline message when that user is not connected.
        /// </summary>
        private static async Task HandleRequestAsync(string userName, string descUser, string request)
        {
            string jsonre = "";
            if (request == "realtimedata")
            {
                List<string> devlist;
                jsonre = WebSocketService.GetWebSocketSerachData("", "", out devlist);
                if (devlist != null && devlist.Count > 0)
                {
                    // Remember which devices this user is watching.
                    lock (PoolLock)
                    {
                        Devices_POOL[userName] = devlist;
                    }
                }
            }

            ArraySegment<byte> reply = new ArraySegment<byte>(Encoding.UTF8.GetBytes(jsonre ?? ""));

            System.Net.WebSockets.WebSocket destSocket;
            bool online;
            lock (PoolLock)
            {
                online = CONNECT_POOL.TryGetValue(descUser, out destSocket);
                if (!online)
                {
                    // Destination is not connected: keep the reply for later delivery.
                    List<MessageInfo> queue;
                    if (!MESSAGE_POOL.TryGetValue(descUser, out queue))
                    {
                        queue = new List<MessageInfo>();
                        MESSAGE_POOL.Add(descUser, queue);
                    }

                    queue.Add(new MessageInfo(DateTime.Now, reply));
                }
            }

            if (online && destSocket != null && destSocket.State == WebSocketState.Open)
            {
                await destSocket.SendAsync(reply, WebSocketMessageType.Text, true, CancellationToken.None);
            }
        }

        /// <summary>Sends the raw text to every currently open socket in the connection pool.</summary>
        private static async Task BroadcastAsync(string userMsg)
        {
            ArraySegment<byte> buffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(userMsg));

            // Snapshot: the pool may change while the sends below are awaited.
            List<System.Net.WebSockets.WebSocket> targets;
            lock (PoolLock)
            {
                targets = new List<System.Net.WebSockets.WebSocket>(CONNECT_POOL.Values);
            }

            foreach (System.Net.WebSockets.WebSocket target in targets)
            {
                if (target == null || target.State != WebSocketState.Open)
                {
                    continue;
                }

                try
                {
                    await target.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
                }
                catch (Exception ex)
                {
                    // One broken recipient must not stop delivery to the others.
                    System.Diagnostics.Trace.TraceError("WebSocket broadcast send failed: " + ex);
                }
            }
        }

        /// <summary>
        /// Removes the user's pool entries, but only when the pooled socket is still this
        /// connection's socket (a newer connection of the same user must not be evicted).
        /// </summary>
        private static void ReleaseConnection(string userName, System.Net.WebSockets.WebSocket socket)
        {
            lock (PoolLock)
            {
                System.Net.WebSockets.WebSocket current;
                if (CONNECT_POOL.TryGetValue(userName, out current) && ReferenceEquals(current, socket))
                {
                    CONNECT_POOL.Remove(userName);
                    Devices_POOL.Remove(userName);
                }
            }
        }
    }
}
/// <summary>
/// Forwards a value received from a terminal to every connected user whose watched-device
/// list (<c>WebSocket.Devices_POOL</c>) contains that terminal. The value is wrapped in a
/// <c>TerminalsDataChartModel</c>, serialised to JSON and pushed with <c>WebSocket.Send</c>.
/// </summary>
/// <param name="networkConnectionParameter">
/// A boxed <c>NetConnParam</c>; only its <c>sdata</c> member (the value as text) is used.
/// </param>
private void ProcessReceivedDataWebsocket(object networkConnectionParameter)
{
    lock (lockProcessReceivedData)
    {
        NetConnParam connection = (NetConnParam)networkConnectionParameter;
        string sdata = connection.sdata;

        // NOTE(review): the terminal IMEI is a hard-coded sample value; presumably it should
        // be taken from the received frame — confirm against the device protocol.
        string devimei = "850001245";

        // A malformed payload from a device is skipped instead of throwing inside the lock.
        int value;
        if (!int.TryParse(sdata, out value))
        {
            return;
        }

        // The JSON is identical for every recipient, so it is built once, on first use.
        string result = null;

        // NOTE(review): both pools are static dictionaries that the WebSocket class mutates
        // from other threads; this enumeration is not synchronised with those writes.
        foreach (KeyValuePair<string, List<string>> kvp in WebSocket.Devices_POOL)
        {
            // Only users who are watching this terminal and are currently connected.
            if (!kvp.Value.Contains(devimei) || !WebSocket.CONNECT_POOL.ContainsKey(kvp.Key))
            {
                continue;
            }

            if (result == null)
            {
                TimeAndValue tav = new TimeAndValue();
                tav.x = DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss");
                tav.y = value;

                TerminalData td = new TerminalData();
                td.DataIMEI = devimei;
                // Assumes TerminalData initialises TimeAndValue_lst itself — TODO confirm.
                td.TimeAndValue_lst.Add(tav);

                List<TerminalData> ltd = new List<TerminalData>();
                ltd.Add(td);

                TerminalsDataChartModel tdcm = new TerminalsDataChartModel();
                tdcm.Terminal_lst = ltd;

                result = JsonConvert.SerializeObject(tdcm);
            }

            WebSocket.Send(kvp.Key, result);
        }
    }
}