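I: Flume review
0.JMS (Java Message Service)
--------------------------------------------------------------
queue (point-to-point mode): each message is consumed by exactly one consumer
publish-subscribe (also called topic mode): each message published to a topic is delivered to every subscriber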
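The difference between the two delivery modes can be sketched in plain Java without any JMS provider. In queue mode each message goes to exactly one of the competing consumers; in publish-subscribe mode every subscriber gets its own copy. The class and method names are hypothetical, and round-robin is only one possible way of picking the consumer in queue mode.

```java
import java.util.ArrayList;
import java.util.List;

public class MessagingModes {

    // Queue (point-to-point): each message is delivered to exactly one consumer.
    // Round-robin is an assumed policy for choosing which consumer receives it.
    static List<List<String>> pointToPoint(List<String> messages, int consumers) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumers; i++) received.add(new ArrayList<>());
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumers).add(messages.get(i));
        }
        return received;
    }

    // Publish-subscribe (topic): every subscriber receives every message.
    static List<List<String>> publishSubscribe(List<String> messages, int subscribers) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) received.add(new ArrayList<>(messages));
        return received;
    }

    public static void main(String[] args) {
        List<String> msgs = List.of("m1", "m2", "m3");
        System.out.println(pointToPoint(msgs, 2));     // [[m1, m3], [m2]]
        System.out.println(publishSubscribe(msgs, 2)); // [[m1, m2, m3], [m1, m2, m3]]
    }
}
```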
1.flume是收集,聚合,移动日志的框架
2.agent:
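source: //receives data; the producer side
//netcat
//ExecSource, real-time collection: tail -F xxx.txt
//spooldir: monitors a directory
//seq
//Stress: stress testing
//avroSource
channel //holds data temporarily; acts as a buffer
//non-persistent: MemoryChannel
//persistent: FileChannel (on disk)
//SpillableMemoryChannel: combines the memory channel and the file channel; events that do not fit in memory spill to disk
sink //outputs data; the consumer side, takes events out of the channel
//HdfsSink
//HBaseSink
//HiveSink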
//avroSink
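The source → channel → sink flow of one agent can be sketched with a bounded BlockingQueue standing in for a MemoryChannel. This is an in-memory illustration, not the Flume API: real Flume sources and sinks also wrap puts and takes in channel transactions, which is left out here. The capacity of 100 and the end-of-input sentinel are assumptions of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MiniAgent {

    private static final String END = "__END__"; // sentinel; assumes no real event has this text

    // One source thread and one sink thread connected by a bounded queue that plays
    // the role of a MemoryChannel. Returns the events the sink delivered, in order.
    static List<String> run(List<String> input) {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(100); // assumed capacity
        List<String> delivered = new ArrayList<>();

        // Source: producer side, puts events into the channel (put blocks while it is full).
        Thread source = new Thread(() -> {
            try {
                for (String event : input) channel.put(event);
                channel.put(END);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        // Sink: consumer side, takes events out of the channel (take blocks while it is empty).
        Thread sink = new Thread(() -> {
            try {
                for (String event = channel.take(); !event.equals(END); event = channel.take()) {
                    delivered.add(event); // stand-in for writing to HDFS, HBase or Hive
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        source.start();
        sink.start();
        try {
            source.join();
            sink.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("hello", "flume"))); // [hello, flume]
    }
}
```

Because the queue is bounded, a slow sink eventually makes the source block, which is the buffering role the channel plays between the two sides.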
kafka
-------------------------------------------------------
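I. Kafka introduction
1.JMS: Java Message Service
2.Kafka is a distributed streaming platform used to build real-time data pipelines between systems
3.Kafka runs as a cluster of one or more hosts and stores records by category in topics; each record has a key, a value and a timestamp
4.Producer: the producer; Consumer: the consumer; consumer group: a group of consumers; Kafka server: a broker
topic: messages are recorded by category, and each category of messages is called a topic
broker: Kafka runs as a cluster of one or more servers, each of which is called a broker; consumers subscribe to one or more topics and pull data from the brokers to consume the published messages
Each message consists of key + value + timestamp
5.Kafka can sustain throughput on the order of millions of messages per second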
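The record model (key + value + timestamp inside a topic split into partitions) can be sketched as follows. The partitioner here is a simplification that uses `String.hashCode()`; Kafka's default partitioner hashes the serialized key bytes with murmur2, so real partition numbers will differ. What carries over is that records with the same key always land in the same partition and keep their send order there. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class MiniTopic {

    // Each record carries a key, a value and a timestamp.
    record Message(String key, String value, long timestamp) {}

    private final List<List<Message>> partitions = new ArrayList<>();

    MiniTopic(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) partitions.add(new ArrayList<>());
    }

    // Simplified partitioner (assumption): non-negative String.hashCode() modulo partition count.
    int partitionFor(String key) {
        return (key.hashCode() & 0x7fffffff) % partitions.size();
    }

    // Appends the record to the end of its partition and returns its offset in that partition.
    long send(String key, String value) {
        List<Message> p = partitions.get(partitionFor(key));
        p.add(new Message(key, value, System.currentTimeMillis()));
        return p.size() - 1;
    }

    List<Message> partition(int i) {
        return partitions.get(i);
    }

    public static void main(String[] args) {
        MiniTopic topic = new MiniTopic(3);
        topic.send("100", "hello");
        topic.send("100", "world");
        int p = topic.partitionFor("100");
        // Same key, so both records are in the same partition, in the order they were sent.
        for (Message m : topic.partition(p)) {
            System.out.println("partition " + p + ": " + m.key() + "=" + m.value());
        }
    }
}
```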
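II. Installing Kafka
0.Install Kafka on the three hosts s202 ~ s204
1.Prepare ZooKeeper
(omitted)
2.JDK
(omitted)
3.Unpack the tar file
4.Environment variables
(omitted)
5.Configure Kafka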
[kafka/config/server.properties]
...
broker.id=202
...
listeners=PLAINTEXT://:9092
...
log.dirs=/home/centos/kafka/logs
...
zookeeper.connect=s201:2181,s202:2181,s203:2181
6.Distribute server.properties to the other hosts and change broker.id in each copy (every broker needs a unique id)
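Step 6 only works if every broker ends up with a distinct broker.id. A small sketch using java.util.Properties can derive each host's copy from the s202 template. It assumes the convention used above (broker.id equals the digits of the host name, 202 for s202) and only rewrites broker.id; listeners, log.dirs and zookeeper.connect are copied unchanged. `perHost` is a hypothetical helper.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class BrokerConfigs {

    // Builds one Properties per host from a shared template, changing only broker.id.
    // Assumed convention: host "s203" -> broker.id=203.
    static Map<String, Properties> perHost(Properties template, List<String> hosts) {
        Map<String, Properties> result = new LinkedHashMap<>();
        for (String host : hosts) {
            Properties props = new Properties();
            props.putAll(template);                      // copy every shared setting
            String id = host.replaceAll("\\D", "");     // keep only the digits of the host name
            props.setProperty("broker.id", id);
            result.put(host, props);
        }
        // Two brokers with the same id cannot both register, so reject duplicates early.
        long distinct = result.values().stream()
                .map(p -> p.getProperty("broker.id")).distinct().count();
        if (distinct != result.size()) throw new IllegalStateException("duplicate broker.id");
        return result;
    }

    public static void main(String[] args) {
        Properties template = new Properties();
        template.setProperty("broker.id", "202");
        template.setProperty("listeners", "PLAINTEXT://:9092");
        template.setProperty("log.dirs", "/home/centos/kafka/logs");
        template.setProperty("zookeeper.connect", "s201:2181,s202:2181,s203:2181");

        perHost(template, List.of("s202", "s203", "s204")).forEach((host, p) ->
                System.out.println(host + ": broker.id=" + p.getProperty("broker.id")));
    }
}
```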
7.Start the Kafka servers
a) Start ZooKeeper first
b) Start Kafka
[s202 ~ s204]
$>bin/kafka-server-start.sh config/server.properties
c) Verify that the Kafka server has started
$>netstat -anop | grep 9092
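The netstat check in step 7c can also be done from Java by trying to open a TCP connection to the listener port. A successful connection only shows that something accepts connections on that host and port (9092, per the listeners setting), not that it is a healthy Kafka broker; the 2-second timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, unknown host or timed out
        }
    }

    public static void main(String[] args) {
        for (String host : new String[] {"s202", "s203", "s204"}) {
            System.out.println(host + ":9092 listening = " + isListening(host, 9092, 2000));
        }
    }
}
```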
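8.Create a topic
$>bin/kafka-topics.sh --create --zookeeper s202:2181 --replication-factor 3 --partitions 3 --topic test //creates topic test with 3 partitions, each with 3 replicas
9.List the topics
$>bin/kafka-topics.sh --list --zookeeper s202:2181
10.Start the console producer
$>bin/kafka-console-producer.sh --broker-list s202:9092 --topic test
11.Start the console consumer
$>bin/kafka-console-consumer.sh --bootstrap-server s202:9092 --topic test --from-beginning //consume from the beginning of the topic
(older releases using the ZooKeeper-based consumer take --zookeeper s202:2181 instead of --bootstrap-server; pass one of the two, not both)
12.Type hello world in the producer console; it should appear in the consumer console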
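Step 8 asks for 3 partitions with 3 replicas each on the three brokers. The sketch below shows one way such a layout can look, using a simplified round-robin placement; it is not Kafka's exact assignment algorithm (which also randomizes the starting broker and shifts follower placement). It does reproduce one real rule: the replication factor cannot exceed the number of brokers, because two replicas of the same partition never share a broker. The first broker in each list plays the role of the preferred leader.

```java
import java.util.ArrayList;
import java.util.List;

public class ReplicaLayout {

    // Simplified round-robin placement (assumption, not Kafka's exact algorithm):
    // partition p gets brokers starting at index p, wrapping around.
    static List<List<Integer>> assign(List<Integer> brokers, int partitions, int replicationFactor) {
        if (replicationFactor > brokers.size()) {
            throw new IllegalArgumentException("replication factor larger than broker count");
        }
        List<List<Integer>> layout = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add(brokers.get((p + r) % brokers.size()));
            }
            layout.add(replicas);
        }
        return layout;
    }

    public static void main(String[] args) {
        List<List<Integer>> layout = assign(List.of(202, 203, 204), 3, 3);
        for (int p = 0; p < layout.size(); p++) {
            System.out.println("test-" + p + " replicas=" + layout.get(p) + " leader=" + layout.get(p).get(0));
        }
    }
}
```

With this placement each broker leads exactly one partition (202, 203, 204 for test-0, test-1, test-2), so the leadership load is spread across the cluster.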
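III. Replication
1.Replicas: a broker stores messages in the order they arrive; producing and consuming are both replica-aware; with n replicas, up to n-1 broker failures can be tolerated; every partition has a leader
A new leader is elected from the ISR (in-sync replicas): the first follower to register becomes the new leader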
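The election rule can be sketched as "pick the first replica in the ISR, in registration order, that is still alive". This is a simplified model of the rule described above, not Kafka's controller code; the broker ids are the ones from the installation section.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class IsrElection {

    // isr: in-sync replicas (broker ids) in the order they registered.
    // alive: brokers that are still up.
    // Returns the first in-sync replica that is still alive, if any.
    static Optional<Integer> electLeader(List<Integer> isr, Set<Integer> alive) {
        for (int broker : isr) {
            if (alive.contains(broker)) return Optional.of(broker);
        }
        return Optional.empty(); // no in-sync replica left: the partition is unavailable
    }

    public static void main(String[] args) {
        List<Integer> isr = List.of(202, 203, 204); // 202 is the current leader
        // 202 and 203 fail: with 3 replicas, 2 (= n-1) failures are still survivable.
        System.out.println(electLeader(isr, Set.of(204))); // Optional[204]
        System.out.println(electLeader(isr, Set.of()));    // Optional.empty
    }
}
```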
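2.Replication modes supported by Kafka:
[Synchronous replication]:
1.The producer contacts ZooKeeper to find the leader
2.The producer sends the message to the leader
3.The leader receives the message and writes it to its local log
4.The followers pull the message from the leader
5.Each follower writes the message to its local log
6.Each follower sends an ack back to the leader
7.The leader receives the acks from all followers
8.The leader sends an ack back to the producer
[Asynchronous replication]:
The difference from synchronous replication is that the leader sends the ack back to the client as soon as it has written the message to its local log, without waiting for the followers to finish replicating. This mode therefore cannot guarantee delivery: if the leader fails after acking but before the followers have copied the message, the message is lost.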
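The practical difference between the two modes shows up when the leader crashes before the followers have pulled a message. The single-threaded simulation below is a simplification (real replication is concurrent), and all names in it are hypothetical; the step numbers in the comments refer to the synchronous steps listed above.

```java
import java.util.ArrayList;
import java.util.List;

public class ReplicationModes {

    static class Replica {
        final List<String> log = new ArrayList<>();
    }

    // Simulates one produce request. followersCatchUp says whether the followers manage
    // to pull the message before the leader crashes. Returns true if the producer got an ack.
    static boolean send(String msg, boolean sync, Replica leader, List<Replica> followers,
                        boolean followersCatchUp) {
        leader.log.add(msg);                            // steps 2-3: leader writes its local log
        if (followersCatchUp) {
            for (Replica f : followers) f.log.add(msg); // steps 4-6: followers pull, write, ack
        }
        // sync: ack only once every follower has the message (steps 7-8)
        // async: ack right after the leader's local write
        return !sync || followersCatchUp;
    }

    public static void main(String[] args) {
        for (boolean sync : new boolean[] {true, false}) {
            Replica leader = new Replica();
            List<Replica> followers = List.of(new Replica(), new Replica());
            // The leader crashes before any follower has pulled "m1".
            boolean acked = send("m1", sync, leader, followers, false);
            // A follower takes over as leader: does it have the message?
            boolean survived = followers.get(0).log.contains("m1");
            System.out.println((sync ? "sync" : "async") + ": acked=" + acked + ", survived=" + survived);
        }
    }
}
```

In the synchronous case the producer never receives an ack, so it knows to resend; in the asynchronous case it was told the write succeeded although the new leader does not have the message.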
Accessing Kafka through the API
------------------------------------------
1.Message producer
package com.it18zhang.kafkaDemo.test;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.junit.Test;

import java.util.Properties;

/**
 * Created by stone on 2018/8/17.
 * Uses the old Scala producer API (kafka.javaapi.producer), which was deprecated and later removed from Kafka.
 */
public class TestProducer {
    @Test
    public void testSend() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "s202:9092");                  // broker(s) used to fetch metadata
        props.put("serializer.class", "kafka.serializer.StringEncoder"); // encode keys and values as strings
        props.put("request.required.acks", "1");                         // wait for the leader's ack
        // create the producer configuration object
        ProducerConfig config = new ProducerConfig(props);
        // create the producer
        Producer<String, String> producer = new Producer<String, String>(config);
        // topic "test2", key "100", value "hello world jack"
        KeyedMessage<String, String> msg = new KeyedMessage<String, String>("test2", "100", "hello world jack");
        producer.send(msg);
        producer.close(); // release the network resources
        System.out.println("send over");
    }
}
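2.Message consumer
package com.it18zhang.kafkaDemo.test;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.junit.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Old high-level consumer API (kafka.consumer), which stores offsets in ZooKeeper.
// The class name TestConsumer is illustrative; the method can equally live in TestProducer.
public class TestConsumer {
    @Test
    public void testConsumer() {
        Properties prop = new Properties();
        prop.put("zookeeper.connect", "s202:2181");
        prop.put("group.id", "g1");
        prop.put("zookeeper.session.timeout.ms", "500");
        prop.put("zookeeper.sync.time.ms", "1000");
        // topic -> number of streams to create for it
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test2", 1);
        // create the consumer configuration and connector, then the message streams
        Map<String, List<KafkaStream<byte[], byte[]>>> msgs =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(prop)).createMessageStreams(map);
        List<KafkaStream<byte[], byte[]>> msgList = msgs.get("test2");
        for (KafkaStream<byte[], byte[]> stream : msgList) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            // hasNext() blocks until a message arrives, so this loop runs until the test is stopped
            while (it.hasNext()) {
                byte[] message = it.next().message();
                System.out.println(new String(message));
            }
        }
    }
}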
Integrating Flume with Kafka
----------------------------------------
1.Flume sinks data into Kafka (KafkaSink)
Flume acts as the producer
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=netcat
a1.sources.r1.port=8888
a1.sources.r1.bind=localhost
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test2
a1.sinks.k1.kafka.bootstrap.servers = s202:9092
a1.sinks.k1.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.channels.c1.type=memory
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
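In an agent configuration a source lists its channels with `channels` (plural, possibly several) while a sink names exactly one `channel`. The sketch loads a configuration like the KafkaSink one with java.util.Properties and reports every source or sink that points at a channel not declared in `a1.channels`. `AgentWiringCheck` is a hypothetical helper, not part of Flume, and it looks only at the wiring keys.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

public class AgentWiringCheck {

    // Returns a list of problems; an empty list means every channel reference resolves.
    static List<String> check(String agent, String config) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(config));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Set<String> channels = split(p.getProperty(agent + ".channels", ""));
        List<String> problems = new ArrayList<>();
        for (String src : split(p.getProperty(agent + ".sources", ""))) {
            for (String ch : split(p.getProperty(agent + ".sources." + src + ".channels", ""))) {
                if (!channels.contains(ch)) problems.add("source " + src + " -> unknown channel " + ch);
            }
        }
        for (String sink : split(p.getProperty(agent + ".sinks", ""))) {
            String ch = p.getProperty(agent + ".sinks." + sink + ".channel");
            if (ch == null || !channels.contains(ch.trim())) problems.add("sink " + sink + " -> channel " + ch);
        }
        return problems;
    }

    // Component names in these lists are separated by spaces.
    private static Set<String> split(String s) {
        return Arrays.stream(s.trim().split("\\s+")).filter(x -> !x.isEmpty()).collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        String config = String.join("\n",
                "a1.sources=r1", "a1.sinks=k1", "a1.channels=c1",
                "a1.sources.r1.type=netcat",
                "a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink",
                "a1.channels.c1.type=memory",
                "a1.sources.r1.channels=c1",
                "a1.sinks.k1.channel=c1");
        System.out.println(check("a1", config)); // []
    }
}
```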
2.Kafka as the source (KafkaSource)
Flume acts as the consumer
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = s202:9092
a1.sources.r1.kafka.topics = test3
a1.sources.r1.kafka.consumer.group.id = g4
a1.sinks.k1.type = logger
a1.channels.c1.type=memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
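`batchSize` and `batchDurationMillis` bound a KafkaSource batch: it is written to the channel as soon as it holds batchSize messages or batchDurationMillis have elapsed, whichever comes first. The sketch replays a list of arrival times through that rule. As a simplification it starts the timer at the first message of each batch, whereas Flume starts it when a polling round begins; batchSize is lowered to 5 so the size limit is reachable in a short demo.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchRule {

    // arrivalsMs: arrival time of each message, in ascending order.
    // Returns the sizes of the batches that get written to the channel.
    static List<Integer> batchSizes(long[] arrivalsMs, int batchSize, long batchDurationMillis) {
        List<Integer> sizes = new ArrayList<>();
        int count = 0;
        long batchStart = 0;
        for (long t : arrivalsMs) {
            if (count > 0 && t - batchStart >= batchDurationMillis) {
                sizes.add(count); // time limit reached before this message arrived
                count = 0;
            }
            if (count == 0) batchStart = t; // assumption: timer starts at the batch's first message
            count++;
            if (count == batchSize) {
                sizes.add(count); // size limit reached
                count = 0;
            }
        }
        if (count > 0) sizes.add(count); // the last partial batch is flushed when its time runs out
        return sizes;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 10, 20, 3000, 3005, 3010, 3015, 3020, 3025};
        System.out.println(batchSizes(arrivals, 5, 2000)); // [3, 5, 1]
    }
}
```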
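3.Kafka as the channel (KafkaChannel): a channel is the buffer where Flume keeps events temporarily. Besides the memory channel and the file channel, Flume can keep the events in a Kafka topic; the source writes them into Kafka (producer side) and the sink reads them back (consumer side)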
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888
a1.sinks.k1.type = logger
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = s202:9092
a1.channels.c1.kafka.topic = test3
a1.channels.c1.kafka.consumer.group.id = g6
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
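With a KafkaChannel the events live in a Kafka topic rather than in memory or local files, and the sink's progress is tracked as a committed offset for its consumer group (g6 above). The sketch models this with an append-only list and a per-group offset map to show why events that were not yet committed are read again instead of being lost. It is an in-memory illustration with hypothetical names, not the Flume or Kafka API, and it uses a single partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LogBackedChannel {

    private final List<String> log = new ArrayList<>();             // the topic (one partition)
    private final Map<String, Integer> committed = new HashMap<>(); // group id -> next offset

    // Source side: append an event to the end of the log.
    void put(String event) {
        log.add(event);
    }

    // Sink side: read up to max events starting at the group's committed offset.
    List<String> poll(String groupId, int max) {
        int from = committed.getOrDefault(groupId, 0);
        int to = Math.min(log.size(), from + max);
        return new ArrayList<>(log.subList(from, to));
    }

    // Sink side: once the events were delivered, move the group's offset forward.
    void commit(String groupId, int count) {
        committed.merge(groupId, count, Integer::sum);
    }

    public static void main(String[] args) {
        LogBackedChannel channel = new LogBackedChannel();
        channel.put("e1");
        channel.put("e2");
        channel.put("e3");

        List<String> batch = channel.poll("g6", 2);
        channel.commit("g6", batch.size());
        System.out.println(batch);                 // [e1, e2]

        // A restarted sink resumes from the committed offset; e3 is still in the log.
        System.out.println(channel.poll("g6", 2)); // [e3]
    }
}
```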