zoukankan      html  css  js  c++  java
  • Kafka小记

    kafka简介

      kafka是由LinkedIn开发,主要是用来处理Linkedin的大面积活跃数据流处理(activity stream).  此类的数据经常用来反映网站的一些有用的信息,比如PV,页面展示给哪些用户访问,用户搜索什么关键字最多,这类信息经常被log到文件里,然后线下且周期性的去分析这些数据。现在这种用户活跃数据已经成为互联网公司重要的一部分,所以必须构建一个更轻量且更精炼的基础架构。 
     
      活跃数据 使用案列 
        分析一下用户行为(pageviews),以便我能设计出更好的广告位。 
        快速的统计用户投票,点击。
        对用户的搜索关键词进行统计,分析出当前的流行趋势。
        防止用户对网站进行无限制的抓取数据,以及超限制的使用API,辨别垃圾。 
        对网站进行全方位的实时监控,从而得到实时有效的性能数据,并且及时的发成警告。
        批量的导入数据到数据仓库,对数据进行离线分析,从而得到有价值的商业信息。(0.6可以直接将数据导入Hadoop) 
     
      活跃数据的特点 

            高流量的活跃数据是无法确定其大小的,因为他可能随时的变化,比如商家可能促销,节假日打折,突然又冒出一个跳楼价等等。所有的数据可能是数量级的往上递增。 传统日志分析方式都是需要离线,而且操作起来比较复杂,根本无法满足实时的分析。另一方面,现有的消息队列系统只能达到近似实时的分析,因为无法消费大量的持久化在队列系统上的信息。Kafka的目标就是能够成为一个高效的队列平台,无论是处理离线的信息还是在线的信息。



    kafka是一个消息订阅和发布的系统,我们将message的发布(publish)者称为producer,将message的订阅(subscribe)者称为consumer,将中间的存储阵列称作broker。它的核心概念有如下几个:
      topic
      partition
      offset
      consumer group


    安装伪分布式kafka
        cd /usr/local
        tar -zxvf kafka_2.10-0.8.2.0.tgz
        mv kafka_2.10-0.8.2.0 kafka
        cd /usr/local/kafka/
        启动Kafka自带的ZooKeeper,后台运行
        bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &
        启动Kafka服务,后台运行
        bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
        创建一个Kafka的主题,连接到本地zk,副本因子1,分区1,主题名是test
        bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
        查看ZooKeeper上Kafka的主题
        bin/kafka-topics.sh --list --zookeeper localhost:2181
        查看Kafka的主题详情
        bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
        创建生产者
        bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
        创建消费者
        bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    安装完全分布式kafka,在h5 h6 h7节点上
        在h5节点上安装Kafka,
        要求启动ZooKeeper集群。
        cd /usr/local
        tar -zxvf kafka_2.10-0.8.2.0.tgz
        mv kafka_2.10-0.8.2.0 kafka
        cd /usr/local/kafka/
        vi config/server.properties
            broker.id=36        ##必须是数字
            host.name=h6        ##可以是IP、主机名、域名
            log.dirs=/usr/local/kafka/logs
        scp -rq /usr/local/kafka/ h6:/usr/local
        scp -rq /usr/local/kafka/ h7:/usr/local
        
        
        创建一个Kafka的主题,连接到zk集群,副本因子3,分区3,主题名是test
            bin/kafka-topics.sh --create --zookeeper h5:2181 --topic test111 --replication-factor 3 --partitions 3
        查看Kafka的主题详情
            bin/kafka-topics.sh --describe --zookeeper h5:2181 --topic test111
            zkCli.sh
                ls /brokers/topics/test/
        使用java代码实现kafka的生产者和消费者
            1、生产者 

     1 package com.mengyao.kafka;
     2 
     3             import java.util.Properties;
     4 
     5             import kafka.javaapi.producer.Producer;
     6             import kafka.producer.KeyedMessage;
     7             import kafka.producer.ProducerConfig;
     8             import kafka.serializer.StringEncoder;
     9 
    10             public class KafkaProducerTest extends Thread {
    11 
    12             private String topic;
    13     
    14             public KafkaProducerTest(){
    15         
    16             }
    17     
    18             public KafkaProducerTest(String topic){
    19                 this.topic = topic;
    20             }
    21 
    22             private Producer<Integer, String> getProducer(Properties prop) {
    23                 return new Producer<Integer, String>(new ProducerConfig(prop));
    24             }
    25 
    26             private Properties getProperties() {
    27                 Properties prop = new Properties();
    28                 prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
    29                 prop.put("serializer.class", StringEncoder.class.getName());
    30                 prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
    31                 return prop;
    32             }
    33             
    34             @Override
    35             public void run() {
    36                 Properties prop = getProperties();
    37                 Producer<Integer, String> producer = getProducer(prop);    
    38                 int i = 0;
    39                 while (true) {
    40                     producer.send(new KeyedMessage<Integer, String>(topic, "msg:"+i++));
    41                     try {
    42                         Thread.sleep(1000);
    43                     } catch (InterruptedException e) {
    44                         e.printStackTrace();
    45                     }
    46                 }
    47             }
    48 
    49             public static void main(String[] args) {
    50                 new KafkaProducerTest("test111").start();
    51             }
    52 
    53         }


                
            2、消费者            

     1 package com.mengyao.kafka;
     2 
     3             import java.util.HashMap;
     4             import java.util.List;
     5             import java.util.Map;
     6             import java.util.Properties;
     7 
     8             import kafka.consumer.Consumer;
     9             import kafka.consumer.ConsumerConfig;
    10             import kafka.consumer.ConsumerIterator;
    11             import kafka.consumer.KafkaStream;
    12             import kafka.javaapi.consumer.ConsumerConnector;
    13             import kafka.serializer.StringEncoder;
    14 
    15             public class KafkaConsumerTest extends Thread {
    16     
    17             private String topic;
    18     
    19             public KafkaConsumerTest() {
    20         
    21             }
    22             
    23             public KafkaConsumerTest(String topic) {
    24                 this.topic = topic;
    25             }
    26     
    27             private ConsumerConnector getConsumer(Properties prop) {
    28                 return Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
    29             }
    30     
    31             private Properties getProperties() {
    32                 Properties prop = new Properties();
    33                 prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
    34                 prop.put("serializer.class", StringEncoder.class.getName());
    35                 prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
    36                 prop.put("group.id", "group1");
    37                 return prop;
    38             }
    39 
    40             @Override
    41             public void run() {
    42                 Properties prop = getProperties();
    43                 ConsumerConnector consumer = getConsumer(prop);
    44                 HashMap<String, Integer> topicCountMap = new HashMap<String, Integer>();
    45                 topicCountMap.put(topic, 1);
    46                 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
    47                 KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
    48                 ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
    49                 while (iterator.hasNext()) {
    50                     final String msg = new String(iterator.next().message());
    51                     System.out.println(msg);
    52                 }
    53             }
    54     
    55             public static void main(String[] args) {
    56                 new KafkaConsumerTest("test111").start();
    57             }
    58 
    59         }    

           

  • 相关阅读:
    坑爹的微信支付v3,其实没有那么坑
    Mysql探究之null与not null
    Mysql的空值与NULL的区别
    Java编程思想(第4版) 中文清晰PDF完整版
    URI和URL的区别
    html 文本输入框效果大汇集
    HTTP状态码大全
    Silverlight ModelView中调用UI进程
    appium部分api
    appium元素定位
  • 原文地址:https://www.cnblogs.com/mengyao/p/4516007.html
Copyright © 2011-2022 走看看