zoukankan      html  css  js  c++  java
  • kafka-spark streaming (一)

    Kafka-spark streaming

    1、安装包

    kafka安装需要zookeeper、jdk。

    官网下载最新的:

    https://kafka.apache.org/downloads

    http://mirrors.hust.edu.cn/apache/zookeeper/

    http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

    本例用到的服务版本:kafka_2.11.0.11.tar.gz,jdk1.8

    Zookeeper可以自己安装,也可以使用kafka自带的zk。本例使用自带的zookeeper。

    2、安装kafka(standalone)

    1)# tar xf kafka_2.11-0.11.0.0.tgz

    2)# mv kafka_2.11-0.11.0.0  kafka

    3)# cd kafka ;ls

    bin  config  kafka-logs  libs  LICENSE  logs  NOTICE  site-docs

    3、修改配置文件

    主要是修改server.properties、zookeeper.properties、producer.properties、consumer.properties

    1)  # vim server.properties

    broker.id=0 #每个kafka的broker是唯一的

    delete.topic.enable=true

    listeners=PLAINTEXT://:9092

    num.network.threads=3

    num.io.threads=8

    socket.send.buffer.bytes=102400

    socket.receive.buffer.bytes=102400

    socket.request.max.bytes=104857600

    log.dirs=/data/kafka/kafka-logs

    num.partitions=4 #4个分区,对应spark4个RDD

    num.recovery.threads.per.data.dir=1

    • offsets.topic.replication.factor=1

    transaction.state.log.replication.factor=1

    transaction.state.log.min.isr=1

    log.retention.hours=168

    log.segment.bytes=1073741824

    log.retention.check.interval.ms=300000

    zookeeper.connect=localhost:2181

    zookeeper.connection.timeout.ms=6000

    group.initial.rebalance.delay.ms=0

    2)vim zookeeper.properties

    dataDir=/data/zookeeper

    clientPort=2181

    maxClientCnxns=0

    3)vim producer.properties

      bootstrap.servers=localhost:9092

    compression.type=none

      

     4)vim consumer.properties

        zookeeper.connect=127.0.0.1:2181

    zookeeper.connection.timeout.ms=6000

    group.id=consumer

    #如果zk是集群,则用,隔开。

    #kafka是集群,则group.id相同

    创建相应的目录。

    4、启动服务

    1)启动zk

    # nohup ./zookeeper-server-start.sh ../config/zookeeper.properties 2>&1 &

    2)启动kafka

    # nohup ./kafka-server-start.sh ../config/server.properties 2>&1 &

    3)创建一个topic

     # ./kafka-topics.sh --create --topic kafka-test --replication-factor 1 --partitions 4 --zookeeper localhost:2181

    修改partition的数量:

    # ./kafka-topics.sh --alter --zookeeper localhost:2181 --topic kafka-test --partitions 20

    4)查看创建的所有topic

    # ./kafka-topics.sh --list --zookeeper localhost:2181 

    5)删除某个topic

       # ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic kafka-test

    # 需要在server.properties里面设置delete.topic.enable=true。

    6)模拟producer

    # ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic userlog

    7)模拟consumer

    # ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic userlog --from-beginning 

    #如果是分布式的,还需要设置listeners=PLAINTEXT://:9092,默认是127.0.0.1;需要修改为:listeners=PLAINTEXT://服务器ip:9092

    在其他服务器上调用的时候,也是需要将地址改为服务器地址的,还有就是  bootstrap.servers=localhost:9092

     ,改为服务器的地址。

    --------崔帅的拾荒
  • 相关阅读:
    vue后台模板推荐
    Webstorm的一些常用快捷键
    dataTables 插件学习整理
    js阻止事件冒泡
    vscode常用快捷键
    VScode插件以及配置
    今日笔记2
    ES6中的import()函数
    ES6之class 中 constructor 方法 和 super 的作用
    JS设计模式一:单例模式
  • 原文地址:https://www.cnblogs.com/cuishuai/p/7475356.html
Copyright © 2011-2022 走看看