zoukankan      html  css  js  c++  java
  • Kafka学习(三)——Java工具类、Springboot集成批量消费、SparkStreaming集成

    前言
    本次记录全部来自工作学习中的总结和测试,并非生产环境配置,仅供参考使用!

    提示:以下是本篇文章正文内容,下面案例可供参考

    一、Java中工具类
    常用于测试使用~

    1. 添加maven依赖
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>1.0.0</version>
    <scope>provided</scope>
    </dependency>

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
    </dependency>

    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
    </dependency>
     
    2.消费者:KafkaConsumerTest
    /**
     * Test utility that subscribes to one Kafka topic and, when run, prints every received
     * record to stdout and writes each record value as one line of {@code data.txt}.
     */
    public class KafkaConsumerTest implements Runnable {

        private static final String GROUPID = "groupA";

        private final KafkaConsumer<String, String> consumer;
        private final String topic;

        /**
         * Creates the underlying consumer and subscribes it to the given topic.
         *
         * @param topicName name of the topic to consume
         */
        public KafkaConsumerTest(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "cdh1:9092,cdh2:9092,cdh3:9092");
            // Consumer group name; consumers in different groups each receive the full topic.
            props.put("group.id", GROUPID);
            // Let the client commit offsets automatically.
            props.put("enable.auto.commit", "true");
            // Interval between automatic offset commits.
            props.put("auto.commit.interval.ms", "1000");
            // Session timeout used by the group coordinator to detect a dead consumer.
            props.put("session.timeout.ms", "30000");
            // Upper bound on the number of records returned by a single poll().
            props.put("max.poll.records", 5000);
            // What to do when the group has no committed offset for a partition
            // (the client default is "latest"):
            //   earliest - start from the beginning of the partition
            //   latest   - only consume records produced from now on
            //   none     - throw if any partition has no committed offset
            // When a committed offset exists, consumption always resumes from it.
            props.put("auto.offset.reset", "latest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            this.consumer = new KafkaConsumer<String, String>(props);
            this.topic = topicName;
            this.consumer.subscribe(Arrays.asList(topic));
        }

        /**
         * Polls the topic until the current thread is interrupted. Every record is printed and
         * its value appended to {@code data.txt}; the consumer is always closed on exit.
         */
        @Override
        public void run() {
            int messageNo = 1;
            boolean interrupted = false;
            System.out.println("---------开始消费---------");
            // try-with-resources guarantees the buffered output reaches data.txt.
            try (PrintWriter printWriter = new PrintWriter(new FileWriter("data.txt"))) {
                // Keep polling while the thread has NOT been interrupted.
                while (!Thread.currentThread().isInterrupted()) {
                    ConsumerRecords<String, String> msgList = consumer.poll(1000);
                    if (msgList != null && msgList.count() > 0) {
                        for (ConsumerRecord<String, String> record : msgList) {
                            // Every record is echoed, prefixed with a running counter.
                            System.out.println(messageNo + "=======receive: key = " + record.key()
                                    + ", value = " + record.value() + " offset===" + record.offset());
                            printWriter.println(record.value());
                            messageNo++;
                        }
                        // Make each batch visible in the file without waiting for close().
                        printWriter.flush();
                    } else {
                        // Nothing arrived: back off before polling again.
                        Thread.sleep(1000);
                    }
                }
            } catch (InterruptedException e) {
                // Remember the interrupt; it is restored after the consumer has been closed.
                interrupted = true;
            } catch (IOException e) {
                // Also covers FileNotFoundException from opening data.txt.
                e.printStackTrace();
            } finally {
                try {
                    consumer.close();
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        public static void main(String args[]) {
            KafkaConsumerTest test1 = new KafkaConsumerTest("KAFKA_TEST");
            Thread thread1 = new Thread(test1);
            thread1.start();
        }
    }
     
    3.生产者:KafkaProducerTest
    /**
     * Test utility that publishes 10000 numbered string messages to one Kafka topic and then
     * closes the producer.
     */
    public class KafkaProducerTest implements Runnable {

        private final KafkaProducer<String, String> producer;
        private final String topic;

        /**
         * Creates the underlying producer.
         *
         * @param topicName name of the topic the messages are sent to
         */
        public KafkaProducerTest(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "cdh1:9092,cdh2:9092,cdh3:9092");
            // acks: acknowledgement level required before a send counts as successful.
            //   0   - do not wait for any broker response
            //   1   - the leader has written the record to its local log; followers are not awaited
            //   all - the leader waits for the full set of in-sync replicas (strongest guarantee)
            // NOTE(review): the default for the 1.0.0 client is believed to be 1 - confirm.
            props.put("acks", "all");
            // A value greater than 0 makes the client resend a record whose send failed.
            props.put("retries", 0);
            // Records headed for the same partition are batched into one request up to this size.
            props.put("batch.size", 16384);
            // Serializer class for record keys.
            props.put("key.serializer", StringSerializer.class.getName());
            // Serializer class for record values.
            props.put("value.serializer", StringSerializer.class.getName());
            this.producer = new KafkaProducer<String, String>(props);
            this.topic = topicName;
        }

        /**
         * Sends messages until 10000 have been handed to the producer or the thread is
         * interrupted; the producer is always closed (which flushes pending records) on exit.
         */
        @Override
        public void run() {
            int messageNo = 1;
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String messageStr = "你好,这是第" + messageNo + "条数据";
                    // send() is asynchronous; without a callback a failed send would go unnoticed,
                    // especially as retries is 0.
                    producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr),
                            (metadata, exception) -> {
                                if (exception != null) {
                                    exception.printStackTrace();
                                }
                            });
                    // Print a progress line every 100 messages.
                    if (messageNo % 100 == 0) {
                        System.out.println("发送的信息:" + messageStr);
                    }
                    // Stop after 10000 messages.
                    if (messageNo % 10000 == 0) {
                        System.out.println("成功发送了" + messageNo + "条");
                        break;
                    }
                    messageNo++;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }

        public static void main(String args[]) {
            KafkaProducerTest test = new KafkaProducerTest("KAFKA_TEST");
            Thread thread = new Thread(test);
            thread.start();
        }
    }
     
    二、 SpringBoot中使用
    4.1 引入依赖
    compile group: 'org.springframework.kafka' ,name: 'spring-kafka'
    1
    4.2 application.yml
    # Kafka settings. This is a custom "kafka" prefix whose keys are read through @Value
    # placeholders (kafka.producer.*, kafka.consumer.*, kafka.listener.*), so the nesting matters.
    kafka:
      bootstrap-servers: 192.168.44.136:9092,192.168.44.137:9092,192.168.44.138:9092
      producer:
        batch-size: 16785 # maximum amount of data sent in one batch
        retries: 1 # number of resend attempts after a failed send
        buffer-memory: 33554432 # 32 MB producer buffer
        linger: 1 # passed to the producer as linger.ms
      consumer:
        auto-offset-reset: latest # use "earliest" to start from the oldest unconsumed offset
        max-poll-records: 3100 # maximum number of records fetched by one poll in batch mode
        enable-auto-commit: false # whether offsets are committed automatically
        auto-commit-interval: 1000 # interval between automatic commits (ms)
        session-timeout: 20000 # consumer session timeout (ms)
        max-poll-interval: 15000 # passed as max.poll.interval.ms: longest allowed gap between two poll() calls (ms)
        max-partition-fetch-bytes: 15728640 # maximum data fetched per partition, 15 MB
      listener:
        batch-listener: true # true enables batch consumption
        concurrencys: 3,6 # consumer thread counts; the two values are split on ',' and used separately
        poll-timeout: 1500 # how long poll() waits for records when none are available (ms)
        topics: kafkaTest
        group-id: test1-consumer-group
     
    4.3 KafkaConfiguration.java
    /**
     * Spring configuration that builds the Kafka producer template and two batch listener
     * container factories (differing only in concurrency) from the custom {@code kafka.*}
     * properties.
     */
    @Configuration
    public class KafkaConfiguration {

        @Value("${kafka.bootstrap-servers}")
        private String bootstrapServers;
        @Value("${kafka.producer.retries}")
        private Integer retries;
        @Value("${kafka.producer.batch-size}")
        private Integer batchSize;
        @Value("${kafka.producer.buffer-memory}")
        private Integer bufferMemory;
        @Value("${kafka.producer.linger}")
        private Integer linger;

        @Value("${kafka.consumer.enable-auto-commit}")
        private Boolean autoCommit;

        @Value("${kafka.consumer.auto-commit-interval}")
        private Integer autoCommitInterval;

        @Value("${kafka.consumer.max-poll-records}")
        private Integer maxPollRecords;

        @Value("${kafka.consumer.auto-offset-reset}")
        private String autoOffsetReset;

        // First element of the comma-separated kafka.listener.concurrencys value.
        @Value("#{'${kafka.listener.concurrencys}'.split(',')[0]}")
        private Integer concurrency3;

        // Second element of the comma-separated kafka.listener.concurrencys value.
        @Value("#{'${kafka.listener.concurrencys}'.split(',')[1]}")
        private Integer concurrency6;

        @Value("${kafka.listener.poll-timeout}")
        private Long pollTimeout;

        @Value("${kafka.consumer.session-timeout}")
        private String sessionTimeout;

        @Value("${kafka.listener.batch-listener}")
        private Boolean batchListener;

        @Value("${kafka.consumer.max-poll-interval}")
        private Integer maxPollInterval;

        @Value("${kafka.consumer.max-partition-fetch-bytes}")
        private Integer maxPartitionFetchBytes;

        /** @return a String/String template backed by {@link #producerFactory()} */
        @Bean
        public KafkaTemplate<String, String> kafkaTemplateString() {
            return new KafkaTemplate<>(producerFactory());
        }

        /** @return producer factory configured from the {@code kafka.producer.*} properties */
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // Diamond instead of the raw type so the factory stays type-checked.
            return new DefaultKafkaProducerFactory<>(props);
        }

        /** @return a String/String template backed by {@link #producerFactory()} */
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }

        /** Builds a consumer factory from the {@code kafka.consumer.*} properties. */
        private ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        /**
         * Builds a fresh listener container factory with the shared settings; callers set the
         * concurrency themselves.
         */
        private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // Deliver records to the listener as a batch when enabled.
            factory.setBatchListener(batchListener);
            // How long poll() waits for records when none are available; when records are
            // present it returns at once with up to max.poll.records of them.
            factory.getContainerProperties().setPollTimeout(pollTimeout);
            // Offsets are committed by the listener: MANUAL_IMMEDIATE commits as soon as
            // Acknowledgment.acknowledge() is called, whereas MANUAL defers the commit.
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }

        /**
         * Listener container factory using the first configured concurrency value.
         *
         * @return the container factory
         */
        @Bean
        @ConditionalOnMissingBean(name = "kafkaBatchListener3")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListener3() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
            factory.setConcurrency(concurrency3);
            return factory;
        }

        /**
         * Listener container factory using the second configured concurrency value.
         *
         * @return the container factory
         */
        @Bean
        @ConditionalOnMissingBean(name = "kafkaBatchListener6")
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListener6() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
            factory.setConcurrency(concurrency6);
            return factory;
        }
    }
     
    4.4 ProducerService.java
    /**
     * Service that publishes string messages to the topic configured under
     * {@code kafka.listener.topics} and logs the outcome of each send.
     */
    @Service
    public final class ProducerService {
        private static final Logger logger = LoggerFactory.getLogger(ProducerService.class);

        @Value("${kafka.listener.topics}")
        private String topic;

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        /**
         * Sends {@code message} asynchronously; success and failure are reported through the log.
         *
         * @param message payload to publish
         */
        public void sendMessage(String message) {
            logger.info("Producing message: {}", message);
            ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send(topic, message);
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    logger.info("Sent message=[ {} ] with offset=[ {} ]", message, result.getRecordMetadata().offset());
                }

                @Override
                public void onFailure(Throwable ex) {
                    // A failed send is an error; passing ex as the last argument keeps the stack trace.
                    logger.error("Unable to send message=[ {} ] due to : {}", message, ex.getMessage(), ex);
                }
            });
        }
    }

     
    4.5 ConsumerService.java
    /**
     * Service holding the batch Kafka listener; it logs every record of a batch and then
     * acknowledges the batch manually.
     */
    @Service
    public final class ConsumerService {
        private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

        /**
         * Handles one polled batch from the first topic listed in {@code kafka.listener.topics},
         * using the {@code kafkaBatchListener6} container factory.
         *
         * @param records the records delivered for this batch
         * @param ack     handle used to commit the batch's offsets manually
         */
        // The consumer group comes from kafka.listener.group-id rather than from the topic list.
        @KafkaListener(containerFactory = "kafkaBatchListener6",
                topics = {"#{'${kafka.listener.topics}'.split(',')[0]}"},
                groupId = "${kafka.listener.group-id}")
        public void batchListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
            try {
                logger.info("本次总消息数量:{}", records.size());
                for (ConsumerRecord<?, ?> record : records) {
                    // The value is passed as a log argument so a null value cannot throw.
                    logger.info("消费消息:{}", record.value());
                }
            } catch (Exception e) {
                logger.error("Kafka监听异常" + e.getMessage(), e);
            } finally {
                // Offsets are committed even when processing failed, so a failing batch is not redelivered.
                ack.acknowledge();
            }
        }
    }
     
    三、SparkStreaming集成
    1.引入库
    compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.1.1'
    compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.1.1'
    compile group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.11', version: '2.1.1'
     
    2.代码
    public class StartKafka {
    public static void main(String[] args) throws InterruptedException {
    SparkConf sparkConf = new SparkConf()
    .setAppName("StartKafka")
    .setMaster("local[2]")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    //一个batch为10s内的数据
    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
    //从kafka中获取数据
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "cdh1:9092,cdh2:9092,cdh3:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "groupB");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", true);

    Collection<String> topics = Arrays.asList("KAFKA_TEST");

    final JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
    jsc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
    );
    //打印
    stream.print();
    Thread.sleep(5000);
    //开始执行
    jsc.start();
    //执行等待
    jsc.awaitTermination();
    //执行完毕后关闭
    jsc.close();

    }

    总结
    本章主要内容:java工具类、springboot集成、sparkstreaming集成使用~


    ————————————————
    版权声明:本文为CSDN博主「笑里笑外~」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_36488175/article/details/110234262

  • 相关阅读:
    visual studio(vs)中项目解决方案的目录组织安排
    vs2017如果在调试状态下查看QString等qt变量的值
    终于成功编译和运行了glc_player和glc_lib
    Visual Studio(vs)内存泄漏Detected memory leaks的解决方案
    std::max、std::min error C2589: “(”:“::”右边的非法标记,error C2059…
    c++跨动态库DLL的内存分配与释放问题2
    CABasicAnimation精讲
    CAAnimation解读
    iOS CAShapeLayer精讲
    UIBezierPath精讲
  • 原文地址:https://www.cnblogs.com/javalinux/p/15060479.html
Copyright © 2011-2022 走看看