zoukankan      html  css  js  c++  java
  • kafka和flume集成

    NewProducer:
    
        1、通过回调,手动监控数据,并进行错误处理
        2、手动控制同步,异步
        3、linger.ms 控制消息在buffer停留时间
        4、数据类型
            StringSerializer
            ShortSerializer
            IntegerSerializer
            LongSerializer
            FloatSerializer
            DoubleSerializer
            ByteBufferSerializer
            ByteArraySerializer
            BytesSerializer
    
    
    NewConsumer:
        1、指定多主题消费
        2、指定分区消费
        3、手动提交偏移量    consumer.commitSync()
        4、修改消费指针        consumer.seek()
                    consumer.poll()
        5、数据类型
            StringDeserializer
            ShortDeserializer
            IntegerDeserializer
            LongDeserializer
            FloatDeserializer
            DoubleDeserializer
            ByteBufferDeserializer
            ByteArrayDeserializer
            BytesDeserializer
    
    
    NewPartition:
        1、可以根据topic来指定分区
        2、可以根据value指定分区
    
        
        
    对kafka进行压力和负载测试
    =====================================
        https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
    
        单线程生产,副本x1,分区x6
            key:null
            value:byte[1024] ByteArraySerializer
    
            kafka-topics.sh --create --topic test1 --zookeeper s102:2181 --partitions 6 --replication-factor 1
    
            结果:1G ===> 87s  11.7M/s 12052条/s
    
    
            /**
             * Single-threaded producer throughput benchmark.
             *
             * <p>Sends 1024 * 1024 records with a null key and a 1024-byte value
             * (1 GiB of payload in total) and prints the elapsed wall-clock time in
             * milliseconds.
             */
            public class NewProducerTest1 {

                /**
                 * Runs the benchmark against the broker at {@code s102:9092}.
                 *
                 * @param args unused
                 * @throws InterruptedException declared for compatibility with existing callers
                 * @throws ExecutionException   declared for compatibility with existing callers
                 */
                public static void main(String[] args) throws InterruptedException, ExecutionException {
                    Properties props = new Properties();
                    props.put("bootstrap.servers", "s102:9092");
                    // Fire-and-forget: no broker acknowledgement and no retries.
                    props.put("acks", "0");
                    props.put("retries", 0);
                    props.put("batch.size", 16384);
                    // Let records wait up to 10 ms in the buffer so they can be batched.
                    props.put("linger.ms", 10);
                    props.put("buffer.memory", 33554432);

                    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

                    // The timer deliberately covers producer creation and close(): close()
                    // is called before the elapsed time is printed, exactly as before.
                    long start = System.currentTimeMillis();
                    Producer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
                    try {
                        // One shared 1 KiB payload; its content is irrelevant for the benchmark.
                        byte[] value = new byte[1024];
                        for (int i = 0; i < 1024 * 1024; i++) {
                            // NOTE(review): the surrounding notes create a separate topic per
                            // scenario (test1, test2, test3); point this at the topic under test.
                            ProducerRecord<String, byte[]> record =
                                    new ProducerRecord<String, byte[]>("test2", value);
                            producer.send(record);
                        }
                    } finally {
                        // Always release the producer's resources, even if send() throws.
                        producer.close();
                    }
                    System.out.println(System.currentTimeMillis() - start);
                }

            }
    
        单线程生产,副本x3,分区x6
            key:null
            value:byte[1024] ByteArraySerializer
    
            kafka-topics.sh --create --topic test2 --zookeeper s102:2181 --partitions 6 --replication-factor 3
            
            结果:1G ===> 188s  5.4M/s 5577条/s
    
    
        3线程生产,副本x3,分区x6
            key:null
            value:byte[1024] ByteArraySerializer
    
            kafka-topics.sh --create --topic test3 --zookeeper s102:2181 --partitions 6 --replication-factor 3
            
            结果:1G ===> 188s  5.4M/s 5577条/s
    
    
    flume与kafka集成
        
        KafkaSource    //r_kafka.conf
        ================================
            # Flume agent "a1": Kafka source -> memory channel -> logger sink.
            # Name the agent's components
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source: a Kafka source that reads topic t3 from the
            # listed brokers, using consumer group id "flume"
            a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
            a1.sources.r1.kafka.bootstrap.servers = s102:9092,s103:9092,s104:9092
            a1.sources.r1.kafka.topics = t3
            a1.sources.r1.kafka.consumer.group.id = flume
    
            # Configure the sink: logger sink
            a1.sinks.k1.type = logger
    
            # Configure the channel: in-memory channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 10000
            a1.channels.c1.transactionCapacity = 10000
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
    
    
            1、启动zk和kafka
            2、启动flume
                flume-ng agent -n a1 -f r_kafka.conf
            3、通过producer生产数据
                public class NewProducer {
                    public static void main(String[] args) throws InterruptedException, ExecutionException {
                    Properties props = new Properties();
                    props.put("bootstrap.servers", "s102:9092");
                    props.put("acks", "0");
                    props.put("retries", 0);
                    props.put("batch.size", 16384);
                    props.put("linger.ms", 1);
                    props.put("partitioner.class","kafka.NewPartition");
                    props.put("buffer.memory", 33554432);
    
                    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
                    long start = System.currentTimeMillis();
                    //初始化kafka生产者对象
                    Producer<String, String> producer = new KafkaProducer<String, String>(props);
                    Random r = new Random();
    
                    for (int i = 0; i < 10000; i++) {
                        //初始化kafka记录,包括topic,key,value
                        ProducerRecord record = new ProducerRecord("t3",r.nextInt(3)+ "" +i,"tom"+ i);
                        Future future = producer.send(record, new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            //System.out.print(metadata.toString()+ "	" );
                            exception.printStackTrace();
                        }
                        });
                        future.get();
                        //Thread.sleep(1000);
                    }
                    producer.close();
                    System.out.println(System.currentTimeMillis() - start);
                    }
                }
    
            
    
    
    
        kafkaSink    //k_kafka.conf
        =======================================
            # Flume agent "a1": netcat source -> memory channel -> Kafka sink.
            # Name the agent's components
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source: netcat source bound to localhost:8888
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 8888
    
            # Configure the sink: Kafka sink that writes to topic t3 on the listed brokers
            a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
            a1.sinks.k1.kafka.topic = t3
            a1.sinks.k1.kafka.bootstrap.servers = s102:9092,s103:9092,s104:9092
    
            # Configure the channel: in-memory channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 10000
            a1.channels.c1.transactionCapacity = 10000
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
    
            1、启动flume
                flume-ng agent -n a1 -f k_kafka.conf
            2、启动kafka消费者,指定主题t3
                kafka-console-consumer.sh --zookeeper s102:2181 --topic t3
    
            3、使用nc生产数据
                nc localhost 8888
    
            
    
        KafkaChannel    //缓冲区  c_kafka.conf
        ======================
            # Flume agent "a1": netcat source -> Kafka channel -> logger sink.
            # Name the agent's components
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # Configure the source: netcat source bound to localhost:8888
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 8888
    
            # Configure the sink: logger sink
            a1.sinks.k1.type = logger
    
            # Configure the channel: Kafka channel that uses topic "flume-channel"
            # on the listed brokers, with consumer group id "flume"
            a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
            a1.channels.c1.kafka.bootstrap.servers = s102:9092,s103:9092,s104:9092
            a1.channels.c1.kafka.topic = flume-channel
            a1.channels.c1.kafka.consumer.group.id = flume
    
            # Bind the source and the sink to the channel
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
    
            1、启动flume
                flume-ng agent -n a1 -f c_kafka.conf
            2、启动nc
                nc localhost 8888
            3、在nc发送数据,观察logger中是否打印数据
  • 相关阅读:
    MyBatis学习存档(3)——mapper.xml映射文件
    Springboot Idea热部署以及重启后代码也不生效的问题解决
    属性值为空不更新到数据库工具类
    Poi工具类快速生成Ecxel
    Nginx配置ssl,实现https访问
    商城多商品组合数据格式
    Nginx泛解析配置
    Java支付宝支付接入流程,wap,app接入方式
    springboot配置@ResponseBody注解依然返回xml格式的数据
    通过aop记录日志,记录修改前后的数据,精确到每个字段
  • 原文地址:https://www.cnblogs.com/zyde/p/8947053.html
Copyright © 2011-2022 走看看