spring boot基于1.x.
一 集成redis
1.1 配置
spring.redis.host = localhost spring.redis.port = 6379 spring.redis.timeout = 10000 spring.redis.database = 0 spring.redis.pool.max-active = 100 spring.redis.pool.max-wait = -1 spring.redis.pool.max-idle = 8 spring.redis.pool.min-idle = 0
1.2
工具类
package com.test.util; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.HashOperations; import org.springframework.data.redis.core.ListOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.SetOperations; import org.springframework.data.redis.core.ValueOperations; import org.springframework.data.redis.core.ZSetOperations; import org.springframework.stereotype.Component; /** * @ClassName:RedisTemplateUtil * @Description: redis工具类 * @author: * @date:2018-03-08 23:28:23 * */ @SuppressWarnings("unchecked") @Component public class RedisTemplateUtil { @SuppressWarnings("rawtypes") @Autowired private RedisTemplate redisTemplate; /** * 写入缓存 * @param key * @param value * @return */ public boolean set(final String key, Object value, Integer database) { boolean result = false; try { JedisConnectionFactory jedisConnectionFactory = (JedisConnectionFactory) redisTemplate.getConnectionFactory(); jedisConnectionFactory.setDatabase(database); redisTemplate.setConnectionFactory(jedisConnectionFactory); ValueOperations<String, Object> operations = (ValueOperations<String, Object>) redisTemplate.opsForValue(); operations.set(key, value); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 写入缓存设置时效时间 * @param key * @param value * @return */ public boolean set(final String key, Object value, Long expireTime, TimeUnit unit, Integer database) { boolean result = false; try { JedisConnectionFactory jedisConnectionFactory = (JedisConnectionFactory) redisTemplate.getConnectionFactory(); jedisConnectionFactory.setDatabase(database); redisTemplate.setConnectionFactory(jedisConnectionFactory); ValueOperations<String, Object> operations = redisTemplate.opsForValue(); operations.set(key, value, expireTime, unit); result = true; } catch (Exception e) { e.printStackTrace(); } return result; } /** * 批量删除对应的value * @param keys */ public void remove(Integer database, final String... keys) { for (String key : keys) { remove(database, key); } } /** * 批量删除key * @param pattern */ public void removePattern(Integer database, final String pattern) { Set<String> keys = redisTemplate.keys(pattern); if (keys.size() > 0){ JedisConnectionFactory jedisConnectionFactory = (JedisConnectionFactory) redisTemplate.getConnectionFactory(); jedisConnectionFactory.setDatabase(database); redisTemplate.setConnectionFactory(jedisConnectionFactory); redisTemplate.delete(keys); } } /** * 删除对应的value * @param key */ public void remove(Integer database, final String key) { if (exists(database, key)) { redisTemplate.delete(key); } } /** * 判断缓存中是否有对应的value * @param key * @return */ public boolean exists(Integer database, final String key) { JedisConnectionFactory jedisConnectionFactory = (JedisConnectionFactory) redisTemplate.getConnectionFactory(); jedisConnectionFactory.setDatabase(database); redisTemplate.setConnectionFactory(jedisConnectionFactory); return redisTemplate.hasKey(key); } /** * 读取缓存 * @param key * @return */ public Object get(Integer database, final String key) { Object result = null; JedisConnectionFactory jedisConnectionFactory = (JedisConnectionFactory) redisTemplate.getConnectionFactory(); jedisConnectionFactory.setDatabase(database); redisTemplate.setConnectionFactory(jedisConnectionFactory); ValueOperations<String, Object> operations = redisTemplate.opsForValue(); result = operations.get(key); return result; } /** * 哈希 添加 * @param key * @param hashKey * @param value */ public void hmSet(String key, Object hashKey, Object value){ HashOperations<String, Object, Object> hash = redisTemplate.opsForHash(); hash.put(key,hashKey,value); } /** * 哈希获取数据 * @param key * @param hashKey * @return */ public Object hmGet(String key, Object hashKey){ HashOperations<String, Object, Object> hash = redisTemplate.opsForHash(); return hash.get(key,hashKey); } /** * 列表添加 * @param k * @param v */ public void lPush(String k,Object v){ ListOperations<String, Object> list = redisTemplate.opsForList(); list.rightPush(k,v); } /** * 列表获取 * @param k * @param l * @param l1 * @return */ public List<Object> lRange(String k, long l, long l1){ ListOperations<String, Object> list = redisTemplate.opsForList(); return list.range(k,l,l1); } /** * 集合添加 * @param key * @param value */ public void add(String key,Object value){ SetOperations<String, Object> set = redisTemplate.opsForSet(); set.add(key,value); } /** * 集合获取 * @param key * @return */ public Set<Object> setMembers(String key){ SetOperations<String, Object> set = redisTemplate.opsForSet(); return set.members(key); } /** * 有序集合添加 * @param key * @param value * @param scoure */ public void zAdd(String key,Object value,double scoure){ ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); zset.add(key,value,scoure); } /** * 有序集合获取 * @param key * @param scoure * @param scoure1 * @return */ public Set<Object> rangeByScore(String key,double scoure,double scoure1){ ZSetOperations<String, Object> zset = redisTemplate.opsForZSet(); return zset.rangeByScore(key, scoure, scoure1); } public void extentExpire(String key, Long expireTime, TimeUnit unit) { redisTemplate.boundValueOps(key).expire(expireTime, unit); } }
二 集成elasticsearch
2.1 配置
elasticsearch.ip=localhost elasticsearch.port=9300 elasticsearch.cluster.name=my-elasticsearch elasticsearch.pool=100 elasticsearch.index=test elasticsearch.type=test
2.2
package com.test.util; import java.util.Map; import java.util.UUID; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.transport.TransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; /** * ES的操作数据类 * * 备注:对es的一些操作做了一些封装,抽出来一些操作,就是传统的dao层,数据服务 * * * */ @Component public class ESRepository { private static final Logger log = LoggerFactory.getLogger(ESRepository.class); @Autowired private TransportClient client; /** * 创建索引 * * @param index * @return */ public boolean buildIndex(String index) { if (!isIndexExist(index)) { log.info("Index is not exits!"); } CreateIndexResponse buildIndexresponse = client.admin().indices().prepareCreate(index).execute().actionGet(); log.info(" 创建索引的标志: " + buildIndexresponse.isAcknowledged()); return buildIndexresponse.isAcknowledged(); } /** * 删除索引 * * @param index * @return */ public boolean deleteIndex(String index) { if (!isIndexExist(index)) { log.info(" 索引不存在 !!!!!!"); } DeleteIndexResponse diResponse = client.admin().indices().prepareDelete(index).execute().actionGet(); if (diResponse.isAcknowledged()) { log.info("删除索引**成功** index->>>>>>>" + index); } else { log.info("删除索引**失败** index->>>>> " + index); } return diResponse.isAcknowledged(); } /** * 查询数据 * @param index 索引<----->关系型数据库 * @param type 类型<----->关系型数据表 * @param id 数据ID<----->id * @return */ public Map<String, Object> searchDataByParam(String index, String type, String id) { if(index == null || type == null || id == null) { log.info(" 无法查询数据,缺唯一值!!!!!!! "); return null; } //来获取查询数据信息 GetRequestBuilder getRequestBuilder = client.prepareGet(index, type, id); GetResponse getResponse = getRequestBuilder.execute().actionGet(); //这里也有指定的时间获取返回值的信息,如有特殊需求可以 return getResponse.getSource(); } /** * 更新数据 * * @param data 添加的数据类型 json格式的 * @param index 索引<----->关系型数据库 * @param type 类型<----->关系型数据表 * @param id 数据ID<----->id * @return */ public void updateDataById(JSONObject data, String index, String type, String id) { if(index == null || type == null || id == null) { log.info(" 无法更新数据,缺唯一值!!!!!!! "); return; } //更新步骤 UpdateRequest up = new UpdateRequest(); up.index(index).type(type).id(id).doc(data); //获取响应信息 //.actionGet(timeoutMillis),也可以用这个方法,当过了一定的时间还没得到返回值的时候,就自动返回。 UpdateResponse response = client.update(up).actionGet(); log.info("更新数据状态信息,status{}", response.status().getStatus()); } /** * 添加数据 * * @param data 添加的数据类型 json格式的 * @param index 索引<----->关系型数据库 * @param type 类型<----->关系型数据表 * @param id 数据ID<----->id * @return */ public String addTargetDataALL(String data, String index, String type, String id) { //判断一下次id是否为空,为空的话就设置一个id if(id == null) { id = UUID.randomUUID().toString(); } //正式添加数据进去 IndexResponse response = client.prepareIndex(index, type, id).setSource(data).get(); log.info("addTargetDataALL 添加数据的状态:{}", response.status().getStatus()); return response.getId(); } /** * 通过ID删除数据 * * @param index 索引,类似数据库 * @param type 类型,类似表 * @param id 数据ID */ public void delDataById(String index, String type, String id) { if(index == null || type == null || id == null) { log.info(" 无法删除数据,缺唯一值!!!!!!! "); return; } //开始删除数据 DeleteResponse response = client.prepareDelete(index, type, id).execute().actionGet(); log.info("删除数据状态,status-->>>>{},", response.status().getStatus()); } /** * 判断索引是否存在 * * @param index * @return */ public boolean isIndexExist(String index) { IndicesExistsResponse iep = client.admin().indices().exists(new IndicesExistsRequest(index)).actionGet(); if (iep.isExists()) { log.info("此索引 [" + index + "] 已经在ES集群里存在"); } else { log.info(" 没有此索引 [" + index + "] "); } return iep.isExists(); } }
三 集成fastdfs
3.1
配置
fastdfs.minPoolSize=10 fastdfs.maxPoolSize=30 fastdfs.waitTimes=200 connect_timeout = 2 network_timeout = 30 charset = UTF-8 http.tracker_http_port = 8180 tracker_server = 10.20.8.252:22122
3.2
工具类
package com.test.comm; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.csource.fastdfs.ClientGlobal; import org.csource.fastdfs.StorageClient1; import org.csource.fastdfs.StorageServer; import org.csource.fastdfs.TrackerServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * Title:ConnectionPool Copyright:Copyright(c)2018 Company:test * * @author * @date 2018年9月18日 下午3:15:50 */ @Component public class ConnectionPool { private final static Logger logger = LoggerFactory.getLogger(ConnectionPool.class); /** 空闲的连接池 */ private LinkedBlockingQueue<StorageClient1> idleConnectionPool = new LinkedBlockingQueue<StorageClient1>(); /** 连接池默认最小连接数 */ @Value("${fastdfs.minPoolSize}") private long minPoolSize; /** 连接池默认最大连接数 */ @Value("${fastdfs.maxPoolSize}") private long maxPoolSize; /** 默认等待时间(单位:秒) */ @Value("${fastdfs.waitTimes}") private long waitTimes; @Value("${spring.profiles.active}") private String profile; /** * @Description: 创建TrackerServer,并放入空闲连接池 */ public void createTrackerServer() { logger.debug("[创建TrackerServer(createTrackerServer)]"); TrackerServer trackerServer = null; try { initClientGlobal(); for (int i = 0; i < minPoolSize; i++) { // 把client1添加到连接池 StorageServer storageServer = null; StorageClient1 client1 = new StorageClient1(trackerServer, storageServer); idleConnectionPool.add(client1); } } catch (Exception e) { logger.error("[创建TrackerServer(createTrackerServer)][异常:{}]", e); } } /** * @Description: 获取空闲连接 1).在空闲池(idleConnectionPool)中弹出一个连接; * 2).把该连接放入忙碌池(busyConnectionPool)中; 3).返回 connection * 4).如果没有idle connection, 等待 wait_time秒, and check again * @throws AppException */ public StorageClient1 checkout() { StorageClient1 client1 = idleConnectionPool.poll(); if (client1 == null) { if (idleConnectionPool.size() < maxPoolSize) { createTrackerServer(); try { client1 = idleConnectionPool.poll(waitTimes, TimeUnit.SECONDS); } catch (Exception e) { logger.error("[获取空闲连接(checkout)-error][error:获取连接超时:{}]", e); } } } // 添加到忙碌连接池 // busyConnectionPool.put(client1, obj); logger.debug("[获取空闲连接(checkout)][获取空闲连接成功]"); return client1; } /** * @Description: 释放繁忙连接 1.如果空闲池的连接小于最小连接值,就把当前连接放入idleConnectionPool; * 2.如果空闲池的连接等于或大于最小连接值,就把当前释放连接丢弃; * @param client1 * 需释放的连接对象 */ public void checkin(StorageClient1 client1) { logger.debug("[释放当前连接(checkin)]"); client1 = null; if (idleConnectionPool.size() < minPoolSize) { createTrackerServer(); } } private void initClientGlobal() throws Exception { String FASTDFS_CONFIG = "application-" + profile + ".properties"; ClientGlobal.init(FASTDFS_CONFIG); } public LinkedBlockingQueue<StorageClient1> getIdleConnectionPool() { return idleConnectionPool; } public long getMinPoolSize() { return minPoolSize; } public void setMinPoolSize(long minPoolSize) { if (minPoolSize != 0) { this.minPoolSize = minPoolSize; } } public long getMaxPoolSize() { return maxPoolSize; } public void setMaxPoolSize(long maxPoolSize) { if (maxPoolSize != 0) { this.maxPoolSize = maxPoolSize; } } public long getWaitTimes() { return waitTimes; } public void setWaitTimes(int waitTimes) { if (waitTimes != 0) { this.waitTimes = waitTimes; } } }
四 集成rabbitmq
4.1 配置
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=12345678 spring.rabbitmq.publisher-confirms=true spring.rabbitmq.virtual-host=/
4.2
package com.test.rabbitmq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; /** * 在这里新建一个队列,并将队列与交换机绑定 * * <p> * Title:Application * </p> * <p> * Description:TODO * </p> * <p> * Copyright:Copyright(c)2005 * </p> * <p> * Company:test * </p> * * @author * @date 2018年9月12日 上午9:40:48 */ public class Application { /** * 新建队列 */ @Bean public Queue queuePush() { return new Queue("sy-admin-push"); } /** * 创建交换机 */ @Bean TopicExchange exchange() { return new TopicExchange("sy-exchange-admin"); } /** * 绑定交换机 */ /** * 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配 * @param queueMessage * @param exchange * @return */ @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean public void binding () { Queue queue = queuePush(); TopicExchange exchange = exchange(); bindingExchangeMessage(queue, exchange); } }
模拟消费者
package com.test.rabbitmq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.test.util.AESUtils; @Component @RabbitListener(queues = "test") public class Consumer { private final static Logger logger = LoggerFactory.getLogger(Consumer.class); @RabbitHandler public void process(String message) { logger.debug("模拟移动端接收到一条推送消息" + message); logger.debug("解密后的消息 " + AESUtils.decryptData(message)); } }
模拟生产者
package com.test.rabbitmq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.test.util.AESUtils; @Component public class Producer { private final static Logger logger = LoggerFactory.getLogger(Producer.class); @Autowired private RabbitTemplate rabbitTemplate; public void producer (final String queue, final String message) { new Thread(new Runnable() { @Override public void run() { logger.debug("接收到一条消息" + message); //加密 String newMessage = AESUtils.encryptData(message); logger.debug("加密后的消息为 " + newMessage); rabbitTemplate.convertSendAndReceive(queue, newMessage); logger.debug("向移动端推送消息" + newMessage); } }).start(); } }