zoukankan      html  css  js  c++  java
  • Kafka学习(三)——Java工具类、Springboot集成批量消费、SparkStreaming集成

    前言
    本文内容全部来自工作和学习中的总结与测试,并非生产环境配置,仅供参考!

    提示:以下是本篇文章正文内容,下面案例可供参考

    一、Java中工具类
    常用于测试使用~

    1. 添加maven依赖
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>1.0.0</version>
    <scope>provided</scope>
    </dependency>

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
    </dependency>

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
    </dependency>
     
    2.消费者:KafkaConsumerTest
    /**
     * Test-only Kafka consumer: subscribes to a single topic, prints every record it
     * receives to stdout and writes each record value as a line to {@code data.txt}.
     *
     * <p>{@link #run()} keeps polling until the hosting thread is interrupted.
     */
    public class KafkaConsumerTest implements Runnable {

        private static final String GROUPID = "groupA";

        private final KafkaConsumer<String, String> consumer;
        private final String topic;

        /**
         * Builds the consumer and subscribes it to the given topic.
         *
         * @param topicName name of the topic to consume
         */
        public KafkaConsumerTest(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "cdh1:9092,cdh2:9092,cdh3:9092");
            // Consumer group: consumers in different groups each receive the full stream.
            props.put("group.id", GROUPID);
            // Commit offsets automatically in the background (this is also the client default).
            props.put("enable.auto.commit", "true");
            // Interval between automatic offset commits, in milliseconds.
            props.put("auto.commit.interval.ms", "1000");
            // Session timeout in milliseconds.
            props.put("session.timeout.ms", "30000");
            // Upper bound on the number of records returned by a single poll().
            props.put("max.poll.records", 5000);
            // Where to start when the group has no committed offset for a partition
            // (the client default is "latest"):
            //   earliest - start from the beginning of the partition
            //   latest   - consume only records produced from now on
            //   none     - throw if any partition has no committed offset
            // When a committed offset exists, consumption resumes from it in all three modes.
            props.put("auto.offset.reset", "latest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            this.consumer = new KafkaConsumer<>(props);
            this.topic = topicName;
            this.consumer.subscribe(Arrays.asList(topic));
        }

        /**
         * Polls the topic until the current thread is interrupted, printing each record
         * and writing its value to {@code data.txt}. The file writer and the consumer
         * are always closed on exit.
         */
        @Override
        public void run() {
            int messageNo = 1;
            System.out.println("---------开始消费---------");
            // try-with-resources guarantees buffered output reaches the file.
            try (PrintWriter printWriter = new PrintWriter(new FileWriter("data.txt"))) {
                // Keep polling while the thread has NOT been interrupted.
                while (!Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<String, String> msgList = consumer.poll(1000);
                    if (msgList == null || msgList.count() == 0) {
                        Thread.sleep(1000);
                        continue;
                    }
                    // Every received record is printed, prefixed with a running counter.
                    for (ConsumerRecord<String, String> record : msgList) {
                        System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
                        printWriter.println(record.value());
                        messageNo++;
                    }
                    // Make the batch visible in the file without waiting for shutdown.
                    printWriter.flush();
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal; keep the flag for code further up the stack.
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // KafkaConsumer.close() is documented to throw InterruptException when the
                // calling thread is already interrupted, so clear the flag while closing
                // and restore it afterwards.
                boolean interrupted = Thread.interrupted();
                try {
                    consumer.close();
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        /** Starts one consumer thread on the {@code KAFKA_TEST} topic. */
        public static void main(String args[]) {
            KafkaConsumerTest test1 = new KafkaConsumerTest("KAFKA_TEST");
            Thread thread1 = new Thread(test1);
            thread1.start();
        }
    }
     
    3.生产者:KafkaProducerTest
    /**
     * Test-only Kafka producer: sends numbered text messages to a single topic and
     * stops after {@link #MAX_MESSAGES} messages or when its thread is interrupted.
     */
    public class KafkaProducerTest implements Runnable {

        /** Number of messages after which {@link #run()} stops. */
        private static final int MAX_MESSAGES = 10000;
        /** Every n-th message is echoed to stdout. */
        private static final int PRINT_EVERY = 100;

        private final KafkaProducer<String, String> producer;
        private final String topic;

        /**
         * Builds the producer for the given topic.
         *
         * @param topicName name of the topic to publish to
         */
        public KafkaProducerTest(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "cdh1:9092,cdh2:9092,cdh3:9092");
            // Acknowledgement mode (the 1.0.0 client default is "1"):
            //   0   - do not wait for any broker response
            //   1   - wait until the leader has written the record to its local log
            //   all - wait until all in-sync replicas have the record (strongest durability)
            props.put("acks", "all");
            // Number of automatic resends after a failed send; 0 disables retrying.
            props.put("retries", 0);
            // Records bound for the same partition are batched into one request, up to
            // this many bytes, which reduces the number of network round trips.
            props.put("batch.size", 16384);
            // Serializer classes for record keys and values.
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            this.producer = new KafkaProducer<>(props);
            this.topic = topicName;
        }

        /**
         * Sends messages numbered from 1 until {@link #MAX_MESSAGES} have been sent or
         * the current thread is interrupted, then closes the producer.
         */
        @Override
        public void run() {
            int messageNo = 1;
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String messageStr = "你好,这是第" + messageNo + "条数据";
                    // send() is asynchronous; with retries=0 a failure would otherwise go
                    // unnoticed, so report it from the completion callback.
                    producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr),
                            (metadata, exception) -> {
                                if (exception != null) {
                                    System.err.println("发送失败:" + messageStr);
                                    exception.printStackTrace();
                                }
                            });
                    // Echo every PRINT_EVERY-th message.
                    if (messageNo % PRINT_EVERY == 0) {
                        System.out.println("发送的信息:" + messageStr);
                    }
                    // Stop once MAX_MESSAGES messages have been sent.
                    if (messageNo == MAX_MESSAGES) {
                        System.out.println("成功发送了" + messageNo + "条");
                        break;
                    }
                    messageNo++;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }

        /** Starts one producer thread on the {@code KAFKA_TEST} topic. */
        public static void main(String args[]) {
            KafkaProducerTest test = new KafkaProducerTest("KAFKA_TEST");
            Thread thread = new Thread(test);
            thread.start();
        }
    }
     
    二、 SpringBoot中使用
    4.1 引入依赖
    compile group: 'org.springframework.kafka' ,name: 'spring-kafka'
    4.2 application.yml
    # Kafka settings, read by KafkaConfiguration through @Value("${kafka...}").
    kafka:
      bootstrap-servers: 192.168.44.136:9092,192.168.44.137:9092,192.168.44.138:9092
      producer:
        batch-size: 16785 # maximum size of one record batch, in bytes
        retries: 1 # number of resend attempts after a failed send
        buffer-memory: 33554432 # 32 MB producer buffer
        linger: 1 # bound to linger.ms
      consumer:
        auto-offset-reset: latest # start position when no committed offset exists (earliest | latest | none)
        max-poll-records: 3100 # maximum number of records returned by one poll (batch size)
        enable-auto-commit: false # automatic offset commit on/off
        auto-commit-interval: 1000 # interval between automatic commits, in ms
        session-timeout: 20000 # session timeout, in ms
        max-poll-interval: 15000 # bound to max.poll.interval.ms: maximum delay between two poll() calls, in ms
        max-partition-fetch-bytes: 15728640 # maximum data fetched per partition, 15 MB
      listener:
        batch-listener: true # true enables batch consumption
        concurrencys: 3,6 # consumer thread counts; the first and second values are used by separate container factories
        poll-timeout: 1500 # how long a poll waits when no records are available, in ms
        topics: kafkaTest
        group-id: test1-consumer-group
     
    4.3 KafkaConfiguration.java
    /**
     * Spring configuration that builds the Kafka producer template and two batch
     * listener container factories (with different concurrency) from the
     * {@code kafka.*} properties.
     */
    @Configuration
    public class KafkaConfiguration {

        @Value("${kafka.bootstrap-servers}")
        private String bootstrapServers;

        // ---- producer settings ----
        @Value("${kafka.producer.retries}")
        private Integer retries;
        @Value("${kafka.producer.batch-size}")
        private Integer batchSize;
        @Value("${kafka.producer.buffer-memory}")
        private Integer bufferMemory;
        @Value("${kafka.producer.linger}")
        private Integer linger;

        // ---- consumer settings ----
        @Value("${kafka.consumer.enable-auto-commit}")
        private Boolean autoCommit;
        @Value("${kafka.consumer.auto-commit-interval}")
        private Integer autoCommitInterval;
        @Value("${kafka.consumer.max-poll-records}")
        private Integer maxPollRecords;
        @Value("${kafka.consumer.auto-offset-reset}")
        private String autoOffsetReset;
        // Numeric like the other timeouts, so bind it as Integer for consistency.
        @Value("${kafka.consumer.session-timeout}")
        private Integer sessionTimeout;
        @Value("${kafka.consumer.max-poll-interval}")
        private Integer maxPollInterval;
        @Value("${kafka.consumer.max-partition-fetch-bytes}")
        private Integer maxPartitionFetchBytes;

        // ---- listener settings ----
        // "kafka.listener.concurrencys" is a comma-separated list; element 0 and
        // element 1 feed the two container factories below.
        @Value("#{'${kafka.listener.concurrencys}'.split(',')[0]}")
        private Integer concurrency3;
        @Value("#{'${kafka.listener.concurrencys}'.split(',')[1]}")
        private Integer concurrency6;
        @Value("${kafka.listener.poll-timeout}")
        private Long pollTimeout;
        @Value("${kafka.listener.batch-listener}")
        private Boolean batchListener;

        /** @return a String/String template backed by {@link #producerFactory()} */
        @Bean
        public KafkaTemplate<String, String> kafkaTemplateString() {
            return new KafkaTemplate<>(producerFactory());
        }

        /** @return producer factory configured from the {@code kafka.producer.*} properties */
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // Diamond instead of a raw type, so the result is type-checked.
            return new DefaultKafkaProducerFactory<>(props);
        }

        /** @return a String/String template backed by {@link #producerFactory()} */
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }

        /** Builds a consumer factory from the {@code kafka.consumer.*} properties. */
        private ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        /**
         * Creates a fresh listener container factory on every call, so each bean
         * below can set its own concurrency without affecting the other.
         */
        private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // Deliver each poll result to the listener as a list when enabled.
            factory.setBatchListener(batchListener);
            // How long the container's poll waits when no records are available; the
            // batch size itself is capped by max.poll.records.
            factory.getContainerProperties().setPollTimeout(pollTimeout);
            // Offsets are committed by the listener through Acknowledgment.acknowledge():
            // MANUAL_IMMEDIATE commits as soon as acknowledge() is called, whereas MANUAL
            // defers the commit until the records of the poll have been processed.
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }

        /**
         * Batch listener container factory using the first configured concurrency value.
         *
         * @return the container factory registered as {@code kafkaBatchListener3}
         */
        @Bean
        @ConditionalOnMissingBean(name = "kafkaBatchListener3")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListener3() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
            factory.setConcurrency(concurrency3);
            return factory;
        }

        /**
         * Batch listener container factory using the second configured concurrency value.
         *
         * @return the container factory registered as {@code kafkaBatchListener6}
         */
        @Bean
        @ConditionalOnMissingBean(name = "kafkaBatchListener6")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListener6() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
            factory.setConcurrency(concurrency6);
            return factory;
        }
    }
     
    4.4 ProducerService.java
    /**
     * Publishes string messages to the topic configured under
     * {@code kafka.listener.topics} and logs the outcome of every send.
     */
    @Service
    public final class ProducerService {
        private static final Logger logger = LoggerFactory.getLogger(ProducerService.class);

        @Value("${kafka.listener.topics}")
        private String topic;

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        /**
         * Sends {@code message} asynchronously; the result is reported through the log.
         *
         * @param message payload to publish
         */
        public void sendMessage(String message) {
            // Parameterized logging: no formatting cost when INFO is disabled.
            logger.info("Producing message: {}", message);
            ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send(topic, message);
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    logger.info("Sent message=[ {} ] with offset=[ {} ]", message, result.getRecordMetadata().offset());
                }

                @Override
                public void onFailure(Throwable ex) {
                    // A failed send is an error; pass the throwable last so the stack trace is kept.
                    logger.error("Unable to send message=[ {} ] due to : {}", message, ex.getMessage(), ex);
                }
            });
        }
    }

     
    4.5 ConsumerService.java
    /**
     * Batch Kafka listener that logs every record of a polled batch and then
     * acknowledges the batch manually.
     */
    @Service
    public final class ConsumerService {
        private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

        /**
         * Handles one batch from the first topic listed in {@code kafka.listener.topics},
         * using the {@code kafkaBatchListener6} container factory.
         *
         * @param records records returned by one poll
         * @param ack     handle used to commit the batch's offsets
         */
        @KafkaListener(
                containerFactory = "kafkaBatchListener6",
                topics = {"#{'${kafka.listener.topics}'.split(',')[0]}"},
                // The consumer group comes from its own property, not from the topic name.
                groupId = "${kafka.listener.group-id}")
        public void batchListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
            try {
                logger.info("本次总消息数量:{}", records.size());
                for (ConsumerRecord<?, ?> record : records) {
                    // Pass the value itself so a null value is logged instead of throwing.
                    logger.info("消费消息:{}", record.value());
                }
            } catch (Exception e) {
                logger.error("Kafka监听异常" + e.getMessage(), e);
            } finally {
                // Offsets are committed even when processing failed, so a failed batch
                // is not redelivered.
                ack.acknowledge();
            }
        }
    }
     
    三、SparkStreaming集成
    1.引入库
    compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.1.1'
    compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.1.1'
    compile group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.11', version: '2.1.1'
     
    2.代码
    public class StartKafka {
    public static void main(String[] args) throws InterruptedException {
    SparkConf sparkConf = new SparkConf()
    .setAppName("StartKafka")
    .setMaster("local[2]")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    //一个batch为10s内的数据
    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
    //从kafka中获取数据
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "cdh1:9092,cdh2:9092,cdh3:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "groupB");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", true);

    Collection<String> topics = Arrays.asList("KAFKA_TEST");

    final JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
    jsc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
    );
    //打印
    stream.print();
    Thread.sleep(5000);
    //开始执行
    jsc.start();
    //执行等待
    jsc.awaitTermination();
    //执行完毕后关闭
    jsc.close();

    }

    总结
    本章主要内容:java工具类、springboot集成、sparkstreaming集成使用~


    ————————————————
    版权声明:本文为CSDN博主「笑里笑外~」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_36488175/article/details/110234262

  • 相关阅读:
    Binary Search Tree Iterator 解答
    Invert Binary Tree 解答
    Min Stack 解答
    Trapping Raining Water 解答
    Candy 解答
    Jump Game II 解答
    Implement Hash Map Using Primitive Types
    Gas Station 解答
    Bucket Sort
    HashMap 专题
  • 原文地址:https://www.cnblogs.com/javalinux/p/15060479.html
Copyright © 2011-2022 走看看