Program.cs
using System;
using System.Text;
using CommonLinkLibrary.Util;
using Newtonsoft.Json;

namespace SSDB
{
    /// <summary>
    /// Console demo: clears a zset, writes three batches of JSON log entries into it
    /// (each scored with a freshly incremented counter value) and prints the zset
    /// back page by page.
    /// </summary>
    class Program
    {
        /// <summary>Number of entries written per batch.</summary>
        private const int EntriesPerAction = 100;

        /// <summary>Number of entries requested per zscan call when reading back.</summary>
        private const int PageSize = 10;

        static void Main(string[] args)
        {
            InitSsdb();

            var channelName = "PersonUpdateLog";
            var channelNameKeys = channelName + "_Keys";

            var ssdb = ConnectionPool.GetSocket();
            try
            {
                ssdb.zclear(channelName);
                ssdb.del(channelNameKeys);
                Console.WriteLine("清空"+channelName+"成功!");

                WriteLogEntries(ssdb, channelName, channelNameKeys, "新增", "黄海", "黄海");
                WriteLogEntries(ssdb, channelName, channelNameKeys, "修改", "(新)黄海", "(旧)黄海");
                WriteLogEntries(ssdb, channelName, channelNameKeys, "删除", "(新)黄海", "(旧)黄海");
                Console.WriteLine("保存成功!");

                PrintAllEntries(ssdb, channelName);
            }
            finally
            {
                // Hand the connection back even when one of the calls above throws;
                // otherwise the pool permanently loses one of its slots.
                ConnectionPool.PutSocket(ssdb);
            }

            Console.ReadKey();
        }

        /// <summary>
        /// Initializes the connection pool (server address and port, 20 initial
        /// connections, at most 100).
        /// </summary>
        public static void InitSsdb()
        {
            var serverIp = "10.10.6.199";
            ConnectionPool.InitializeConnectionPool(serverIp, 8888, 20, 100);
        }

        /// <summary>
        /// Increments the counter stored under <paramref name="primaryKeyName"/> by one
        /// and returns the value reported by the server.
        /// </summary>
        /// <param name="ssdb">Client used to send the <c>incr</c> command.</param>
        /// <param name="primaryKeyName">Name of the counter key.</param>
        /// <returns>The counter value after the increment.</returns>
        public static long GetPrimaryKey(SsdbClient ssdb, string primaryKeyName)
        {
            return ssdb.incr(Encoding.UTF8.GetBytes(primaryKeyName), 1);
        }

        /// <summary>
        /// Writes <see cref="EntriesPerAction"/> log entries for one action. Each entry is
        /// serialized to JSON and stored as a zset member whose score is the next
        /// counter value.
        /// </summary>
        /// <param name="ssdb">Client used for all commands.</param>
        /// <param name="channelName">Name of the zset that receives the entries.</param>
        /// <param name="counterName">Name of the counter key used to generate scores.</param>
        /// <param name="action">Value stored in <c>Action</c>.</param>
        /// <param name="newNamePrefix">Prefix of <c>NewPersonName</c>; the entry index is appended.</param>
        /// <param name="oldNamePrefix">Prefix of <c>OldPersonName</c>; the entry index is appended.</param>
        private static void WriteLogEntries(SsdbClient ssdb, string channelName, string counterName,
            string action, string newNamePrefix, string oldNamePrefix)
        {
            // The zset name is the same for every entry, so encode it once.
            var channelBytes = Encoding.UTF8.GetBytes(channelName);

            for (var i = 1; i <= EntriesPerAction; i++)
            {
                var pk = GetPrimaryKey(ssdb, counterName);
                var myBean = new PersonInfoLogBean
                {
                    Action = action,
                    NewPersonName = newNamePrefix + i,
                    OldPersonName = oldNamePrefix + i,
                    PersonId = i.ToString(),
                    OldAge = i.ToString(),
                    NewAge = i.ToString()
                };
                var msg = JsonConvert.SerializeObject(myBean);
                ssdb.zset(channelBytes, Encoding.UTF8.GetBytes(msg), pk);
            }
        }

        /// <summary>
        /// Prints every member of the zset as "key score", fetching
        /// <see cref="PageSize"/> entries per request.
        /// </summary>
        /// <remarks>
        /// Paging uses the last returned (key, score) pair as the cursor for the next
        /// request. Advancing only the score by the page size (with an empty start key)
        /// makes the server return the boundary entry again and relies on the scores
        /// being gap-free.
        /// </remarks>
        private static void PrintAllEntries(SsdbClient ssdb, string channelName)
        {
            var lastKey = "";
            long lastScore = 0;

            while (true)
            {
                // long.MaxValue is sent by SsdbClient.zscan as an empty (unbounded) upper limit.
                var page = ssdb.zscan(channelName, lastKey, lastScore, long.MaxValue, PageSize);
                foreach (var entry in page)
                {
                    Console.WriteLine(entry.Key + " " + entry.Value);
                }

                // A short page means the end of the zset has been reached.
                if (page.Length != PageSize)
                {
                    break;
                }

                lastKey = page[page.Length - 1].Key;
                lastScore = page[page.Length - 1].Value;
            }
        }
    }
}
ConnectionPool.cs
using System;
using System.Collections.Generic;
using System.Threading;

namespace CommonLinkLibrary.Util
{
    /// <summary>
    /// Process-wide pool of <see cref="SsdbClient"/> connections to a single host.
    /// </summary>
    public static class ConnectionPool
    {
        /// <summary>
        /// Guards <see cref="_availableSockets"/>. A dedicated object is used so the lock
        /// target never changes, even when the pool is re-initialized.
        /// </summary>
        private static readonly object SyncRoot = new object();

        /// <summary>
        /// Queue of available (idle) socket connections.
        /// </summary>
        private static readonly Queue<SsdbClient> _availableSockets = new Queue<SsdbClient>();

        /// <summary>
        /// The maximum size of the connection pool.
        /// </summary>
        private static int _poolMaxSize = 20;

        private static string _hostIpAddress;
        private static int _hostPortNumber;

        /// <summary>
        /// Number of connections created by the pool and not yet closed by it
        /// (idle ones plus those currently handed out).
        /// </summary>
        private static int _socketCounter;

        /// <summary>
        /// Set to true once <see cref="InitializeConnectionPool"/> has completed.
        /// </summary>
        public static bool Initialized;

        /// <summary>
        /// Initialize the host connection pool and pre-open the initial connections.
        /// </summary>
        /// <param name="hostIpAddress">Host to connect to.</param>
        /// <param name="hostPortNumber">Port to connect to.</param>
        /// <param name="minConnections">Initial number of connections.</param>
        /// <param name="maxConnections">The maximum size of the connection pool.</param>
        /// <exception cref="Exception">
        /// <paramref name="minConnections"/> exceeds <paramref name="maxConnections"/>.
        /// </exception>
        public static void InitializeConnectionPool(string hostIpAddress, int hostPortNumber, int minConnections,
            int maxConnections)
        {
            lock (SyncRoot)
            {
                // Close idle connections left over from a previous initialization
                // instead of silently abandoning them.
                while (_availableSockets.Count > 0)
                {
                    _availableSockets.Dequeue().Close();
                }

                _socketCounter = 0;
                _poolMaxSize = maxConnections;
                _hostIpAddress = hostIpAddress;
                _hostPortNumber = hostPortNumber;
            }

            for (var i = 0; i < minConnections; i++)
            {
                PutSocket(OpenSocket(hostIpAddress, hostPortNumber));
            }

            Initialized = true;
        }

        /// <summary>
        /// Get an open socket from the connection pool.
        /// </summary>
        /// <returns>Socket returned from the pool or new socket opened.</returns>
        /// <exception cref="InvalidOperationException">The pool has not been configured yet.</exception>
        /// <exception cref="Exception">No idle connection exists and the pool is at its limit.</exception>
        public static SsdbClient GetSocket()
        {
            if (_hostIpAddress == null)
            {
                throw new InvalidOperationException(
                    "InitializeConnectionPool must be called before GetSocket.");
            }

            // Reuse an idle connection when one is still alive. The count is checked
            // inside the lock so that two threads cannot race for the last entry.
            lock (SyncRoot)
            {
                while (_availableSockets.Count > 0)
                {
                    var socket = _availableSockets.Dequeue();
                    if (socket.Connected)
                    {
                        return socket;
                    }

                    Discard(socket);
                }
            }

            // No usable idle connection: open a new one.
            return OpenSocket(_hostIpAddress, _hostPortNumber);
        }

        /// <summary>
        /// Return the given socket back to the socket pool. A socket that is no longer
        /// connected, or that does not fit because the pool is full, is closed instead.
        /// </summary>
        /// <param name="socket">Socket connection to return; null is ignored.</param>
        public static void PutSocket(SsdbClient socket)
        {
            if (socket == null)
            {
                return;
            }

            lock (SyncRoot)
            {
                if (socket.Connected && _availableSockets.Count < _poolMaxSize)
                {
                    _availableSockets.Enqueue(socket);
                    return;
                }
            }

            Discard(socket);
        }

        /// <summary>
        /// Closes a connection and releases the slot it occupied, so a replacement can
        /// be opened later.
        /// </summary>
        private static void Discard(SsdbClient socket)
        {
            socket.Close();
            Interlocked.Decrement(ref _socketCounter);
        }

        /// <summary>
        /// Open a new socket connection.
        /// </summary>
        /// <returns>Newly opened socket connection.</returns>
        /// <exception cref="Exception">The pool already holds the maximum number of connections.</exception>
        private static SsdbClient OpenSocket(string hostIpAddress, int hostPortNumber)
        {
            // Reserve the slot atomically first; a separate check followed by an
            // increment lets concurrent callers overshoot the limit.
            if (Interlocked.Increment(ref _socketCounter) > _poolMaxSize)
            {
                Interlocked.Decrement(ref _socketCounter);
                throw new Exception("Connection Pool reached its limit");
            }

            try
            {
                return new SsdbClient(hostIpAddress, hostPortNumber);
            }
            catch
            {
                // The connection attempt failed, so give the reserved slot back.
                Interlocked.Decrement(ref _socketCounter);
                throw;
            }
        }
    }
}
Link.cs
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Text;

namespace CommonLinkLibrary.Util
{
    /// <summary>
    /// One TCP connection that sends length-prefixed request packets and reads
    /// length-prefixed response packets.
    /// </summary>
    /// <remarks>
    /// Wire format used here: every block is "size LF data LF" and a packet ends with an
    /// empty line. No finalizer is declared: the only resource is the managed
    /// <see cref="TcpClient"/>, which must not be touched from a finalizer and which
    /// releases its own handle.
    /// </remarks>
    internal class Link : IDisposable
    {
        private const byte LineFeed = (byte) '\n';
        private const byte CarriageReturn = (byte) '\r';

        /// <summary>Bytes received from the server that have not been consumed yet.</summary>
        private MemoryStream _recvBuf = new MemoryStream(8*1024);

        private TcpClient _sock;

        /// <summary>
        /// Connects to <paramref name="host"/>:<paramref name="port"/> with Nagle's algorithm
        /// disabled and TCP keep-alive enabled.
        /// </summary>
        public Link(string host, int port)
        {
            _sock = new TcpClient(host, port) {NoDelay = true};
            _sock.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
        }

        /// <summary>
        /// Explicit disposal: closes the connection and releases the receive buffer.
        /// Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            close();
            _recvBuf.Dispose();
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Closes the underlying connection if it is still open.
        /// </summary>
        public void close()
        {
            if (_sock != null)
            {
                _sock.Close();
            }
            _sock = null;
        }

        /// <summary>
        /// Sends <paramref name="cmd"/> with UTF-8 encoded string arguments and returns the response blocks.
        /// </summary>
        public List<byte[]> request(string cmd, params string[] args)
        {
            var req = new List<byte[]>(1 + args.Length) {Encoding.UTF8.GetBytes(cmd)};
            foreach (var s in args)
            {
                req.Add(Encoding.UTF8.GetBytes(s));
            }
            return request(req);
        }

        /// <summary>
        /// Sends <paramref name="cmd"/> with raw binary arguments and returns the response blocks.
        /// </summary>
        public List<byte[]> request(string cmd, params byte[][] args)
        {
            var req = new List<byte[]>(1 + args.Length) {Encoding.UTF8.GetBytes(cmd)};
            req.AddRange(args);
            return request(req);
        }

        /// <summary>
        /// Serializes the blocks of <paramref name="req"/> into one packet, writes it to the
        /// connection and blocks until a complete response packet has been read.
        /// </summary>
        /// <param name="req">Request blocks; null entries are skipped.</param>
        /// <returns>The blocks of the response packet.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="req"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The link has already been closed.</exception>
        /// <exception cref="IOException">The connection was closed before a full response arrived.</exception>
        /// <exception cref="InvalidDataException">The response contains a malformed block size.</exception>
        public List<byte[]> request(List<byte[]> req)
        {
            if (req == null)
            {
                throw new ArgumentNullException("req");
            }
            var sock = _sock;
            if (sock == null)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            using (var buf = new MemoryStream())
            {
                foreach (var p in req)
                {
                    if (p == null)
                    {
                        continue;
                    }
                    var len = Encoding.UTF8.GetBytes(p.Length.ToString());
                    buf.Write(len, 0, len.Length);
                    buf.WriteByte(LineFeed);
                    buf.Write(p, 0, p.Length);
                    buf.WriteByte(LineFeed);
                }
                // An empty line terminates the packet.
                buf.WriteByte(LineFeed);

                sock.GetStream().Write(buf.GetBuffer(), 0, (int) buf.Length);
            }

            return recv();
        }

        /// <summary>
        /// Reads from the connection until the receive buffer holds one complete packet.
        /// </summary>
        private List<byte[]> recv()
        {
            var stream = _sock.GetStream();
            var chunk = new byte[8192];

            while (true)
            {
                var ret = parse();
                if (ret != null)
                {
                    return ret;
                }

                var len = stream.Read(chunk, 0, chunk.Length);
                if (len <= 0)
                {
                    // Read returns 0 once the peer has closed the connection; without this
                    // check the loop would spin forever waiting for data that never comes.
                    throw new IOException("Connection closed before a complete response was received.");
                }
                _recvBuf.Write(chunk, 0, len);
            }
        }

        /// <summary>
        /// Returns the index of the first occurrence of <paramref name="b"/> in
        /// <paramref name="bs"/> within [<paramref name="offset"/>, <paramref name="end"/>), or -1.
        /// </summary>
        private static int memchr(byte[] bs, byte b, int offset, int end)
        {
            for (var i = offset; i < end; i++)
            {
                if (bs[i] == b)
                {
                    return i;
                }
            }
            return -1;
        }

        /// <summary>
        /// Tries to extract one complete packet from the receive buffer.
        /// </summary>
        /// <returns>
        /// The packet's blocks, with any bytes following the packet kept for the next call;
        /// null when the buffered data does not yet contain a complete packet.
        /// </returns>
        private List<byte[]> parse()
        {
            var list = new List<byte[]>();
            var buf = _recvBuf.GetBuffer();
            // GetBuffer exposes the whole backing array; only the first Length bytes are data.
            var size = (int) _recvBuf.Length;
            var idx = 0;

            while (true)
            {
                var pos = memchr(buf, LineFeed, idx, size);
                if (pos == -1)
                {
                    break;
                }

                // An empty line ("\n" or "\r\n").
                if (pos == idx || (pos == idx + 1 && buf[idx] == CarriageReturn))
                {
                    idx = pos + 1;
                    // Ignore empty leading lines.
                    if (list.Count == 0)
                    {
                        continue;
                    }

                    // End of packet: keep whatever follows it for the next call.
                    var left = size - idx;
                    _recvBuf = new MemoryStream(8192);
                    if (left > 0)
                    {
                        _recvBuf.Write(buf, idx, left);
                    }
                    return list;
                }

                int len;
                var sizeText = Encoding.UTF8.GetString(buf, idx, pos - idx);
                if (!int.TryParse(sizeText, out len) || len < 0)
                {
                    throw new InvalidDataException("Malformed block size in response: '" + sizeText + "'.");
                }

                idx = pos + 1;
                // The data and its trailing line feed must both be buffered already.
                // Written as a subtraction so a huge size cannot overflow the comparison.
                if (len >= size - idx)
                {
                    break;
                }

                var data = new byte[len];
                Array.Copy(buf, idx, data, 0, len);
                idx += len + 1; // skip the line feed that follows the data
                list.Add(data);
            }

            return null;
        }
    }
}
PersonInfoLogBean.cs
namespace SSDB
{
    /// <summary>
    /// One log record describing a change to a person. All values are kept as strings;
    /// Program.cs serializes instances of this class to JSON.
    /// </summary>
    internal class PersonInfoLogBean
    {
        // NOTE: the declaration order below is kept unchanged on purpose, because
        // serializers typically emit properties in declaration order.

        /// <summary>Label of the operation being logged.</summary>
        public string Action
        {
            get;
            set;
        }

        /// <summary>Identifier of the person the record refers to.</summary>
        public string PersonId
        {
            get;
            set;
        }

        /// <summary>Person name before the change.</summary>
        public string OldPersonName
        {
            get;
            set;
        }

        /// <summary>Person name after the change.</summary>
        public string NewPersonName
        {
            get;
            set;
        }

        /// <summary>Age before the change.</summary>
        public string OldAge
        {
            get;
            set;
        }

        /// <summary>Age after the change.</summary>
        public string NewAge
        {
            get;
            set;
        }
    }
}
SsdbClient.cs
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Net.Sockets;
using System.Text;

namespace CommonLinkLibrary.Util
{
    /// <summary>
    /// Client exposing key-value, hash and sorted-set commands. Every command is sent
    /// through an internal <see cref="Link"/> and the first response block is recorded
    /// as the status code.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the class derives from <see cref="TcpClient"/> and the base
    /// constructor opens its own connection, while all commands travel over the
    /// separate connection owned by the <see cref="Link"/>. Inherited members such as
    /// <c>Connected</c> therefore describe the base connection, not the one carrying
    /// commands. The inheritance is kept because callers depend on it.
    /// </remarks>
    public class SsdbClient : TcpClient
    {
        private readonly Link _link;
        private string _respCode;

        /// <summary>
        /// Connects to the given host and port.
        /// </summary>
        public SsdbClient(string hostIpAddress, int hostPortNumber) : base(hostIpAddress, hostPortNumber)
        {
            try
            {
                _link = new Link(hostIpAddress, hostPortNumber);
            }
            catch
            {
                // The base connection is already open at this point; do not leak it.
                Close();
                throw;
            }
        }

        /// <summary>
        /// Releases the command connection together with the base connection, so that
        /// <c>Close()</c> and <c>Dispose()</c> free both sockets.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            if (disposing && _link != null)
            {
                _link.Dispose();
            }
            base.Dispose(disposing);
        }

        /// <summary>Sends a command with string arguments and returns the raw response blocks.</summary>
        public List<byte[]> request(string cmd, params string[] args)
        {
            return _link.request(cmd, args);
        }

        /// <summary>Sends a command with binary arguments and returns the raw response blocks.</summary>
        public List<byte[]> request(string cmd, params byte[][] args)
        {
            return _link.request(cmd, args);
        }

        /// <summary>Sends pre-built request blocks and returns the raw response blocks.</summary>
        public List<byte[]> request(List<byte[]> req)
        {
            return _link.request(req);
        }

        /***** helpers *****/

        /// <summary>UTF-8 encodes a string; null stays null.</summary>
        private static byte[] _bytes(string s)
        {
            return s == null ? null : Encoding.UTF8.GetBytes(s);
        }

        /// <summary>UTF-8 decodes a response block.</summary>
        private static string _string(byte[] bs)
        {
            return Encoding.UTF8.GetString(bs);
        }

        /// <summary>
        /// Formats a number for the wire. The invariant culture is used so the digits and
        /// the negative sign do not depend on the machine's regional settings.
        /// </summary>
        private static string _num(long value)
        {
            return value.ToString(CultureInfo.InvariantCulture);
        }

        /// <summary>Parses a numeric response block using the invariant culture.</summary>
        private static long _long(byte[] bs)
        {
            return long.Parse(_string(bs), CultureInfo.InvariantCulture);
        }

        /// <summary>Records the status (first block) of a response.</summary>
        private void read_status(List<byte[]> resp)
        {
            _respCode = _string(resp[0]);
        }

        /// <summary>Records the status and reports whether it is "not_found".</summary>
        private bool is_not_found(List<byte[]> resp)
        {
            read_status(resp);
            return _respCode == "not_found";
        }

        /// <summary>Throws unless the response consists of a status plus exactly one value.</summary>
        private static void require_single_value(List<byte[]> resp)
        {
            if (resp.Count != 2)
            {
                throw new Exception("Bad response!");
            }
        }

        /// <summary>Parses a "status, number" response.</summary>
        private long parse_long_resp(List<byte[]> resp)
        {
            read_status(resp);
            require_single_value(resp);
            return _long(resp[1]);
        }

        /// <summary>Parses a "status, 0|1" response; "not_found" counts as false.</summary>
        private bool parse_bool_resp(List<byte[]> resp)
        {
            if (is_not_found(resp))
            {
                return false;
            }
            require_single_value(resp);
            return _string(resp[1]) == "1";
        }

        /// <summary>Parses a "status, value" response; false when the status is "not_found".</summary>
        private bool try_parse_value_resp(List<byte[]> resp, out byte[] val)
        {
            val = null;
            if (is_not_found(resp))
            {
                return false;
            }
            require_single_value(resp);
            val = resp[1];
            return true;
        }

        /// <summary>
        /// Parses a "status, key1, value1, key2, value2, ..." response into key/value pairs.
        /// </summary>
        private KeyValuePair<string, byte[]>[] parse_scan_resp(List<byte[]> resp)
        {
            read_status(resp);
            var size = (resp.Count - 1)/2;
            var kvs = new KeyValuePair<string, byte[]>[size];
            for (var i = 0; i < size; i += 1)
            {
                kvs[i] = new KeyValuePair<string, byte[]>(_string(resp[i*2 + 1]), resp[i*2 + 2]);
            }
            return kvs;
        }

        /// <summary>Like <see cref="parse_scan_resp"/>, but with every value parsed as a number.</summary>
        private KeyValuePair<string, long>[] parse_score_resp(List<byte[]> resp)
        {
            var kvs = parse_scan_resp(resp);
            var ret = new KeyValuePair<string, long>[kvs.Length];
            for (var i = 0; i < kvs.Length; i++)
            {
                ret[i] = new KeyValuePair<string, long>(kvs[i].Key, _long(kvs[i].Value));
            }
            return ret;
        }

        /// <summary>Builds the argument list "name, keys[0], keys[1], ...".</summary>
        private static byte[][] with_name(byte[] name, byte[][] keys)
        {
            var req = new byte[keys.Length + 1][];
            req[0] = name;
            Array.Copy(keys, 0, req, 1, keys.Length);
            return req;
        }

        /// <summary>UTF-8 encodes every key.</summary>
        private static byte[][] to_bytes(string[] keys)
        {
            var req = new byte[keys.Length][];
            for (var i = 0; i < keys.Length; i++)
            {
                req[i] = _bytes(keys[i]);
            }
            return req;
        }

        /***** kv *****/

        /// <returns>true when the server answers "1"; false on "not_found" or any other value.</returns>
        /// <exception cref="Exception">The response does not carry exactly one value.</exception>
        public bool exists(byte[] key)
        {
            return parse_bool_resp(request("exists", key));
        }

        public bool exists(string key)
        {
            return exists(_bytes(key));
        }

        public void set(byte[] key, byte[] val)
        {
            read_status(request("set", key, val));
        }

        public void set(string key, string val)
        {
            set(_bytes(key), _bytes(val));
        }

        /// <summary>Reads the value stored under <paramref name="key"/>.</summary>
        /// <param name="key">Key to read.</param>
        /// <param name="val">Receives the value, or null when the key is not found.</param>
        /// <returns>returns true if the key is found, otherwise returns false.</returns>
        /// <exception cref="Exception">The response does not carry exactly one value.</exception>
        public bool get(byte[] key, out byte[] val)
        {
            return try_parse_value_resp(request("get", key), out val);
        }

        public bool get(string key, out byte[] val)
        {
            return get(_bytes(key), out val);
        }

        public bool get(string key, out string val)
        {
            val = null;
            byte[] bs;
            if (!get(key, out bs))
            {
                return false;
            }
            val = _string(bs);
            return true;
        }

        public void del(byte[] key)
        {
            read_status(request("del", key));
        }

        public void del(string key)
        {
            del(_bytes(key));
        }

        public KeyValuePair<string, byte[]>[] scan(string key_start, string key_end, long limit)
        {
            return parse_scan_resp(request("scan", key_start, key_end, _num(limit)));
        }

        public KeyValuePair<string, byte[]>[] rscan(string key_start, string key_end, long limit)
        {
            return parse_scan_resp(request("rscan", key_start, key_end, _num(limit)));
        }

        /***** hash *****/

        public void hset(byte[] name, byte[] key, byte[] val)
        {
            read_status(request("hset", name, key, val));
        }

        public void hset(string name, string key, byte[] val)
        {
            hset(_bytes(name), _bytes(key), val);
        }

        public void hset(string name, string key, string val)
        {
            hset(_bytes(name), _bytes(key), _bytes(val));
        }

        /// <summary>Reads one field of a hash.</summary>
        /// <param name="name">Name of the hash.</param>
        /// <param name="key">Field to read.</param>
        /// <param name="val">Receives the value, or null when it is not found.</param>
        /// <returns>returns true if name.key is found, otherwise returns false.</returns>
        /// <exception cref="Exception">The response does not carry exactly one value.</exception>
        public bool hget(byte[] name, byte[] key, out byte[] val)
        {
            return try_parse_value_resp(request("hget", name, key), out val);
        }

        public bool hget(string name, string key, out byte[] val)
        {
            return hget(_bytes(name), _bytes(key), out val);
        }

        public bool hget(string name, string key, out string val)
        {
            val = null;
            byte[] bs;
            if (!hget(name, key, out bs))
            {
                return false;
            }
            val = _string(bs);
            return true;
        }

        public void hdel(byte[] name, byte[] key)
        {
            read_status(request("hdel", name, key));
        }

        public void hdel(string name, string key)
        {
            hdel(_bytes(name), _bytes(key));
        }

        /// <returns>true when the server answers "1"; false on "not_found" or any other value.</returns>
        /// <exception cref="Exception">The response does not carry exactly one value.</exception>
        public bool hexists(byte[] name, byte[] key)
        {
            return parse_bool_resp(request("hexists", name, key));
        }

        public bool hexists(string name, string key)
        {
            return hexists(_bytes(name), _bytes(key));
        }

        /// <exception cref="Exception">The response does not carry exactly one value.</exception>
        public long hsize(byte[] name)
        {
            return parse_long_resp(request("hsize", name));
        }

        public void hclear(string name)
        {
            read_status(request("hclear", _bytes(name)));
        }

        public void zclear(string name)
        {
            read_status(request("zclear", _bytes(name)));
        }

        public long hsize(string name)
        {
            return hsize(_bytes(name));
        }

        public KeyValuePair<string, byte[]>[] hscan(string name, string key_start, string key_end, long limit)
        {
            return parse_scan_resp(request("hscan", name, key_start, key_end, _num(limit)));
        }

        public KeyValuePair<string, byte[]>[] hrscan(string name, string key_start, string key_end, long limit)
        {
            return parse_scan_resp(request("hrscan", name, key_start, key_end, _num(limit)));
        }

        public void multi_hset(byte[] name, KeyValuePair<byte[], byte[]>[] kvs)
        {
            // Argument layout: name, key1, value1, key2, value2, ...
            var req = new byte[(kvs.Length*2) + 1][];
            req[0] = name;
            for (var i = 0; i < kvs.Length; i++)
            {
                req[(2*i) + 1] = kvs[i].Key;
                req[(2*i) + 2] = kvs[i].Value;
            }
            read_status(request("multi_hset", req));
        }

        public void multi_hset(string name, KeyValuePair<string, string>[] kvs)
        {
            var req = new KeyValuePair<byte[], byte[]>[kvs.Length];
            for (var i = 0; i < kvs.Length; i++)
            {
                req[i] = new KeyValuePair<byte[], byte[]>(_bytes(kvs[i].Key), _bytes(kvs[i].Value));
            }
            multi_hset(_bytes(name), req);
        }

        public void multi_hdel(byte[] name, byte[][] keys)
        {
            read_status(request("multi_hdel", with_name(name, keys)));
        }

        public void multi_hdel(string name, string[] keys)
        {
            multi_hdel(_bytes(name), to_bytes(keys));
        }

        public KeyValuePair<string, byte[]>[] multi_hget(byte[] name, byte[][] keys)
        {
            return parse_scan_resp(request("multi_hget", with_name(name, keys)));
        }

        public KeyValuePair<string, byte[]>[] multi_hget(string name, string[] keys)
        {
            return multi_hget(_bytes(name), to_bytes(keys));
        }

        /***** zset *****/

        public void zset(byte[] name, byte[] key, long score)
        {
            read_status(request("zset", name, key, _bytes(_num(score))));
        }

        public void zset(string name, string key, long score)
        {
            zset(_bytes(name), _bytes(key), score);
        }

        /// <summary>Sends <c>incr</c> for <paramref name="name"/> and returns the number in the response.</summary>
        /// <exception cref="Exception">The response does not carry exactly one value.</exception>
        public long incr(byte[] name, long increment)
        {
            return parse_long_resp(request("incr", name, _bytes(_num(increment))));
        }

        /// <exception cref="Exception">The response does not carry exactly one value.</exception>
        public long zincr(byte[] name, byte[] key, long increment)
        {
            return parse_long_resp(request("zincr", name, key, _bytes(_num(increment))));
        }

        public long zincr(string name, string key, long increment)
        {
            return zincr(_bytes(name), _bytes(key), increment);
        }

        /// <summary>Reads the score of one zset member.</summary>
        /// <param name="name">Name of the zset.</param>
        /// <param name="key">Member to look up.</param>
        /// <param name="score">Receives the score, or -1 when the member is not found.</param>
        /// <returns>returns true if name.key is found, otherwise returns false.</returns>
        /// <exception cref="Exception">The response does not carry exactly one value.</exception>
        public bool zget(byte[] name, byte[] key, out long score)
        {
            score = -1;
            var resp = request("zget", name, key);
            if (is_not_found(resp))
            {
                return false;
            }
            require_single_value(resp);
            score = _long(resp[1]);
            return true;
        }

        public bool zget(string name, string key, out long score)
        {
            return zget(_bytes(name), _bytes(key), out score);
        }

        public void zdel(byte[] name, byte[] key)
        {
            read_status(request("zdel", name, key));
        }

        public void zdel(string name, string key)
        {
            zdel(_bytes(name), _bytes(key));
        }

        /// <exception cref="Exception">The response does not carry exactly one value.</exception>
        public long zsize(byte[] name)
        {
            return parse_long_resp(request("zsize", name));
        }

        public long zsize(string name)
        {
            return zsize(_bytes(name));
        }

        /// <returns>true when the server answers "1"; false on "not_found" or any other value.</returns>
        /// <exception cref="Exception">The response does not carry exactly one value.</exception>
        public bool zexists(byte[] name, byte[] key)
        {
            return parse_bool_resp(request("zexists", name, key));
        }

        public bool zexists(string name, string key)
        {
            return zexists(_bytes(name), _bytes(key));
        }

        public KeyValuePair<string, long>[] zrange(string name, int offset, int limit)
        {
            return parse_score_resp(request("zrange", name, _num(offset), _num(limit)));
        }

        public KeyValuePair<string, long>[] zrrange(string name, int offset, int limit)
        {
            return parse_score_resp(request("zrrange", name, _num(offset), _num(limit)));
        }

        /// <summary>Lists key/score pairs of a zset in ascending order.</summary>
        /// <param name="name">Name of the zset.</param>
        /// <param name="key_start">Key part of the start position; may be empty.</param>
        /// <param name="score_start">Lower score bound; <see cref="long.MinValue"/> is sent as empty.</param>
        /// <param name="score_end">Upper score bound; <see cref="long.MaxValue"/> is sent as empty.</param>
        /// <param name="limit">Maximum number of pairs requested.</param>
        public KeyValuePair<string, long>[] zscan(string name, string key_start, long score_start, long score_end,
            long limit)
        {
            var scoreS = score_start != long.MinValue ? _num(score_start) : "";
            var scoreE = score_end != long.MaxValue ? _num(score_end) : "";
            return parse_score_resp(request("zscan", name, key_start, scoreS, scoreE, _num(limit)));
        }

        /// <summary>Lists key/score pairs of a zset in descending order.</summary>
        /// <param name="name">Name of the zset.</param>
        /// <param name="key_start">Key part of the start position; may be empty.</param>
        /// <param name="score_start">Upper score bound; <see cref="long.MaxValue"/> is sent as empty.</param>
        /// <param name="score_end">Lower score bound; <see cref="long.MinValue"/> is sent as empty.</param>
        /// <param name="limit">Maximum number of pairs requested.</param>
        public KeyValuePair<string, long>[] zrscan(string name, string key_start, long score_start, long score_end,
            long limit)
        {
            var scoreS = score_start != long.MaxValue ? _num(score_start) : "";
            var scoreE = score_end != long.MinValue ? _num(score_end) : "";
            return parse_score_resp(request("zrscan", name, key_start, scoreS, scoreE, _num(limit)));
        }

        public void multi_zset(byte[] name, KeyValuePair<byte[], long>[] kvs)
        {
            // Argument layout: name, key1, score1, key2, score2, ...
            var req = new byte[(kvs.Length*2) + 1][];
            req[0] = name;
            for (var i = 0; i < kvs.Length; i++)
            {
                req[(2*i) + 1] = kvs[i].Key;
                req[(2*i) + 2] = _bytes(_num(kvs[i].Value));
            }
            read_status(request("multi_zset", req));
        }

        public void multi_zset(string name, KeyValuePair<string, long>[] kvs)
        {
            var req = new KeyValuePair<byte[], long>[kvs.Length];
            for (var i = 0; i < kvs.Length; i++)
            {
                req[i] = new KeyValuePair<byte[], long>(_bytes(kvs[i].Key), kvs[i].Value);
            }
            multi_zset(_bytes(name), req);
        }

        public void multi_zdel(byte[] name, byte[][] keys)
        {
            read_status(request("multi_zdel", with_name(name, keys)));
        }

        public void multi_zdel(string name, string[] keys)
        {
            multi_zdel(_bytes(name), to_bytes(keys));
        }

        public KeyValuePair<string, long>[] multi_zget(byte[] name, byte[][] keys)
        {
            return parse_score_resp(request("multi_zget", with_name(name, keys)));
        }

        public KeyValuePair<string, long>[] multi_zget(string name, string[] keys)
        {
            return multi_zget(_bytes(name), to_bytes(keys));
        }
    }
}