zoukankan      html  css  js  c++  java
  • JAVA实现的异步redisclient

    再使用redis的过程中,发现使用缓存尽管好,可是有些地方还是比較难权衡,缓存对象大了,存储对象时的序列化工作非常繁重,消耗大量cpu;那么切分成非常小的部分吧,存取的次数变多了,redisclient的交互次数上不去,这是一个矛盾。要是有一个client能支持很多其它的交互次数,那么在完毕既定指标的前提下,岂不是能够让我们的建模工作变的更宽松一些?


    于是參照redis协议,花了5天时间,做了一个具备基本功能的redisclient。它的特性:

    1.支持异步调用,在getA之后不用等结果,能继续getB,getC,等等。等要做的redis操作都做完了,再来检查结果。

    2.单连接,支持断线重连。client和随意一个redisserver仅仅建立一个连接。由于是异步调用,不是必需建立很多其它连接。

    3.底层支持pipeline,不管是异步调用,还是堵塞调用,底层使用的都有概率使用到pipeline。对pipeline的支持是在通信层做的,所以不管哪种调用都是隐性的使用pipeline。可是,连续的异步操作,本线程内的操作就有可能使用pipeline。而同步调用,则通常是线程之间的操作使用pipeline。更重要的是,这一切都是在底层完毕的,我们在调用redis api的时候根本不用管这些,仅仅管调用就可以。仅仅只是是建议採用连续的异步操作,由于这样效率最高。

    4.支持shard模式。採用一致性算法的分片。

    5.shard模式下仍然支持pipeline。由于对pipeline的支持是做在通信层的,所以,在不论什么模式下都支持pipeline。


    最后看一下,在我机器上跑的结果吧:

    机器配置:双核cpu,主频2GHz,8g内存,mac osx

    redis执行在本机的虚拟机上,虚拟机单核单线程,2G内存,ubuntu server。


    測试场景1(模拟堵塞调用的场景):

    80个线程,进行简单的get、set,

    每秒运行的get和set总数为:130000次以上。

    public class ShardClientTest {
    
    	public static void main(String[] args) throws IOException,
    			InterruptedException {
    		final int T = 80;
    		final int CONTINUOUS = 1;
    		final int N = 20000000;
    
    		EzSelector selector = new EzSelector();
    		List<BiTuple<String, Integer>> list = new ArrayList<>();
    		list.add(new BiTuple<>("10.211.55.5", 6379));
    
    		final ShardClient conn = new ShardClient(selector, list);
    
    		final AtomicLong count = new AtomicLong();
    		for (int n = 0; n < T; n++) {
    			Thread t = new Thread() {
    				public void run() {
    					try {
    						for (int i = 0; i < N; i++) {
    							try {
    								Result ret = null;
    								for (int cc = 0; cc < CONTINUOUS; cc++) {
    									ret = conn.asyncSet(i + "", i + ":" + cc);
    									// System.out.println(ret.get());
    								}
    								ret.get();
    
    								Result ret6 = null;
    								for (int cc = 0; cc < CONTINUOUS; cc++) {
    									ret6 = conn.asyncGet(i + "");
    									// System.out.println(ret6.get());
    								}
    								ret6.get();
    
    								count.addAndGet(CONTINUOUS * 2);
    							} catch (Exception e) {
    								e.printStackTrace();
    							} finally {
    							}
    						}
    					} catch (Exception e) {
    						e.printStackTrace();
    					}
    				};
    			};
    			t.start();
    		}
    
    		while (true) {
    			long start = count.get();
    			Thread.sleep(1000);
    			System.out.println(count.get() - start);
    		}
    	}
    
    }

    測试场景2(模拟异步调用,连续5次get和set):

    每秒可运行的get和set总数是330000次左右

    package zhmt.ezredis;
    
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;
    
    import zhmt.eznet.EzSelector;
    import zhmt.eznet.EzSocketOption;
    import zhmt.eznet.SharedRpcConnection.Result;
    import zhmt.ezredis.AsyncClient;
    
    public class RedisClientTest {
    	public static void main(String[] args) throws IOException,
    			InterruptedException {
    		final int T = 100;
    		final int CONTINUOUS = 5;
    		final int N = 20000000;
    
    		EzSelector selector = new EzSelector();
    		final RedisClient conn = new AsyncClient(selector,
    				new EzSocketOption("10.211.55.5", 6379));
    
    		final AtomicLong count = new AtomicLong();
    		for (int n = 0; n < T; n++) {
    			Thread t = new Thread() {
    				public void run() {
    					try {
    						for (int i = 0; i < N; i++) {
    							try {
    								Result ret = null;
    								for (int cc = 0; cc < CONTINUOUS; cc++) {
    									ret = conn.asyncSet(i + "", i + ":" + cc);
    									// System.out.println(ret.get());
    								}
    								ret.get();
    
    								Result ret6 = null;
    								for (int cc = 0; cc < CONTINUOUS; cc++) {
    									ret6 = conn.asyncGet(i + "");
    									// System.out.println(ret6.get());
    								}
    								ret6.get();
    
    								count.addAndGet(CONTINUOUS * 2);
    							} catch (Exception e) {
    								e.printStackTrace();
    							} finally {
    							}
    						}
    					} catch (Exception e) {
    						e.printStackTrace();
    					}
    				};
    			};
    			t.start();
    		}
    
    		while (true) {
    			long start = count.get();
    			Thread.sleep(1000);
    			System.out.println(count.get() - start);
    		}
    	}
    }
    


    到眼下为止,仅仅实现了十几命令。

    接下来,准备优先实现用户自己定义shard key。使数据依照用户的意图去分片。以最大化pipeline的使用。

    其次,继续实现经常使用命令。


    源代码:

    https://github.com/zhmt/ezredis


  • 相关阅读:
    表详细操作
    库相关操作
    数据库一
    协程
    多线程2
    .Net鼠标随动窗口
    .Net操作音频
    .Net操作注册表--un
    .Net操作.exe文件
    .Net连接数据库(AOD.Net)
  • 原文地址:https://www.cnblogs.com/blfshiye/p/3802406.html
Copyright © 2011-2022 走看看