zoukankan      html  css  js  c++  java
  • 【慕课网实战】Spark Streaming实时流处理项目实战笔记五之铭文升级版

    铭文一级:

    单节点单broker的部署及使用

    $KAFKA_HOME/config/server.properties
    broker.id=0
    listeners
    host.name
    log.dirs
    zookeeper.connect

    启动Kafka
    kafka-server-start.sh
    USAGE: /home/hadoop/app/kafka_2.11-0.9.0.0/bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*

    kafka-server-start.sh $KAFKA_HOME/config/server.properties

    创建topic: zk
    kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic hello_topic

    查看所有topic
    kafka-topics.sh --list --zookeeper hadoop000:2181

    发送消息: broker
    kafka-console-producer.sh --broker-list hadoop000:9092 --topic hello_topic

    消费消息: zk
    kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic --from-beginning


    --from-beginning的使用

    查看所有topic的详细信息:kafka-topics.sh --describe --zookeeper hadoop000:2181
    查看指定topic的详细信息:kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic hello_topic

    单节点多broker
    server-1.properties
    log.dirs=/home/hadoop/app/tmp/kafka-logs-1
    listeners=PLAINTEXT://:9093
    broker.id=1

    server-2.properties
    log.dirs=/home/hadoop/app/tmp/kafka-logs-2
    listeners=PLAINTEXT://:9094
    broker.id=2

    server-3.properties
    log.dirs=/home/hadoop/app/tmp/kafka-logs-3
    listeners=PLAINTEXT://:9095
    broker.id=3

    kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &
    kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &
    kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &

    kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

    kafka-console-producer.sh --broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095 --topic my-replicated-topic
    kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic my-replicated-topic

    kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic my-replicated-topic 

    铭文二级:

    Kafka版本下载版本为0.9.0.0比较稳定,再选相对应的scala版本(http://kafka.apache.org/downloads

    单节点单broker的部署及使用=>

    配置环境变量,修改配置文件:conf/server.properties

    broker.id = 0                 //唯一id值

    listeners = :9092               //监听端口号,发送的内容到broker即为此端口

    hostname = hadoop000             //默认localhost也行

    log.dirs = /home/app/tmp/kafka-logs     //临时文件目录,需建立tmp,kafka-logs可不建立

    zookeeper.connect = hadoop000:2181      //创建topic,查询topic,消耗者均为此端口

    [num.partitions = 1]                //分区

    启动kafka:kafka-server-start.sh $KAFKA_HOME/config/server.properties  //不知道如何使用就先执行kafka-server-start.sh

    创建topic:kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic hello_topic

    查询所有topic:kafka-topics.sh --list --zookeeper hadoop000:2181

    发送消息:kafka-console-producer.sh --broker-list hadoop000:9092 --topic hello_topic

    消费消息:kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic   //可加 --from-beginning 只从一开始的也消费

    查看所有topic的详细信息:kafka-topics.sh --describe --zookeeper hadoop000:2181

    查看指定topic的详细信息:kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic hello_topic

    详细信息:Replicas:3,1,2  // 副本节点  Isr:3,1,2 //存活节点数

    单节点多broker=>

    cp三份配置文件,修改三处,然后分别启动:

    1、broker.id    2、listener    3、log.dirs

    kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties      //-daemon为后台启动

    A、创建topics副本系数要修改成:3

    B、发送消息到三个端口:--broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095

    查看topic详情:kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic my-replicated-topic

    容错性测试=>

    kill -9 5598    //强制杀死进程,分别查看topic详情,观察leader变化

    Kafka API编程环境搭建=>

    一、创建maven项目-> scala-archetype-simple ->

    GroupId:com.imooc.spark

    ArtifactId:sparktrain

    Version:1.0

    设置本地自己安装的maven的settings.xml

    settings.xml 里面记得添加了<localRepository>/Users/rocky/maven-repos</localRepository>

    二、pom.xml文件修改:

    1.将<scala.version>修改成2.11.8

    2.删除多余的dependency,只剩下org.scala.lang,改成${scala.version}

    3.添加dependency

    groupId:org.apache.kafka

    artifactId:kafka_2.11

    version:0.9.0.0  //写出去成${kafka.version}

    三、项目文件夹的建立:

    1.删除多余的项目目录文件夹App、AppTest、MySpec

    2.新建文件夹java(Project Structure->Modules里面设置目录所属类型颜色),新建包:com.imooc.spark.kafka

  • 相关阅读:
    PCB genesis方槽加内角槽孔实现方法
    PCB genesis连孔加除毛刺孔(槽孔与槽孔)实现方法(三)
    PCB genesis连孔加除毛刺孔(圆孔与槽孔)实现方法(二)
    PCB genesis连孔加除毛刺孔(圆孔与圆孔)实现方法(一)
    为什么要用Redis而不直接用Map做缓存
    Linux 查询端口被占用命令
    HashMap 和 Hashtable 的区别
    RandomAccess是什么
    接口和抽象类的区别是什么?
    为什么 Java 中只有值传递?
  • 原文地址:https://www.cnblogs.com/kkxwz/p/8353631.html
Copyright © 2011-2022 走看看