import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisPassword; import org.springframework.data.redis.connection.RedisStandaloneConfiguration; import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.stereotype.Component; /** * @author * redis配置类 */ @Configuration @EnableCaching @Component public class RedisMainConfig extends CachingConfigurerSupport{ /** * 配置lettuce连接池 * @return */ @Bean @ConfigurationProperties(prefix = "spring.redis.lettuce.pool") public GenericObjectPoolConfig redisPool() { return new GenericObjectPoolConfig<>(); } /** * 配置第一个数据源的 * @return */ @Bean @Primary public RedisStandaloneConfiguration redisConfig(@Value("${spring.redis.host}") String host, @Value("${spring.redis.port}") int port , @Value("${spring.redis.database}") int db, 
@Value("${spring.redis.password}") String password) { RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(host, port); redisStandaloneConfiguration.setDatabase(db); redisStandaloneConfiguration.setPassword(RedisPassword.of(password)); return redisStandaloneConfiguration; } /** * 配置第一个数据源的连接工厂 * 这里注意:需要添加@Primary 指定bean的名称,目的是为了创建两个不同名称的LettuceConnectionFactory * * @param config * @param redisConfig * @return */ @Bean("cachefactory") @Primary public LettuceConnectionFactory cachefactory(GenericObjectPoolConfig config, RedisStandaloneConfiguration redisConfig) { LettuceClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder().poolConfig(config).build(); return new LettuceConnectionFactory(redisConfig, clientConfiguration); } /** * 配置第一个数据源的RedisTemplate * 注意:这里指定使用名称=factory 的 RedisConnectionFactory * 并且标识第一个数据源是默认数据源 @Primary * * @param cachefactory * @return */ @Bean("redisTemplate") @Primary public RedisTemplate<String, String> redisTemplate(@Qualifier("cachefactory") RedisConnectionFactory cachefactory) { return getStringStringRedisTemplate(cachefactory); } /** * 配置第二个数据源 * @return */ @Bean public RedisStandaloneConfiguration mqRedisConfig(@Value("${spring.mq.host}") String host, @Value("${spring.mq.port}") int port , @Value("${spring.mq.database}") int db, @Value("${spring.mq.password}") String password) { RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(host, port); redisStandaloneConfiguration.setDatabase(db); redisStandaloneConfiguration.setPassword(RedisPassword.of(password)); return redisStandaloneConfiguration; } /** * 配置第二个数据源的连接工厂 * @param config * @param mqRedisConfig * @return */ @Bean("mqfactory") public LettuceConnectionFactory mqfactory(GenericObjectPoolConfig config, @Qualifier("mqRedisConfig")RedisStandaloneConfiguration mqRedisConfig) { LettuceClientConfiguration clientConfiguration = 
LettucePoolingClientConfiguration.builder().poolConfig(config).build(); return new LettuceConnectionFactory(mqRedisConfig, clientConfiguration); } /** * 配置第二个数据源的RedisTemplate * 注意:这里指定使用名称=mqfactory 的 RedisConnectionFactory * * @param mqfactory * @return */ @Bean("mqRedisTemplate") public StringRedisTemplate mqRedisTemplate(@Qualifier("mqfactory") RedisConnectionFactory mqfactory) { StringRedisTemplate template = new StringRedisTemplate(mqfactory); return template; } /** * 设置序列化方式 (这一步不是必须的) * @param factory * @return */ private RedisTemplate<String, String> getStringStringRedisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template = new StringRedisTemplate(factory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } }
配置文件 是yml文件
spring:
  redis:
    database: 0
    host: 10.100.2.246
    port: 6379
    password: 123456
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0
    timeout: 30000
  mq:
    database: 0
    host: 10.100.2.246
    port: 6380
    password: 123456
消息队列生产者
import lombok.extern.log4j.Log4j; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * ClassName:@Publisher * Description:消息队列生产者 **/ @Component("mqPublisher") @Slf4j public class Publisher { @Resource(name = "mqRedisTemplate") private StringRedisTemplate redisTemplate; /** * 发送消息 * @param topic * @param msg * @return */ public boolean sendMessage(String topic,String msg) { log.info("publisher msg,topic:" + topic + ",msg:" + msg); try { redisTemplate.convertAndSend(topic, msg); return true; } catch (Exception e) { log.error("publisher msg,error:" + e.toString() ); return false; } } }
监听队列
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; @Configuration @AutoConfigureAfter({Receiver.class})//主要是控制类的加载顺序,即 指定的类加载完了,再加载本类 public class SubscriberConfig { //先同步数据 根据同步的返回结果 再视情况推送 /** * 创建消息监听容器 * @param redisConnectionFactory * @param messageListenerAdapter * @return */ @Bean public RedisMessageListenerContainer getRedisMessageListenerContainer(@Qualifier("mqfactory") RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter,MessageListenerAdapter synMessageListenerAdapter) { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); //监听TOPIC_ORDER的广播 messageListenerAdapter注入接受消息方法名 用于推送 redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic("TOPIC_ORDER")); //监听SYN_ORDER的广播 messageListenerAdapter注入接受消息方法名 用于同步 redisMessageListenerContainer.addMessageListener(synMessageListenerAdapter, new PatternTopic("SYN_ORDER")); return redisMessageListenerContainer; } /** * 消息监听适配器,注入接受消息方法,输入方法名字 同步数据 * @param receiver * @return */ @Bean public MessageListenerAdapter synMessageListenerAdapter(Receiver receiver) { return new MessageListenerAdapter(receiver,"synSeveiver"); //当没有继承MessageListener时需要写方法名字 synSeveiver方法名 } /** * 消息监听适配器,注入接受消息方法,输入方法名字 推送数据 * @param receiver * @return */ @Bean public MessageListenerAdapter messageListenerAdapter(Receiver receiver) { return new 
MessageListenerAdapter(receiver,"reveiver"); //当没有继承MessageListener时需要写方法名字 reveiver方法名 } }
消费者
import com.newflows.sync.service.IAsyncService;import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; /** * ClassName:@Publisher * Description:消息队列消费者 用户推送 监听处理业务 **/ @Component public class Receiver{ private static final Logger logger = LoggerFactory.getLogger(Receiver.class); @Resource(name = "mqRedisTemplate") private StringRedisTemplate mqRedisTemplate; @Autowired private IAsyncService asyncService; /** * 用于实现同步业务 * @param message */ public void synSeveiver(String message) { Map<String,Object> paramMap = new ConcurrentHashMap<>(); for (int i=0;i<1000;i++){ asyncService.executeAsync(paramMap); } } /** * 用于实现推送 * @param message */ public void reveiver(String message){ Log.info("进入监听mq消息队列==》推送业务==》message:" + message.toString()); Map<String,Object> paramMap = new ConcurrentHashMap<>(); try { Map<String, Object> map = ComJsonUtils.jsonToMap(message.toString()); }catch (Exception e){ logger.error("系统异常",e.getMessage()); } } }
IAsyncService 接口
import java.util.Map;

/**
 * Service contract for executing a unit of work described by a parameter map.
 */
public interface IAsyncService {

    /**
     * Executes the work described by the given parameters.
     *
     * @param paramMap the parameters of the work to execute
     */
    void executeAsync(Map<String, Object> paramMap);
}
AsyncServiceImpl实现类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.Map;

/**
 * Default {@link IAsyncService} implementation; it only logs the parameters it receives.
 *
 * <p>NOTE(review): the original description says this runs asynchronously on a thread
 * pool and {@code Async} is imported, but the method carries no {@code @Async}
 * annotation in the visible code, so it runs on the caller's thread - confirm whether
 * the annotation was dropped by mistake.
 */
@Service
public class AsyncServiceImpl implements IAsyncService {

    private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);

    /**
     * Logs the given parameters.
     *
     * @param paramMap the parameters to log
     */
    @Override
    public void executeAsync(Map<String, Object> paramMap) {
        // Use the class logger rather than System.out so output follows the logging configuration.
        logger.info("executeAsync paramMap: {}", paramMap);
    }
}
IRedisService接口
import java.util.List;
import java.util.Set;

/**
 * Facade over common Redis operations: plain values, hashes, lists, sets and sorted sets.
 */
public interface IRedisService {

    /** Stores {@code value} under {@code key}; returns whether the write succeeded. */
    boolean set(String key, Object value);

    /** Stores {@code value} under {@code key} with an expiry; returns whether the write succeeded. */
    boolean set(String key, Object value, Long expireTime);

    /** Returns whether {@code key} exists. */
    boolean exists(String key);

    /** Returns the value stored under {@code key}. */
    Object get(String key);

    /** Removes a single key. */
    void remove(String key);

    /** Removes each of the given keys. */
    void remove(String... keys);

    /** Removes every key matching {@code pattern}. */
    void removePattern(String pattern);

    /** Puts {@code value} under {@code hashKey} in the hash stored at {@code key}. */
    void hashSet(String key, Object hashKey, Object value);

    /** Returns the value under {@code hashKey} in the hash stored at {@code key}. */
    Object hashGet(String key, Object hashKey);

    /** Pushes {@code v} onto the list stored at {@code k}. */
    void push(String k, Object v);

    /** Returns the elements of the list at {@code k} between indexes {@code l} and {@code l1}. */
    List<Object> range(String k, long l, long l1);

    /** Adds {@code value} to the set stored at {@code key}. */
    void setAdd(String key, Object value);

    /** Returns the members of the set stored at {@code key}. */
    Set<Object> setMembers(String key);

    /** Adds {@code value} with score {@code scoure} to the sorted set stored at {@code key}. */
    void zAdd(String key, Object value, double scoure);

    /** Returns the sorted-set members at {@code key} with scores between {@code scoure} and {@code scoure1}. */
    Set<Object> rangeByScore(String key, double scoure, double scoure1);
}
redisService实现类
import com.newflows.sync.service.IRedisService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.core.*; import org.springframework.stereotype.Service; import java.io.Serializable; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @Service("redisService") public class RedisServiceImpl implements IRedisService { private Logger logger = LoggerFactory.getLogger(RedisServiceImpl.class); @Autowired @Qualifier("redisTemplate") private RedisTemplate redisTemplate; /** * set value * @param key * @param value * @return */ public boolean set(final String key, Object value) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); operations.set(key, value); result = true; } catch (Exception e) { logger.error("set error: key {}, value {}",key,value,e); } return result; } /** * set value with expireTime * @param key * @param value * @param expireTime * @return */ public boolean set(final String key, Object value, Long expireTime) { boolean result = false; try { ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); operations.set(key, value); redisTemplate.expire(key, expireTime, TimeUnit.SECONDS); result = true; } catch (Exception e) { logger.error("set error: key {}, value {},expireTime {}",key,value,expireTime,e); } return result; } /** * @param key * @return */ public boolean exists(final String key) { return redisTemplate.hasKey(key); } /** * @param key * @return */ public Object get(final String key) { Object result = null; ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue(); result = operations.get(key); return result; } /** * remove single key * @param key */ public void remove(final String key) { if (exists(key)) { redisTemplate.delete(key); } } /** * batch 
delete * @param keys */ public void remove(final String... keys) { for (String key : keys) { remove(key); } } /** * batch delete with pattern * @param pattern */ public void removePattern(final String pattern) { Set<Serializable> keys = redisTemplate.keys(pattern); if (keys.size() > 0) redisTemplate.delete(keys); } /** * hash set * @param key * @param hashKey * @param value */ public void hashSet(String key, Object hashKey, Object value){ HashOperations<String, Object, Object> hash = redisTemplate.opsForHash(); hash.put(key,hashKey,value); } /** * hash get * @param key * @param hashKey * @return */ public Object hashGet(String key, Object hashKey){ HashOperations<String, Object, Object> hash = redisTemplate.opsForHash(); return hash.get(key,hashKey); } /** * list push * @param k * @param v */ public void push(String k,Object v){ ListOperations<String, Object> list = redisTemplate.opsForList(); list.rightPush(k,v); } /** * list range * @param k * @param l * @param l1 * @return */ public List<Object> range(String k, long l, long l1){ ListOperations<String, Object> list = redisTemplate.opsForList(); return list.range(k,l,l1); } /** * set add * @param key * @param value */ public void setAdd(String key,Object value){ SetOperations<String, Object> set = redisTemplate.opsForSet(); set.add(key,value); } /** * set get * @param key * @return */ public Set<Object> setMembers(String key){ SetOperations<String, Object> set = redisTemplate.opsForSet(); return set.members(key); } /** * ordered set add * @param key * @param value * @param scoure */ public void zAdd(String key,Object value,double scoure){ ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); zset.add(key,value,scoure); } /** * rangeByScore * @param key * @param scoure * @param scoure1 * @return */ public Set<Object> rangeByScore(String key, double scoure, double scoure1){ ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); return zset.rangeByScore(key, scoure, scoure1); } }
controller层
import com.example.copydemo.mq.Publisher; import com.example.copydemo.service.IRedisService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; /** * @program: copy-demo * @description: 推送测试 * @author: Gaojq * @create: 2020-01-15 09:05 **/ @Controller @RequestMapping("test") public class PushController { private static final Logger logger = LoggerFactory.getLogger(PushController.class); @Autowired private IRedisService redisService; @Autowired private Publisher publisher; @RequestMapping(value = "/publish") @ResponseBody public String publish() { boolean flag = publisher.sendMessage("SYN_ORDER", "1"); return "ok"; } @RequestMapping("redis") @ResponseBody public String redisCache() { redisService.set("name","gaojq"); String name = (String) redisService.get("name"); logger.info("name="+name); return name; } }
pom.xml