NewProducer:
1. Sends can be monitored manually through a callback, where errors are handled.
2. Synchronous vs. asynchronous sending is controlled manually.
3. linger.ms controls how long a message may linger in the buffer before being sent.
4. Supported key/value serializers:
   StringSerializer     ShortSerializer      IntegerSerializer
   LongSerializer       FloatSerializer      DoubleSerializer
   ByteBufferSerializer ByteArraySerializer  BytesSerializer

NewConsumer:
1. Consume from multiple topics at once.
2. Consume from specific partitions.
3. Commit offsets manually: consumer.commitSync()
4. Move the consume pointer: consumer.seek(), then consumer.poll()
   (see the consumer sketch below)
5. Supported key/value deserializers:
   StringDeserializer     ShortDeserializer      IntegerDeserializer
   LongDeserializer       FloatDeserializer      DoubleDeserializer
   ByteBufferDeserializer ByteArrayDeserializer  BytesDeserializer

NewPartition:
1. The partition can be chosen based on the topic.
2. The partition can be chosen based on the value.
   (see the partitioner sketch below)
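A minimal sketch of the consumer features above, assuming the new-consumer API of this Kafka era, the broker s102:9092, and topic t3 used elsewhere in these notes; the group id, partition number, and seek offset are illustrative:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class NewConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "s102:9092");
        props.put("group.id", "g1");                    // illustrative group id
        props.put("enable.auto.commit", "false");       // we commit offsets manually
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // 1. consume from multiple topics at once:
        // consumer.subscribe(Arrays.asList("t3", "test1"));

        // 2. or consume from a specific partition:
        TopicPartition p0 = new TopicPartition("t3", 0);
        consumer.assign(Arrays.asList(p0));

        // 4. move the consume pointer back to offset 0 of that partition
        consumer.seek(p0, 0);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.partition() + " : " + record.offset() + " : " + record.value());
            }
            // 3. commit the offsets manually
            consumer.commitSync();
        }
    }
}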
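The producer in the Flume section below sets partitioner.class = kafka.NewPartition. A minimal sketch of what such a partitioner could look like; the concrete routing rules (pinning non-t3 topics to partition 0, hashing the value otherwise) are assumptions for illustration:

package kafka;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class NewPartition implements Partitioner {

    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        // 1. route by topic: pin everything outside t3 to partition 0 (illustrative rule)
        if (!"t3".equals(topic)) {
            return 0;
        }
        // 2. route by value: spread records across partitions by value hash (illustrative rule)
        if (value == null) {
            return 0;
        }
        return (value.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public void configure(Map<String, ?> configs) {
    }

    public void close() {
    }
}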
Stress and load testing Kafka
=====================================
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

Single-threaded producer, replication factor x1, partitions x6
key: null    value: byte[1024]    ByteArraySerializer

kafka-topics.sh --create --topic test1 --zookeeper s102:2181 --partitions 6 --replication-factor 1

Result: 1 GB ===> 87 s    11.7 MB/s    12,052 records/s

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducerTest1 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "s102:9092");
        props.put("acks", "0");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        long start = System.currentTimeMillis();
        // initialize the kafka producer
        Producer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
        byte[] value = new byte[1024];
        // 1024 * 1024 records x 1 KB each = 1 GB total
        for (int i = 0; i < 1024 * 1024; i++) {
            // initialize a kafka record: topic + value (the key stays null);
            // change the topic (test1/test2/test3) to match each test run
            ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>("test1", value);
            producer.send(record);
        }
        producer.close();
        System.out.println(System.currentTimeMillis() - start);
    }
}

Single-threaded producer, replication factor x3, partitions x6
key: null    value: byte[1024]    ByteArraySerializer

kafka-topics.sh --create --topic test2 --zookeeper s102:2181 --partitions 6 --replication-factor 3

Result: 1 GB ===> 188 s    5.4 MB/s    5,577 records/s

3 producer threads, replication factor x3, partitions x6
key: null    value: byte[1024]    ByteArraySerializer

kafka-topics.sh --create --topic test3 --zookeeper s102:2181 --partitions 6 --replication-factor 3

Result: 1 GB ===> 188 s    5.4 MB/s    5,577 records/s

Integrating Flume with Kafka

kafkaSource    // r_kafka.conf
================================
# name the agent components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = s102:9092,s103:9092,s104:9092
a1.sources.r1.kafka.topics = t3
a1.sources.r1.kafka.consumer.group.id = flume

# configure the sink
a1.sinks.k1.type = logger

# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

# bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

1. Start ZooKeeper and Kafka.
2. Start flume:
   flume-ng agent -n a1 -f r_kafka.conf
3. Produce data through a producer:

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class NewProducer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "s102:9092");
        props.put("acks", "0");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("partitioner.class", "kafka.NewPartition");
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        long start = System.currentTimeMillis();
        // initialize the kafka producer
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        Random r = new Random();
        for (int i = 0; i < 10000; i++) {
            // initialize a kafka record: topic, key, value
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("t3", r.nextInt(3) + "" + i, "tom" + i);
            Future<RecordMetadata> future = producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    //System.out.print(metadata.toString() + " ");
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                }
            });
            future.get();    // block on the future => effectively a synchronous send
            //Thread.sleep(1000);
        }
        producer.close();
        System.out.println(System.currentTimeMillis() - start);
    }
}

kafkaSink    // k_kafka.conf
=======================================
# name the agent components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# configure the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = t3
a1.sinks.k1.kafka.bootstrap.servers = s102:9092,s103:9092,s104:9092

# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

# bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

1. Start flume:
   flume-ng agent -n a1 -f k_kafka.conf
2. Start a kafka consumer on topic t3:
   kafka-console-consumer.sh --zookeeper s102:2181 --topic t3
3. Produce data with nc:
   nc localhost 8888

KafkaChannel (Kafka as the buffer)    // c_kafka.conf
======================
# name the agent components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# configure the sink
a1.sinks.k1.type = logger

# configure the channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = s102:9092,s103:9092,s104:9092
a1.channels.c1.kafka.topic = flume-channel
a1.channels.c1.kafka.consumer.group.id = flume

# bind the channel to the source and the sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

1. Start flume:
   flume-ng agent -n a1 -f c_kafka.conf
2. Start nc:
   nc localhost 8888
3. Send data in nc and check whether the logger prints it.
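4. (optional) To confirm the channel is really writing to Kafka, watch the backing topic directly, mirroring the console-consumer step used for the kafkaSink test; note that by default the KafkaChannel stores events Avro-wrapped as FlumeEvents (parseAsFlumeEvent = true), so the console output will contain some binary framing:
   kafka-console-consumer.sh --zookeeper s102:2181 --topic flume-channel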