zoukankan      html  css  js  c++  java
  • zookeeper+kafka集群的安装

    时效性要求很高的数据,库存,采取的是数据库+缓存双写的技术方案,也解决了双写的一致性的问题
    缓存数据生产服务,监听一个消息队列,然后数据源服务(商品信息管理服务)发生了数据变更之后,就将数据变更的消息推送到消息队列中
    缓存数据生产服务可以去消费到这个数据变更的消息,然后根据消息的指示提取一些参数,然后调用对应的数据源服务的接口,拉去数据,这个时候一般是从mysql库中拉去的
    1、zookeeper集群搭建
    zookeeper-3.4.5.tar.gz使用WinSCP拷贝到/usr/local目录下。
    对zookeeper-3.4.5.tar.gz进行解压缩:tar -zxvf zookeeper-3.4.5.tar.gz。
    对zookeeper目录进行重命名:mv zookeeper-3.4.5 zk
    配置zookeeper相关的环境变量
    vi ~/.bashrc
    export ZOOKEEPER_HOME=/usr/local/zk
    export PATH=$ZOOKEEPER_HOME/bin
    source ~/.bashrc
    
    cd zk/conf
    cp zoo_sample.cfg zoo.cfg
    
    vi zoo.cfg
    

      修改:

    dataDir=/usr/local/zk/data
    

      新增:

    server.0=eshop-cache01:2888:3888	
    server.1=eshop-cache02:2888:3888
    server.2=eshop-cache03:2888:3888
    

      

    cd zk
    mkdir data
    cd data

    vi myid
    0

    在另外两个节点上按照上述步骤配置ZooKeeper,使用scp将zk和.bashrc拷贝到eshop-cache02和eshop-cache03上即可。唯一的区别是标识号分别设置为1和2。

    分别在三台机器上执行:zkServer.sh start。
    检查ZooKeeper状态:zkServer.sh status,应该是一个leader,两个follower
    jps:检查三个节点是否都有QuromPeerMain进程
     
    2、kafka集群搭建
    scala,就是一门编程语言,现在比较火,很多比如大数据领域里面的spark(计算引擎)就是用scala编写的

    scala-2.11.4.tgz使用WinSCP拷贝到/usr/local目录下。
    对scala-2.11.4.tgz进行解压缩:tar -zxvf scala-2.11.4.tgz。
    对scala目录进行重命名:mv scala-2.11.4 scala

    配置scala相关的环境变量
    vi ~/.bashrc
    export SCALA_HOME=/usr/local/scala
    export PATH=$SCALA_HOME/bin
    source ~/.bashrc

    查看scala是否安装成功:scala -version

    按照上述步骤在其他机器上都安装好scala。使用scp将scala和.bashrc拷贝到另外两台机器上即可。
    kafka_2.9.2-0.8.1.tgz使用WinSCP拷贝到/usr/local目录下。
    对kafka_2.9.2-0.8.1.tgz进行解压缩:tar -zxvf kafka_2.9.2-0.8.1.tgz。
    对kafka目录进行改名:mv kafka_2.9.2-0.8.1 kafka

    配置kafka
    vi /usr/local/kafka/config/server.properties
    broker.id:依次增长的整数,0、1、2,集群中Broker的唯一id
    zookeeper.connect=192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181

    安装slf4j
    slf4j-1.7.6.zip上传到/usr/local目录下
    unzip slf4j-1.7.6.zip
    把slf4j中的slf4j-nop-1.7.6.jar复制到kafka的libs目录下面

    解决kafka Unrecognized VM option 'UseCompressedOops'问题

    vi /usr/local/kafka/bin/kafka-run-class.sh

    if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
    KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
    fi

    去掉-XX:+UseCompressedOops即可

    按照上述步骤在另外两台机器分别安装kafka。用scp把kafka拷贝到其他机器即可。
    唯一区别的,就是server.properties中的broker.id,要设置为1和2

    在三台机器上的kafka目录下,分别执行以下命令:nohup bin/kafka-server-start.sh config/server.properties &

    使用jps检查启动是否成功

    使用基本命令检查kafka是否搭建成功

    bin/kafka-topics.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic test --replication-factor 1 --partitions 1 --create

    bin/kafka-console-producer.sh --broker-list 192.168.31.181:9092,192.168.31.19:9092,192.168.31.227:9092 --topic test

    bin/kafka-console-consumer.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic test --from-beginning
     
    kafka+ehcache+redis完成缓存数据生产服务的开发:
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.apache.tomcat.jdbc.pool.DataSource;
    import org.mybatis.spring.SqlSessionFactoryBean;
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.context.embedded.ServletListenerRegistrationBean;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
    import org.springframework.jdbc.datasource.DataSourceTransactionManager;
    import org.springframework.transaction.PlatformTransactionManager;
    
    import com.roncoo.eshop.cache.listener.InitListener;
    
    import redis.clients.jedis.HostAndPort;
    import redis.clients.jedis.JedisCluster;
    
    @EnableAutoConfiguration
    @SpringBootApplication
    @ComponentScan
    @MapperScan("com.roncoo.eshop.cache.mapper")
    public class Application {
     
        @Bean
        @ConfigurationProperties(prefix="spring.datasource")
        public DataSource dataSource() {
            return new org.apache.tomcat.jdbc.pool.DataSource();
        }
        
        @Bean
        public SqlSessionFactory sqlSessionFactoryBean() throws Exception {
            SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
            sqlSessionFactoryBean.setDataSource(dataSource());
            PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
            sqlSessionFactoryBean.setMapperLocations(resolver.getResources("classpath:/mybatis/*.xml"));
            return sqlSessionFactoryBean.getObject();
        }
     
        @Bean
        public PlatformTransactionManager transactionManager() {
            return new DataSourceTransactionManager(dataSource());
        }
        
        @Bean
        public JedisCluster JedisClusterFactory() {
            Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
            jedisClusterNodes.add(new HostAndPort("192.168.31.19", 7003));
            jedisClusterNodes.add(new HostAndPort("192.168.31.19", 7004));
            jedisClusterNodes.add(new HostAndPort("192.168.31.227", 7006));
            JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes);
            return jedisCluster;
        }
    
        @SuppressWarnings({ "rawtypes", "unchecked" })
       	@Bean
           public ServletListenerRegistrationBean servletListenerRegistrationBean() {
           	ServletListenerRegistrationBean servletListenerRegistrationBean = 
           			new ServletListenerRegistrationBean();
           	servletListenerRegistrationBean.setListener(new InitListener());  
           	return servletListenerRegistrationBean;
         }
        
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    }
    

      

    import javax.servlet.ServletContext;
    import javax.servlet.ServletContextEvent;
    import javax.servlet.ServletContextListener;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.web.context.support.WebApplicationContextUtils;
    
    import com.roncoo.eshop.kafka.KafkaConsumer;
    import com.roncoo.eshop.spring.SpringContext;
    
    
    /**
     * 系统初始化的监听器
     */
    public class InitListener implements ServletContextListener {
    	
    	public void contextInitialized(ServletContextEvent sce) {
    		ServletContext sc = sce.getServletContext();
    		ApplicationContext context = WebApplicationContextUtils.getWebApplicationContext(sc);
    		SpringContext.setApplicationContext(context);  
    		
    		new Thread(new KafkaConsumer("cache-message")).start();
    	}
    	
    	public void contextDestroyed(ServletContextEvent sce) {
    		
    	}
    
    }
    

      

    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.cache.ehcache.EhCacheCacheManager;
    import org.springframework.cache.ehcache.EhCacheManagerFactoryBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    
    //缓存配置管理类
    @Configuration
    @EnableCaching
    public class CacheConfiguration {
    	@Bean
        public EhCacheManagerFactoryBean ehCacheManagerFactoryBean(){
    		EhCacheManagerFactoryBean ehCacheManagerFactoryBean=
    				new EhCacheManagerFactoryBean();
    		ehCacheManagerFactoryBean.setConfigLocation(new ClassPathResource("ehcache.xml")); 
    		ehCacheManagerFactoryBean.setShared(true);
    		return ehCacheManagerFactoryBean;
    	}
    	@Bean
    	public EhCacheCacheManager ehCacheCacheManager(EhCacheManagerFactoryBean ehCacheManagerFactoryBean){
    		return new EhCacheCacheManager(ehCacheManagerFactoryBean.getObject());
    	}
    }
    

      

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    
    // kafka消费者
    public class KafkaConsumer implements Runnable   {
    	private ConsumerConnector consumerConnector;
    	private String topic;
    	
    	public KafkaConsumer(String topic){
    		this.consumerConnector=Consumer.createJavaConsumerConnector(
    				createConsumerConfig());
    		this.topic=topic;
    	}
    	
    	@SuppressWarnings("rawtypes")
    	public void run(){
    		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1);
            
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
            		consumerConnector.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
            
            for (KafkaStream stream : streams) {
                new Thread(new KafkaMessageProcessor(stream)).start();
            }
    	}
    	
    	//创建kafka cosumer config
    	private static ConsumerConfig createConsumerConfig() {
            Properties props = new Properties();
            props.put("zookeeper.connect", "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181");
            props.put("group.id", "eshop-cache-group");
            props.put("zookeeper.session.timeout.ms", "40000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);
        }
    }
    

      

    import com.alibaba.fastjson.JSONObject;
    import com.roncoo.eshop.model.ProductInfo;
    import com.roncoo.eshop.model.ShopInfo;
    import com.roncoo.eshop.service.CacheService;
    import com.roncoo.eshop.spring.SpringContext;
    
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    
    //kafka消息处理线程
    @SuppressWarnings("rawtypes")
    public class KafkaMessageProcessor implements Runnable {
    
    	private KafkaStream  kafkaStream;
    	private CacheService cacheService;
    	
    	public KafkaMessageProcessor(KafkaStream kafkaStream) {
    		this.kafkaStream = kafkaStream;
    		this.cacheService = (CacheService) SpringContext.getApplicationContext()
    				.getBean("cacheService"); 
    	}
    	
    	@SuppressWarnings("unchecked")
    	public void run(){
    		ConsumerIterator<byte[],byte[]> it=	kafkaStream.iterator();
    		while(it.hasNext()){
    			String message =new String(it.next().message());
    			// 首先将message转换成json对象
    			JSONObject messageJSONObject =JSONObject.parseObject(message);
    			
    			// 从这里提取出消息对应的服务的标识
    			String serviceId =messageJSONObject.getString("serviceId");  
    			
    			// 如果是商品信息服务
    			if("productInfoService".equals(serviceId)){
    				processProductInfoChangeMessage(messageJSONObject);
    			}else if("shopInfoService".equals(serviceId)) {
            		processShopInfoChangeMessage(messageJSONObject);  
            	}
    		}
    	}
    
    	//处理商品信息变更的消息
    	private void processProductInfoChangeMessage(JSONObject messageJSONObject){
    		// 提取出商品id
    		Long productId = messageJSONObject.getLong("productId");
    		
    		// 调用商品信息服务的接口
    		// 直接用注释模拟:getProductInfo?productId=1,传递过去
    		// 商品信息服务,一般来说就会去查询数据库,去获取productId=1的商品信息,然后返回回来
    		
    		String productInfoJSON = "{"id": 1, "name": "iphone7手机", "price": 5599, "pictureList":"a.jpg,b.jpg", "specification": "iphone7的规格", "service": "iphone7的售后服务", "color": "红色,白色,黑色", "size": "5.5", "shopId": 1}";
    		ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class);
    		cacheService.saveProductInfo2LocalCache(productInfo);
    		System.out.println("===================获取刚保存到本地缓存的商品信息:" + cacheService.getProductInfoFromLocalCache(productId));  
    		cacheService.saveProductInfo2ReidsCache(productInfo);  
    	}
    	
    	//处理店铺信息变更的消息
    	private void processShopInfoChangeMessage(JSONObject messageJSONObject){
    		// 提取出商品id
    		Long productId = messageJSONObject.getLong("productId");
    		Long shopId = messageJSONObject.getLong("shopId"); 
    		
    		String shopInfoJSON = "{"id": 1, "name": "小王的手机店", "level": 5, "goodCommentRate":0.99}";
    		ShopInfo shopInfo = JSONObject.parseObject(shopInfoJSON, ShopInfo.class);
    		cacheService.saveShopInfo2LocalCache(shopInfo);
    		System.out.println("===================获取刚保存到本地缓存的店铺信息:" + cacheService.getShopInfoFromLocalCache(shopId));   
    		cacheService.saveShopInfo2ReidsCache(shopInfo);  
    	}
    }
    

      

    public class ProductInfo {
    	private Long id;
    	private String name;
    	private Double price;
    	
    	public ProductInfo() {
    		
    	}
    	
    	public ProductInfo(Long id, String name, Double price) {
    		this.id = id;
    		this.name = name;
    		this.price = price;
    	}
    	
    	public Long getId() {
    		return id;
    	}
    	public void setId(Long id) {
    		this.id = id;
    	}
    	public String getName() {
    		return name;
    	}
    	public void setName(String name) {
    		this.name = name;
    	}
    	public Double getPrice() {
    		return price;
    	}
    	public void setPrice(Double price) {
    		this.price = price;
    	}
    }
    

      

    //店铺信息
    public class ShopInfo {
    	private Long id;
    	private String name;
    	private Integer level;
    	private Double goodCommentRate;
    
    	public Long getId() {
    		return id;
    	}
    
    	public void setId(Long id) {
    		this.id = id;
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public Integer getLevel() {
    		return level;
    	}
    
    	public void setLevel(Integer level) {
    		this.level = level;
    	}
    
    	public Double getGoodCommentRate() {
    		return goodCommentRate;
    	}
    
    	public void setGoodCommentRate(Double goodCommentRate) {
    		this.goodCommentRate = goodCommentRate;
    	}
    
    	@Override
    	public String toString() {
    		return "ShopInfo [id=" + id + ", name=" + name + ", level=" + level
    				+ ", goodCommentRate=" + goodCommentRate + "]";
    	}
    }
    

      

    import com.roncoo.eshop.model.ProductInfo;
    import com.roncoo.eshop.model.ShopInfo;
    
    //缓存service接口
    public interface CacheService {
       //将商品信息保存到本地缓存中
    	public ProductInfo saveLocalCache(ProductInfo productInfo);
    	
    	//从本地缓存中获取商品信息
    	public ProductInfo getLocalCache(Long id);
    	
    	//将商品信息保存到本地的ehcache缓存中
    	public ProductInfo saveProductInfo2LocalCache(ProductInfo productInfo);
    	
    	//从本地ehcache缓存中获取商品信息
    	public ProductInfo getProductInfoFromLocalCache(Long productId);
    	
    	// 将店铺信息保存到本地的ehcache缓存中
    	public ShopInfo saveShopInfo2LocalCache(ShopInfo shopInfo);
    	
    	//从本地ehcache缓存中获取店铺信息
    	public ShopInfo getShopInfoFromLocalCache(Long shopId);
    	
    	//将商品信息保存到redis中
    	public void saveProductInfo2ReidsCache(ProductInfo productInfo);
    	
    	//将店铺信息保存到redis中
    	public void saveShopInfo2ReidsCache(ShopInfo shopInfo);
    	
    }
    

      

    import javax.annotation.Resource;
    
    import org.springframework.cache.annotation.CachePut;
    import org.springframework.cache.annotation.Cacheable;
    import org.springframework.stereotype.Service;
    
    import com.alibaba.fastjson.JSONObject;
    import com.roncoo.eshop.model.ProductInfo;
    import com.roncoo.eshop.model.ShopInfo;
    import com.roncoo.eshop.service.CacheService;
    
    import redis.clients.jedis.JedisCluster;
    
    //缓存Service实现类
    @Service("cacheService")
    public class CacheServiceImpl implements CacheService {
    	public static final String CACHE_NAME ="local";
    	
    	@Resource
    	private JedisCluster jedisCluster;
    	
    	//将商品信息保存到本地缓存中
    	@CachePut(value = CACHE_NAME, key = "'key_'+#productInfo.getId()")
    	public ProductInfo saveLocalCache(ProductInfo productInfo) {
    		return productInfo;
    	}
    	
    	 // 从本地缓存中获取商品信息
    	@Cacheable(value = CACHE_NAME, key = "'key_'+#id")
    	public ProductInfo getLocalCache(Long id) {
    		return null;
    	}
    	
    	//将商品信息保存到本地的ehcache缓存中
    	@CachePut(value=CACHE_NAME,key = "'product_info_'+#productInfo.getId()")
    	public ProductInfo saveProductInfo2LocalCache(ProductInfo productInfo) {
    		return productInfo;
    	}
    	
    	//从本地ehcache缓存中获取商品信息
    	@Cacheable(value = CACHE_NAME, key = "'product_info_'+#productId")
    	public ProductInfo getProductInfoFromLocalCache(Long productId) {
    		return null;
    	}
    	
    	//将店铺信息保存到本地的ehcache缓存中
    	@CachePut(value = CACHE_NAME, key = "'shop_info_'+#shopInfo.getId()")
    	public ShopInfo saveShopInfo2LocalCache(ShopInfo shopInfo) {
    		return shopInfo;
    	}
    	
    	//从本地ehcache缓存中获取店铺信息
    	@Cacheable(value = CACHE_NAME, key = "'shop_info_'+#shopId")
    	public ShopInfo getShopInfoFromLocalCache(Long shopId) {
    		return null;
    	}
    	
    	//将商品信息保存到redis中
    	public void saveProductInfo2ReidsCache(ProductInfo productInfo){
    		String key="product_info_" + productInfo.getId();
    		jedisCluster.set(key, JSONObject.toJSONString(productInfo));
    	}
    	
    	//将店铺信息保存到redis中
    	public void saveShopInfo2ReidsCache(ShopInfo shopInfo){
    		String key="shop_info_" + shopInfo.getId();
    		jedisCluster.set(key, JSONObject.toJSONString(shopInfo));  
    	}
    }
    

      

    import org.springframework.context.ApplicationContext;
    
    /**
     * spring上下文
     */
    public class SpringContext {
    
    	private static ApplicationContext applicationContext;
    
    	public static ApplicationContext getApplicationContext() {
    		return applicationContext;
    	}
    
    	public static void setApplicationContext(ApplicationContext applicationContext) {
    		SpringContext.applicationContext = applicationContext;
    	}
    	
    }
    

      

    import javax.annotation.Resource;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    import com.roncoo.eshop.model.ProductInfo;
    import com.roncoo.eshop.service.CacheService;
    
    @Controller
    public class CacheController {
    	@Resource
    	private CacheService cacheService;
    	
    	@RequestMapping("/testPutCache")
    	@ResponseBody
    	public String testPutCache(ProductInfo productInfo) {
    		cacheService.saveLocalCache(productInfo);
    		return "success";
    	}
    	
    	@RequestMapping("/testGetCache")
    	@ResponseBody
    	public ProductInfo testGetCache(Long id) {
    		return cacheService.getLocalCache(id);
    	}
    }
    

      如何提升缓存命中率:

    分发层+应用层,双层nginx

    分发层nginx,负责流量分发的逻辑和策略,这个里面它可以根据你自己定义的一些规则,比如根据productId去进行hash,然后对后端的nginx数量取模

    将某一个商品的访问的请求,就固定路由到一个nginx后端服务器上去,保证说只会从redis中获取一次缓存数据,后面全都是走nginx本地缓存了

    后端的nginx服务器,就称之为应用服务器; 最前端的nginx服务器,被称之为分发服务器

    大幅度提升你的nginx本地缓存这一层的命中率,大幅度减少redis后端的压力,提升性能

     缓存命中率低的原因:
  • 相关阅读:
    poj3181(Dollar Dayz)
    poj3666(Making the Grade)
    poj2392(Space Elevator)
    hdu5288(OO’s Sequence)
    hdu5289(Assignment)
    快学scala
    Spark Checkpointing
    Spark Performance Tuning (性能调优)
    Spark Memory Tuning (内存调优)
    Sparkstreaming and Kafka
  • 原文地址:https://www.cnblogs.com/sunliyuan/p/11366478.html
Copyright © 2011-2022 走看看