普通队列
说明
利用list 的push 和pop命令
代码
public class Charpter06_3 { public static void main(String[] args) throws Exception { Jedis conn = new Jedis("127.0.0.1", 6379); conn.flushDB(); //消费消息 new Thread(new Runnable() { @Override public void run() { Jedis conn = new Jedis("127.0.0.1", 6379); watch_work_queue(conn); } }).start(); //模拟发送10个消息 for(int i=0;i<10;i++){ addWork(conn,"{"id":"+i+"}"); } } /** * 入队 * @param conn * @param json */ public static void addWork(Jedis conn,String json){ conn.rpush("work:queue",json); } /** * 消费 * @param conn */ public static void watch_work_queue(Jedis conn){ while (true){ //当没有任务阻塞3秒 List<String> jsons= conn.blpop(3,"work:queue"); if(jsons==null||jsons.size()<=0){ //让出cpu Thread.yield(); continue; } System.out.println(jsons.get(1)); } } }
优先级队列
说明
不同优先级 加入到不同的list,消费的时候优先消费优先级高的
代码
public class Charpter06_4 { public static void main(String[] args) throws Exception { Jedis conn = new Jedis("127.0.0.1", 6379); conn.flushDB(); //假定有10个优先级 for (int i = 0; i < 10; i++) { prioritizations.add(i); } //倒序 prioritizations = (TreeSet) prioritizations.descendingSet(); for (int i = 0; i < 10; i++) { addWork(conn, "{"id":" + i + "}", i); } /** * 消费消息 */ new Thread(new Runnable() { @Override public void run() { Jedis conn = new Jedis("127.0.0.1", 6379); watch_work_queue(conn); } }).start(); } private static TreeSet<Integer> prioritizations = new TreeSet(); /** * * @param conn * @param json * @param prioritization 优先级 */ public static void addWork(Jedis conn, String json, Integer prioritization) { //不同的优先级放到不同队列 conn.rpush("work:queue" + prioritization, json); } public static void watch_work_queue(Jedis conn) { while (true) { //格局优先级 优先消费优先级高的 for (Integer p : prioritizations) { //当没有任务阻塞3秒 String json = conn.rpop("work:queue" + p); if (json == null) { continue; } System.out.println(json); break; } } } }
延迟队列
说明
如果入队设置了延迟时间,则计算消费时间并放入zset,
1.开启一个线程根据zset的 zrangeByScore方法获得到了消费时间的消息,并放入 push到list
2.开启一个线程消费list
代码
public class Charpter06_5 { public static void main(String[] args) throws Exception { Jedis conn = new Jedis("127.0.0.1", 6379); conn.flushDB(); //模拟数据 for (int i = 0; i < 10; i++) { addWork(conn, "{"id":" + i + "}", i); } //开启线程pop数据 new Thread(new Runnable() { @Override public void run() { Jedis conn = new Jedis("127.0.0.1", 6379); watch_work_queue(conn); } }).start(); //开启线程 将zset到了时间的放入list Thread th = new Thread(new Runnable() { @Override public void run() { Jedis conn2 = new Jedis("127.0.0.1", 6379); try { watch_delayed_work_push_queue(conn2); } catch (InterruptedException e) { e.printStackTrace(); } } }); th.start(); th.join(); System.out.println("exit"); } /** * @param conn * @param json * @param time 延迟时间 秒 */ public static void addWork(Jedis conn, String json, Integer time) { if (time > 0) { conn.zadd("work:queue:delayed", System.currentTimeMillis() + time * 1000, json); } else { conn.rpush("work:queue", json); } } public static void watch_work_queue(Jedis conn) { while (true) { //当没有任务阻塞3秒 List<String> jsons = conn.brpop(3,"work:queue"); if (jsons == null||jsons.size()<=0) { continue; } System.out.println(jsons.get(1)); } } public static void watch_delayed_work_push_queue(Jedis conn) throws InterruptedException { while (true) { //当没有任务阻塞3秒 每次取一个 Set<String> jsons = conn.zrangeByScore("work:queue:delayed", 0, System.currentTimeMillis(), 0, 1); if (jsons == null || jsons.size() <= 0) { Thread.sleep(1000); continue; } String item = jsons.iterator().next(); conn.rpush("work:queue", item); conn.zrem("work:queue:delayed", item); } } }