import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;
import java.util.Calendar;
import java.util.Set;
public class AppRedisTest {
private static final String ADDR = "127.0.0.1";
private static final int PORT = 6379;
private static JedisPool jedisPool = new JedisPool(ADDR, PORT);
public static Jedis getJedis() {
return jedisPool.getResource();
}
//生产者,生成5个订单放进去
public void productionDelayMessage() {
for (int i = 0; i < 5; i++) {
//延迟3秒
Calendar cal1 = Calendar.getInstance();
cal1.add(Calendar.SECOND, 10);
int second3later = (int) (cal1.getTimeInMillis() / 1000);
AppRedisTest.getJedis().zadd("OrderId", second3later, "OID0000001" + i);
System.out.println(System.currentTimeMillis() + "ms:redis生成了一个订单任务:订单ID为" + "OID0000001" + i);
}
}
//消费者,取订单
public void consumerDelayMessage() {
Jedis jedis = AppRedisTest.getJedis();
while (true) {
Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 5);
if (items == null || items.isEmpty()) {
System.out.println("当前没有等待的任务");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
int score = (int) ((Tuple) items.toArray()[0]).getScore();
Calendar cal = Calendar.getInstance();
int nowSecond = (int) (cal.getTimeInMillis() / 1000);
if (nowSecond >= score) {
String orderId = ((Tuple) items.toArray()[0]).getElement();
Long orderId1 = jedis.zrem("OrderId", orderId);
// if(orderId1>0){
System.out.println(System.currentTimeMillis() + "ms:redis消费了一个任务:消费的订单OrderId为" + orderId);
// }
}
}
}
public static void main(String[] args) {
AppRedisTest appTest = new AppRedisTest();
appTest.productionDelayMessage();
appTest.consumerDelayMessage();
}
}
public class ThreadAppRedisTest1 {
private static final int threadNum = 10;
public static class MyThread extends Thread {
@Override
public void run() {
//1.将要在线程中执行的代码编写在run方法中
AppRedisTest appTest = new AppRedisTest();
appTest.consumerDelayMessage();
}
}
public static void main(String[] args) {
AppRedisTest appTest = new AppRedisTest();
appTest.productionDelayMessage();
for (int i = 0; i < threadNum; i++) {
MyThread mt = new MyThread();
mt.start();
}
}
}
1631159740392ms:redis生成了一个订单任务:订单ID为OID00000010
1631159740394ms:redis生成了一个订单任务:订单ID为OID00000011
1631159740397ms:redis生成了一个订单任务:订单ID为OID00000012
1631159740398ms:redis生成了一个订单任务:订单ID为OID00000013
1631159740400ms:redis生成了一个订单任务:订单ID为OID00000014
1631159750000ms:redis消费了一个任务:消费的订单OrderId为OID00000010
1631159750000ms:redis消费了一个任务:消费的订单OrderId为OID00000010
1631159750000ms:redis消费了一个任务:消费的订单OrderId为OID00000010
1631159750001ms:redis消费了一个任务:消费的订单OrderId为OID00000011
1631159750001ms:redis消费了一个任务:消费的订单OrderId为OID00000011
1631159750001ms:redis消费了一个任务:消费的订单OrderId为OID00000011
1631159750001ms:redis消费了一个任务:消费的订单OrderId为OID00000012
1631159750001ms:redis消费了一个任务:消费的订单OrderId为OID00000012
1631159750001ms:redis消费了一个任务:消费的订单OrderId为OID00000012
1631159750001ms:redis消费了一个任务:消费的订单OrderId为OID00000013
1631159750001ms:redis消费了一个任务:消费的订单OrderId为OID00000014
1631159750001ms:redis消费了一个任务:消费的订单OrderId为OID00000013
当前没有等待的任务
当前没有等待的任务
当前没有等待的任务
当前没有等待的任务
当前没有等待的任务
解决方案
(1)用分布式锁,但是用分布式锁,性能下降了,该方案不细说。
(2)对ZREM的返回值进行判断,只有大于0的时候,才消费数据,于是将consumerDelayMessage()方法里
if(nowSecond >= score)
{
String orderId = ((Tuple) items.toArray()[0]).getElement();
jedis.zrem("OrderId", orderId);
System.out.println(System.currentTimeMillis() + "ms:redis消费了一个任务:消费的订单OrderId为" + orderId);
}
修改为
if(nowSecond >= score)
{
String orderId = ((Tuple) items.toArray()[0]).getElement();
Long num = jedis.zrem("OrderId", orderId);
if(num != null && num > 0)
{
System.out.println(System.currentTimeMillis() + "ms:redis消费了一个任务:消费的订单OrderId为" + orderId);
}
}