using System; using System.Collections.Generic; using System.Data; using System.IO; using System.Linq; using System.Net; using System.Net.Sockets; using System.Runtime.CompilerServices; using System.Text; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using DBContext; namespace WatchServer { public static class OnLineUserCollection { private static IList<EngineUser> _onlineUserList; public static int Count { get { return OnlineUserList.Count; } } public static IList<EngineUser> OnlineUserList { get { return _onlineUserList ?? (_onlineUserList = new List<EngineUser>()); } set { _onlineUserList = value; } } } interface IEngineUser { void SendMessage(String message); } public class EngineUser : IEngineUser { public string watchId = "Anonymous"; public TcpClient Client; public StreamWriter Sw; public StreamReader Sr; public bool Active { get { return this.Client.Connected; } } public EngineUser(TcpClient client) { this.Client = client; NetworkStream netStream = client.GetStream(); Sw = new StreamWriter(netStream, Encoding.UTF8); Sr = new StreamReader(netStream, Encoding.UTF8); } public void SendMessage(String message) { if (Sw != null) { Sw.WriteLine(message); Sw.Flush(); } } } public class AnalysizeEngine : IDisposable { #region 多线程 singleton private static AnalysizeEngine _instance; private static readonly object LockObject = new object(); public static AnalysizeEngine GetInstance() { if (_instance == null) { lock (LockObject) { if (_instance == null) _instance = new AnalysizeEngine(); } } return _instance; } #endregion #region 私有变量 将engine中的线程全部用变量声明,保存在这里,退出的时候,要全部结束! private TcpListener _listener; //todo:up not implemented #endregion #region 构造函数 private AnalysizeEngine() : this("tcp engine") { } private AnalysizeEngine(String name) { } #endregion #region 属性事件 public event HandleOverActionHandler Handle; public delegate void HandleOverActionHandler(String message); public event Action<String> OnDisconnect; #endregion #region 成员方法 private IPAddress ResolveIp(IPAddress[] addrIP) { var ipAddressRegex = new Regex(@"((?:(?:25[0-5]|2[0-4]d|((1d{2})|([1-9]?d))).){3}(?:25[0-5]|2[0-4]d|((1d{2})|([1-9]?d))))"); IPAddress returnValue = null; addrIP.ToList().ForEach(t => { if (returnValue == null) returnValue = ipAddressRegex.IsMatch(t.ToString()) ? t : null; }); returnValue = returnValue ?? IPAddress.Parse(ConfigHelper.Get("localIp")); return returnValue; } #endregion #region 操作方法 public AnalysizeEngine Start() { IPAddress ip = ResolveIp(Dns.GetHostAddresses(Dns.GetHostName())); try { if (_listener == null) _listener = new TcpListener(ip, Int32.Parse(ConfigHelper.Get("localPort"))); _listener.Start(); } catch (Exception ex) { LogHelper.Error(ex); } var mainTask = Task.Factory.StartNew(() => GetInstance().Process()); var msgDistributeTask = Task.Factory.StartNew(() => GetInstance().DistributeMessage()); return this; } private void DistributeMessage() { while (true) { try { if (OnLineUserCollection.OnlineUserList.Count <= 0) break; OnLineUserCollection.OnlineUserList.ToList().ForEach(a => { var commandsToSend = GetCommandsToSendByWatchId(a.watchId); if (commandsToSend.Count > 0) commandsToSend.ForEach(b => { a.SendMessage(b.CommandText); b.ExecuteResult = (int) WatchCommandExecuteEnum .ExecuteSuccess; using (var context = DataContext.Context) { context.Attach(b); context.ObjectStateManager.ChangeObjectState(b, EntityState.Modified); //refresh online status context.W_User.Select(e => e.WatchId).ToList().ForEach(c => { UserOnlineStatus status = OnLineUserCollection.OnlineUserList.Select(d => d.watchId).ToArray().Contains(c) ? UserOnlineStatus.Online : UserOnlineStatus.Offline; SetWatchStatusByWatchId(c, status); }); context.SaveChanges(); } //UpdateWatchOnlineStatusByWatchId(a.watchId); }); }); Thread.Sleep(5000); } catch (Exception ex) { LogHelper.Error(ex); } } } /// <summary> /// when command is sent,ie reboot or shutoff,update alternate status ,alternative states are not implied yet. /// </summary> /// <param name="watchId"></param> private void UpdateWatchOnlineStatusByWatchId(string watchId) { var watchStatus = (int)UserOnlineStatus.Offline; using (var context = DataContext.Context) { IList<W_LocationData> commandList = context.W_LocationData.Top("2").Where(a => a.WatchId == watchId).ToList(); var watch = context.W_User.First(a => a.WatchId == watchId); if (watch == null) throw new Exception("cannot find specified watchid,watch id null"); } } private List<W_Command> GetCommandsToSendByWatchId(string watchId) { using (var context = DataContext.Context) { IEnumerable<W_Command> commands = context.W_Command.Where( a => a.WatchId == watchId && a.ExecuteResult == (int)WatchCommandExecuteEnum.NotExecuted ); return commands.ToList(); } } public void Process() { while (true) { TcpClient guest; try { guest = _listener.AcceptTcpClient(); EngineUser u = new EngineUser(guest); Task.Factory.StartNew(f => { EngineUser client = f as EngineUser; if (client == null) return; while (client != null && client.Active) { try { if (client.Client.GetStream().CanRead) { string receiveString = ""; byte[] buffer = new byte[int.Parse(ConfigHelper.Get("bufferSize"))]; int i = client.Client.GetStream().Read(buffer, 0, buffer.Length); receiveString = Encoding.UTF8.GetString(buffer, 0, i); if (!client.Client.GetStream().DataAvailable) { if (receiveString == "exit") break; if (this.Handle != null && receiveString != "") Handle(receiveString); if (receiveString != "") { var analysizer = new ReceiveDataAnalysizer(receiveString); analysizer.Analysize(); client.watchId = analysizer.AnalysizeResult; if (!string.IsNullOrEmpty(client.watchId)) { LogHelper.Info( OnLineUserCollection.OnlineUserList.Any( a => a.watchId == client.watchId) ? DateTime.Now + client.watchId + "conneted" : null); SetWatchStatusByWatchId(client.watchId, UserOnlineStatus.Online); OnLineUserCollection.OnlineUserList.Add(OnLineUserCollection .OnlineUserList.Any( a => a.watchId == client.watchId) ? null : u); } } if (receiveString == "") { OnLineUserCollection.OnlineUserList.Remove(client); LogHelper.Info(client.watchId + " disconnected"); SetWatchStatusByWatchId(client.watchId, UserOnlineStatus.Offline); if (OnDisconnect != null) OnDisconnect(client.watchId); break; } } } } catch (Exception ex) { LogHelper.Error(ex); } } }, u); } catch { break; } } } private static void SetWatchStatusByWatchId(String watchId, UserOnlineStatus status) { using (var context = DataContext.Context) { var model = context.W_User.FirstOrDefault(a => a.WatchId == watchId); if (model != null) model.Status = (int)status; context.SaveChanges(); } } public AnalysizeEngine Stop() { //here we close all online connections if (OnLineUserCollection.OnlineUserList.Count > 0) OnLineUserCollection.OnlineUserList.ToList().ForEach(t => { if (t.Client != null && t.Active) t.Client.Close(); }); if (Handle != null) Handle.GetInvocationList().ToList().ForEach(a => { Handle -= a as HandleOverActionHandler; }); if (OnDisconnect != null) OnDisconnect.GetInvocationList().ToList().ForEach(a => { OnDisconnect -= a as Action<string>; }); if (_listener != null) _listener.Stop(); return this; } #endregion #region IDisposable 成员 public void Dispose() { Handle = null; OnDisconnect = null; _listener.Stop(); _instance = null; _listener = null; } #endregion } }