Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。
性质:由于Redis的列表是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。
所以可以直接使用Redis的List实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。
(列表常用命令)
RPUSH : RPUSH key-name value [value1 value2,...] ------------将一个或多个值推入列表右端
LPUSH : LPUSH key-name value [value1 value2,...] ------------将一个或多个值推入列表左端
RPOP : RPOP key-name----------移除并返回列表最右端元素
LPOP :LPOP key-name----------移除并返回列表最左端元素
LINDEX : LINDEX key-name offset --------------返回列表中偏移量为offset的元素
LRANGE : LRANGE key-name start end -------------返回列表中偏移量从start到end范围内的元素
LTRIM : LTRIM key-name start end ----------------对列表进行修剪,只保留偏移量从start到end范围内的元素
其中简单示例如下:
首先连接redis服务器,其中我应用了Jedispool,代码如下:
package redis;
import java.io.IOException;
import java.util.Properties;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* redis单例连接池
* @author admin
*
*/
public class RedisPool {
private static int TIMEOUT = 1000*30;
private static int MAXTOTAL = 1024;
private static int MAXIDLE = 100;
private static String REDISIP = "bei1";
private static int PORT = 6379;
private static String PASSWORD ="default";
static {
try {
Properties prop = PropertiesLoaderUtils.loadAllProperties("redis.properties");
TIMEOUT = Integer.parseInt(prop.getProperty("TIMEOUT","300000"));
MAXTOTAL = Integer.parseInt(prop.getProperty("MAXTOTAL","1024"));
MAXIDLE = Integer.parseInt(prop.getProperty("MAXIDLE","100"));
REDISIP = prop.getProperty("REDISIP","127.0.0.1");
PORT = Integer.parseInt(prop.getProperty("PORT","6379"));
PASSWORD = prop.getProperty("PASSWORD","default");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private static JedisPool[] pool = new JedisPool[10];
private RedisPool() {}
private static JedisPool getPool(int database) {
if(database>10) {
return null;
}
if(pool[database] == null) {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(MAXTOTAL);
config.setMaxIdle(MAXIDLE);
config.setMaxWaitMillis(TIMEOUT);
config.setTestOnBorrow(true);
pool[database] = new JedisPool(config,REDISIP,PORT,TIMEOUT,PASSWORD,database);
}
return pool[database];
}
//单例获取redis连接资源
public static Jedis getResource(int database) {
if(database>10) {
return null;
}
Jedis jedis = null;
if(pool[database] == null) {
synchronized(RedisPool.class) {
try {
if(pool[database] == null) {
pool[database] = getPool(database);
try {
if (pool[database] != null) {
jedis = pool[database].getResource();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}catch (Exception e) {
e.printStackTrace();
}
}
}else {
jedis = pool[database].getResource();
}
return jedis;
}
}
定义一个生产者,代码:
package RedisMq;
import com.sun.deploy.util.StringUtils;
import redis.RedisPool;
import redis.clients.jedis.Jedis;
import java.util.concurrent.TimeUnit;
/**
* <p> </p>
*
* @author ly
* @since 2019/1/5
*/
public class Producer extends Thread{
public static final String MESSAGE_KEY = "queue";
private Jedis jedis;
private String produceName;
private volatile int count;
public Producer(String name){
this.produceName = name;
init();
}
private void init(){
jedis = RedisPool.getResource(1);
}
public void putMessage(String message) {
Long size = jedis.lpush(MESSAGE_KEY, message);
System.out.println(produceName + ": 当前未被处理消息条数为:" + size);
count++;
}
public int getCount() {
return count;
}
@Override
public void run() {
try {
while (true) {
putMessage("hello world");
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
Producer producer = new Producer("myProducer");
producer.start();
for (; ; ) {
System.out.println("main : 已存储消息条数:" + producer.getCount());
TimeUnit.SECONDS.sleep(10);
}
}
}
再定义一个消费者
package RedisMq;
import redis.RedisPool;
import redis.clients.jedis.Jedis;
/**
* <p> </p>
*
* @author ly
* @since 2019/1/7
*/
/**
* 消息消费者
* @author yamikaze
*/
public class Customer extends Thread{
private String customerName;
private volatile int count;
private Jedis jedis;
public Customer(String name) {
this.customerName = name;
init();
}
private void init() {
jedis = RedisPool.getResource(1);
}
public void processMessage() {
String message = jedis.rpop(Producer.MESSAGE_KEY);
if(message != null) {
count++;
handle(message);
}
}
public void handle(String message) {
System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");
}
@Override
public void run() {
while (true) {
processMessage();
}
}
public static void main(String[] args) {
Customer customer = new Customer("小花");
customer.start();
}
}
运行后 生产者和消费者控制台信息分别如下:
Redis 发布与订阅
redis 支持消息队列。发布订阅即是一种消息通信模式:发送者发送消息,订阅者订阅消息。
redis 客户端可以订阅任意数量的频道
(一)发布订阅
使用 publish 指令,格式为 publish channel message
127.0.0.1:6379> publish fruit "apple"
(integer) 0
该返回值为0,说明没有人订阅
(二)订阅消息
使用subscribe指令接受消息,格式为 subscribe channel
127.0.0.1:6379> subscribe fruit
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "fruit"
3) (integer) 1
可以看到使用SUBSCRIBE指令后进入了订阅模式,但没有接收到publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。
回复信息分为3类:
1 如果为subscribe,第二个值表示订阅的频道,如上述代码
2 如果为message(消息),第二个值为产生该消息的频道,第三个值为消息,如图:
3 如果退订消息 ,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。则接受信息如下
(三)取消订阅
使用Unsubscribe 指令,格式为 UNSUBSCRIBE channel [channel ...]
127.0.0.1:6379> unsubscribe fruit
1) "unsubscribe"
2) "fruit"
3) (integer) 0
参考文章https://blog.csdn.net/qq_34212276/article/details/78455004