zoukankan      html  css  js  c++  java
  • 大数据统计分析平台之一、Kafka单机搭建

    1、zookeeper搭建

      Kafka集群依赖zookeeper,需要提前搭建好zookeeper

      单机模式(7步)(集群模式进阶请移步:http://blog.51cto.com/nileader/795230)

     Step1:

    cd /usr/local/software 

    jdk-8u161-linux-x64.rpm
    链接:https://pan.baidu.com/s/1i6iHIDJ 密码:bgcc

    rpm -ivh jdk-8u161-linux-x64.rpm

    vi /etc/profile

    JAVA_HOME=/usr/java/jdk1.8.0_161
    JRE_HOME=/usr/java/jdk1.8.0_161/jre
    PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
    CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
    export JAVA_HOME JRE_HOME PATH CLASSPATH

    source /etc/profile

    echo $PATH

    Step2:

    # 下载zookeeper

    wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz

    # 如果下载不到,可以使用迅雷,或者使用百度云盘

    链接:https://pan.baidu.com/s/1MXYd4UlKWvqB6EcVLyF8cg 密码:an6t

    # 解压

    tar -zxvf zookeeper-3.4.11.tar.gz

    # 移动一下

    mv zookeeper-3.4.11 /usr/local/zookeeper-3.4.11

    Step3:重命名 zoo_sample.cfg文件

     mv /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg  /usr/local/zookeeper-3.4.11/conf/zoo.cfg

     Step4:vi /usr/local/zookeeper-3.4.11/conf/zoo.cfg,修改

    dataDir=/usr/local/zookeeper-3.4.11/data

    Step5:创建数据目录

    mkdir  /usr/local/zookeeper-3.4.11/data


    Step6:启动zookeeper:执行

    /usr/local/zookeeper-3.4.11/bin/zkServer.sh start

    Step7:检测是否成功启动:执行

    /usr/local/zookeeper-3.4.11/bin/zkCli.sh 
    或者
    yum install nc -y
    echo stat| nc localhost 2181

    ================================================================================================================

    2、下载Kafka

    # mkdir -p /usr/local/software
    # cd /usr/local/software
    # wget http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz

    # 百度云下载地址:
    链接:https://pan.baidu.com/s/1Kp0uD_5YjGKOLkbW_igm2g 密码:v1q7
       kafka_2.12-1.0.0.tgz    //其中2.12-1.0.0为Scala的版本,kafka-1.0.0-src.tgz为kafka版本
     
    3、解压
    # tar zxf kafka_2.12-1.0.0.tgz -C /usr/local/
    # cd /usr/local/
    # mv kafka_2.12-1.0.0/ kafka/
    4、配置
    mkdir -p /usr/local/kafka/kafkaLogs
    复制代码
    # vi /usr/local/kafka/config/server.properties

    # broker的ID,集群中每个broker ID不可相同
    broker.id=0
    # 监听器,端口号和port一致即可
    listeners=PLAINTEXT:/10.10.6.225/:9092
    # Broker的监听端口
    port=9092

    # 必须填写当前服务器IP地址
    host.name=10.10.6.225

    # 必须填写当前服务器IP地址
    advertised.host.name=10.10.6.225
    # 暂未配置集群
    zookeeper.connect=10.10.6.225:2181

    # 消息持久化目录
    log.dirs=/usr/local/kafka/kafkaLogs

    # 可以删除主题
    delete.topic.enable=true

    # 关闭自动创建topic
    auto.create.topics.enable=false
    复制代码
    5、配置Kafka的环境变量
    # vi /etc/profile
      export KAFKA_HOME=/usr/local/kafka
      export PATH=$PATH:$KAFKA_HOME/bin
    # source /etc/profile


    # vi /etc/hosts

    # es为主机名 ,这里一定要注意,是主机名!!!!重要的话说三次!!!!!!!!
    127.0.0.1 es
    10.10.6.225 es
    6、启动与停止Kafka
    # kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
      官方推荐启动方式:
    # /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

    但这种方式退出shell后会自动断开

    停止:

    kafka-server-stop.sh 
    7、验证
    # jps
        2608 Kafka
    2236 QuorumPeerMain
    2687 Jps
    看到Kafka的进程,说明Kafka已经启动
     
    8、创建topic
        创建名为test,partitions为3,replication为3的topic
    # kafka-topics.sh --create --zookeeper 10.10.6.225:2181 --partitions 1 --replication-factor 1 --topic test
        查看topic状态
    # kafka-topics.sh --describe --zookeeper 10.10.6.225:2181 --topic test
      Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
       Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
     
        删除topic
        执行如下命令
    # kafka-topics.sh --delete --zookeeper 10.10.6.225:2181 --topic test
    9、测试使用Kafka
        发送消息
    # kafka-console-producer.sh --broker-list 10.10.6.225:9092 --topic test
    输入以下信息:
      This is a message
      This is another message
        接收消息
    # kafka-console-consumer.sh --bootstrap-server 10.10.6.225:9092 --topic test --from-beginning 
        若看到上输入的信息说明已经搭建成功。
     
    更复杂配置参考:
     
    黄海添加于2018-02-11 夜
    链接:https://pan.baidu.com/s/1i6HnIzr 密码:1soq
     
    KafkaProducer.py
    # http://kafka-python.readthedocs.io/en/master/
    # 安装办法:
    # C:UsersAdministrator>pip install kafka-python
    # Collecting kafka-python
    #  Downloading kafka_python-1.4.1-py2.py3-none-any.whl (235kB)
    #    100% |████████████████████████████████| 235kB 150kB/s
    # Installing collected packages: kafka-python
    # Successfully installed kafka-python-1.4.1
    # http://blog.csdn.net/evankaka/article/details/52421314
    
    from kafka import KafkaProducer
    from Util.MySQLHelper import *
    import json

    producer = KafkaProducer(bootstrap_servers='10.10.6.225:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    db = MySQLHelper()
    sql = "select ID,RESOURCE_ID_INT,RESOURCE_ID_CHAR,RESOURCE_TITLE,RESOURCE_TYPE_NAME,RESOURCE_FORMAT,RESOURCE_PAGE,CAST(CREATE_TIME AS CHAR) AS CREATE_TIME,DOWN_COUNT,FILE_ID,RESOURCE_TYPE,STRUCTURE_ID,PERSON_ID,PERSON_NAME,IDENTITY_ID from t_resource_info limit 100"
    dt = db.query(sql)

    print(len(dt))

    for row in dt:
    producer.send('t_resource_info', row)

    producer.flush()

    print('恭喜,完成!')
    
    

    不依赖于MYSQL的数据提交:

    import json
    from kafka import KafkaProducer
    import datetime
    
    # kafka的服务器位置
    kafka_servers = '10.10.6.194:9092'
    
    # 日期的转换器
    class DateEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj, datetime.datetime):
                return obj.strftime('%Y-%m-%d %H:%M:%S')
            elif isinstance(obj, datetime.date):
                return obj.strftime("%Y-%m-%d")
            else:
                return json.JSONEncoder.default(self, obj)
    
    
    # 黄海定义的输出信息的办法,带当前时间
    def logInfo(msg):
        i = datetime.datetime.now()
        print(" %s            %s" % (i, msg))
    
    # 统一的topic名称
    topicName = 'test'
    
    dt=[{"id":1,"name":"刘备"},{"id":2,"name":"关羽"},{"id":3,"name":"张飞"}]
    
    # kafka的生产者
    producer = KafkaProducer(bootstrap_servers=kafka_servers)
    
    # # 将字段大写转为小写
    for row in dt:
        new_dics = {}
        for k, v in row.items():
            new_dics[k.lower()] = v
            jstr = json.dumps(new_dics, cls=DateEncoder)
        producer.send(topic=topicName, partition=0, value=jstr.encode('utf-8'))
    # 提交一下
    producer.flush()
    print('恭喜,完成!')

    KafkaConsumer.py

    from kafka import KafkaConsumer
    import time
    
    
    def log(str):
        t = time.strftime(r"%Y-%m-%d_%H-%M-%S", time.localtime())
        print("[%s]%s" % (t, str))
    
    
    log('start consumer')
    # 消费192.168.120.11:9092上的world 这个Topic,指定consumer group是consumer-20171017
    consumer = KafkaConsumer('foobar', bootstrap_servers=['localhost:9092'])
    for msg in consumer:
        recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        log(recv)

     如果是想读取kafka记得的所有消费记录:

    from kafka import KafkaConsumer
    import time
    
    # kafka的服务器位置
    kafka_servers = '10.10.6.194:9092'
    # 统一的topic名称
    topicName = 'test'
    
    def log(str):
        t = time.strftime(r"%Y-%m-%d_%H-%M-%S", time.localtime())
        print("[%s]%s" % (t, str))
    
    
    log('启动消费者...')
    # auto_offset_reset='earliest' 这个参数很重要,如果加上了,就是kafka记录的最后一条位置,如果不加,就是以后要插入的数据了。
    #consumer = KafkaConsumer(topicName, auto_offset_reset='earliest', bootstrap_servers=kafka_servers)
    consumer = KafkaConsumer(topicName, bootstrap_servers=kafka_servers)
    for msg in consumer:
        recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        log(recv)
  • 相关阅读:
    NODE 开发 2-3年工作经验 掌握的相关知识
    react 问题
    vue 问题集合 |
    前端实用工具大全, 有任何棘手的实现, 可以来这里拿
    react 入门的好东西 可以做出一个完整的网站
    vue 问题集合
    js 预处理 与 执行 的顺序
    js_6_dom选择
    js_4_函数
    js_3_for_if_try
  • 原文地址:https://www.cnblogs.com/littlehb/p/8438401.html
Copyright © 2011-2022 走看看