zoukankan      html  css  js  c++  java
  • Java kafka 设置从指定时间戳开始消费

    包含的功能:

    1.Java kafka 设置从指定时间戳开始消费

    2.JdbcTemplate操作MySql

    3.Java多线程消费kafka

    4.Java获取kafka所有topic

    pom.xml文件,引入kafka、mysql、jdbc:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.6.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <groupId>com.suncreate</groupId>
        <artifactId>kafka-collector</artifactId>
        <version>1.0</version>
        <name>kafka-collector</name>
        <description>华为HD平台升级测试-kafka测试数据源采集</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <!-- jdbc -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>
    
            <!-- mysql -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.25</version>
            </dependency>
    
            <!-- kafka -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
    
        </dependencies>
    
        <repositories>
            <repository>
                <id>alimaven</id>
                <name>aliyun maven</name>
                <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
        </repositories>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
    View Code

    application.properties文件,kafka集群和mysql配置:

    #kafka集群IP:Port端口
    kafka.consumer.servers=x.x.x.115:21005,x.x.x.109:21005,x.x.x.116:21005,x.x.x.111:21005,x.x.x.110:21005,x.x.x.112:21005,x.x.x.113:21005,x.x.x.114:21005,x.x.x.117:21005,x.x.x.118:21005,x.x.x.119:21005,x.x.x.120:21005,x.x.x.121:21005
    #消费者组
    kafka.consumer.group.id=kafka-collector
    kafka.consumer.auto.offset.reset=latest
    #mysql
    spring.datasource.mysql.driver-class-name=com.mysql.jdbc.Driver
    spring.datasource.mysql.jdbcurl=jdbc:mysql://localhost:3306/kafka-data?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true
    spring.datasource.mysql.username=root
    spring.datasource.mysql.password=123456
    View Code

    MySqlConfig:

    package com.suncreat.kafkacollector.config;
    
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.boot.jdbc.DataSourceBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jdbc.core.JdbcTemplate;
    
    import javax.sql.DataSource;
    
    @Configuration
    public class MySqlConfig {

        /**
         * MySQL DataSource bound from the "spring.datasource.mysql.*" properties.
         * The explicit bean name "MySqlDataSource" is kept so existing
         * {@code @Qualifier("MySqlDataSource")} injection points keep working.
         */
        @Bean(name = "MySqlDataSource")
        @ConfigurationProperties(prefix = "spring.datasource.mysql")
        public DataSource mySqlDataSource() {
            return DataSourceBuilder.create().build();
        }

        /**
         * JdbcTemplate backed by the MySQL DataSource above, exposed under the
         * bean name "mysqlJdbcTemplate" used by consumers of this config.
         */
        @Bean(name = "mysqlJdbcTemplate")
        public JdbcTemplate mysqlJdbcTemplate(@Qualifier("MySqlDataSource") DataSource dataSource) {
            return new JdbcTemplate(dataSource);
        }

    }
    View Code

    SaveOffsetOnRebalanced,kafka设置从指定时间戳开始消费:

    package com.suncreat.kafkacollector.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.*;
    
    /**
     * ConsumerRebalanceListener that, on partition assignment, seeks every
     * assigned partition to the earliest offset whose record timestamp is at
     * least (now - fallbackMilliseconds) — i.e. "start consuming from a point
     * in time" instead of from the committed offset.
     */
    public class SaveOffsetOnRebalanced implements ConsumerRebalanceListener {
        private static final Logger log = LoggerFactory.getLogger(SaveOffsetOnRebalanced.class);

        private final KafkaConsumer<String, String> consumer;
        private final String topic;
        /** How far back from "now" (in ms) consumption should start. */
        private final long fallbackMilliseconds;

        public SaveOffsetOnRebalanced(KafkaConsumer<String, String> consumer, String topic, long fallbackMilliseconds) {
            this.consumer = consumer;
            this.topic = topic;
            this.fallbackMilliseconds = fallbackMilliseconds;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            // Nothing to save: offsets are committed asynchronously by the consumer thread.
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            try {
                long startTime = System.currentTimeMillis() - fallbackMilliseconds;

                Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                for (TopicPartition topicPartition : collection) {
                    timestampsToSearch.put(new TopicPartition(topic, topicPartition.partition()), startTime);
                }

                Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampsToSearch);
                for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
                    TopicPartition partition = entry.getKey();
                    OffsetAndTimestamp offsetAndTimestamp = entry.getValue();
                    if (offsetAndTimestamp == null) {
                        // offsetsForTimes returns null when the partition has no
                        // message at/after the target timestamp; the original code
                        // would NPE here. Fall back to consuming new messages only.
                        consumer.seekToEnd(Collections.singleton(partition));
                        continue;
                    }
                    consumer.seek(partition, offsetAndTimestamp.offset());
                }
            } catch (Exception e) {
                log.error("kafka设置从指定时间戳开始消费 SaveOffsetOnRebalanced 出错", e);
            }
        }
    }
    View Code

    SetOffset,kafka设置从指定时间戳开始消费:

    package com.suncreat.kafkacollector.kafka;
    
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.text.SimpleDateFormat;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Date;
    
    /**
     * Rewinds a kafka consumer so that it starts consuming from the beginning
     * of a given calendar day (midnight, JVM default time zone).
     */
    public class SetOffset {
        private static final Logger log = LoggerFactory.getLogger(SetOffset.class);

        private final KafkaConsumer<String, String> consumer;
        private final String topic;
        /** Start date in yyyyMMdd form, e.g. "20210331". */
        private final String startTime;

        public SetOffset(KafkaConsumer<String, String> consumer, String topic, String startTime) {
            this.consumer = consumer;
            this.topic = topic;
            this.startTime = startTime;
        }

        /**
         * Subscribes to the topic with a rebalance listener that seeks each
         * assigned partition back to the start date, then issues one poll so
         * the group assignment (and hence the seek) actually takes place.
         */
        public void submitOffset() {
            try {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
                // Strict parsing: by default SimpleDateFormat is lenient and
                // would silently roll over malformed dates such as "20211341".
                simpleDateFormat.setLenient(false);
                Date date = simpleDateFormat.parse(startTime + "000000");
                long fallbackMilliseconds = System.currentTimeMillis() - date.getTime();

                SaveOffsetOnRebalanced saveOffsetOnRebalanced = new SaveOffsetOnRebalanced(consumer, topic, fallbackMilliseconds);
                consumer.subscribe(Collections.singleton(topic), saveOffsetOnRebalanced);
                consumer.poll(Duration.ofMillis(0));
            } catch (Exception e) {
                log.error("kafka设置从指定时间戳开始消费 SetOffset 出错", e);
            }
        }
    }
    View Code

    ConsumerThread,kafka消费线程,消费kafka消息数据,并写入mysql:

    package com.suncreat.kafkacollector.kafka;
    
    import com.suncreat.kafkacollector.utils.CalcSpeed;
    import org.apache.kafka.clients.consumer.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jdbc.core.JdbcTemplate;
    
    import java.sql.PreparedStatement;
    import java.time.Duration;
    import java.util.Map;
    import java.util.concurrent.Semaphore;
    
    /**
     * Consumer thread: polls one topic, writes every record into MySQL
     * (table KafkaData) and prints a sample pass_time every 10000 records.
     */
    public class ConsumerThread extends Thread {
        private static final Logger log = LoggerFactory.getLogger(ConsumerThread.class);

        private final KafkaConsumer<String, String> consumer;
        private final String topic;
        private final JdbcTemplate jdbcTemplate;

        public ConsumerThread(KafkaConsumer<String, String> consumer, String topic, JdbcTemplate jdbcTemplate) {
            this.consumer = consumer;
            this.topic = topic;
            this.jdbcTemplate = jdbcTemplate;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                    if (records.count() > 0) {
                        for (ConsumerRecord<String, String> record : records) {
                            handleRecord(record);
                        }
                        // Rows are inserted before the commit, so a crash can at
                        // worst duplicate rows, never lose them (at-least-once).
                        consumer.commitAsync();
                    }

                    Thread.sleep(1); // brief pause to avoid a tight spin on empty polls
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can observe the shutdown request.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                log.error("ConsumerThread 出错", e);
            } finally {
                consumer.close(); // release network resources when the loop exits
            }
        }

        /** Persists one record and prints every 10000th pass_time as a progress sample. */
        private void handleRecord(ConsumerRecord<String, String> record) {
            String value = record.value();

            insertMySql(topic, value);

            if (CalcSpeed.getCount() % 10000 == 0) {
                // Extract the 14-char timestamp after "pass_time":". The original
                // code assumed the field always exists and crashed the thread
                // (negative substring index) on records without it.
                int pos = value.indexOf("pass_time");
                if (pos >= 0 && value.length() >= pos + 26) {
                    System.out.println(value.substring(pos + 12, pos + 26));
                }
            }

            CalcSpeed.addCount();
        }

        /**
         * Inserts one message into MySQL. Failures are logged but do not stop
         * consumption (best-effort persistence, as in the original code).
         */
        private void insertMySql(String topic, String value) {
            try {
                jdbcTemplate.update("insert into KafkaData(topic, content) values(?, ?)", topic, value);
            } catch (Exception e) {
                log.error("insertMySql 出错", e);
            }
        }
    }
    View Code

    KafkaCollectService,kafka消费服务,多线程消费kafka消息数据:

    package com.suncreat.kafkacollector.service;
    
    import com.suncreat.kafkacollector.kafka.ConsumerThread;
    import com.suncreat.kafkacollector.kafka.SetOffset;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.KafkaAdminClient;
    import org.apache.kafka.clients.admin.ListTopicsResult;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.KafkaFuture;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import java.util.*;
    import java.util.concurrent.ExecutionException;
    
    @Service
    public class KafkaCollectService {
        private static final Logger log = LoggerFactory.getLogger(KafkaCollectService.class);

        /** Kafka bootstrap servers (host:port list). */
        @Value("${kafka.consumer.servers}")
        private String servers;

        /** Consumer group id shared by all consumer threads. */
        @Value("${kafka.consumer.group.id}")
        private String groupId;

        /** auto.offset.reset policy (e.g. latest / earliest). */
        @Value("${kafka.consumer.auto.offset.reset}")
        private String autoOffsetReset;

        @Autowired
        @Qualifier("mysqlJdbcTemplate")
        private JdbcTemplate mysqlJdbcTemplate;

        /**
         * Entry point: prints all cluster topics for reference, then starts
         * consumer threads for the topics to be collected.
         */
        @PostConstruct
        public void start() {
            try {
                List<String> topics = listTopics();
                topics.sort(Comparator.naturalOrder());
                topics.forEach(System.out::println);

                subscribe("ST-FEATURE-LOG"); //人 topic
                subscribe("motorVehicle"); //车 topic
                subscribe("wifiData"); //WIFI topic
                subscribe("KK_PASS_INFO_TYCC"); //卡口 topic

            } catch (Exception e) {
                log.error("出错", e);
            }
        }

        /**
         * Returns the names of all topics in the cluster.
         *
         * @throws ExecutionException   if the admin request fails
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public List<String> listTopics() throws ExecutionException, InterruptedException {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);

            // try-with-resources: the admin client owns threads and sockets
            // that the original code leaked by never calling close().
            try (AdminClient adminClient = KafkaAdminClient.create(properties)) {
                ListTopicsResult result = adminClient.listTopics();
                return new ArrayList<>(result.names().get());
            }
        }

        /**
         * Starts two consumer threads for the given topic, each rewound to
         * start consuming from 2021-03-31 00:00:00.
         *
         * @param topic kafka topic to consume
         */
        public void subscribe(String topic) {
            for (int i = 0; i < 2; i++) {
                Properties properties = new Properties();
                properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
                properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
                // Offsets are committed manually (commitAsync) after the DB insert.
                properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
                SetOffset setOffset = new SetOffset(consumer, topic, "20210331");
                setOffset.submitOffset();

                new ConsumerThread(consumer, topic, mysqlJdbcTemplate).start();
            }
        }

    }
    View Code

    CalcSpeedConfig,计算数据处理速度:

    package com.suncreat.kafkacollector.task;
    
    import com.suncreat.kafkacollector.utils.CalcSpeed;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.SchedulingConfigurer;
    import org.springframework.scheduling.config.ScheduledTaskRegistrar;
    import org.springframework.scheduling.support.CronTrigger;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * Every timeRange seconds, prints how many records were processed since
     * the last tick (records/second) and resets the shared counter.
     */
    @Configuration
    @EnableScheduling
    public class CalcSpeedConfig implements SchedulingConfigurer {

        /** Reporting interval in seconds. */
        private final int timeRange = 2;

        /** Cron expression firing every timeRange seconds. */
        private final String calcQuarter = "0/" + timeRange + " * * * * ?";

        @Override
        public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
            taskRegistrar.addTriggerTask(() -> {
                // SimpleDateFormat is not thread-safe; the original shared one
                // instance as a field. Create it per tick instead — cheap at a
                // once-per-2-seconds rate and safe under any scheduler pool size.
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

                long currentTime = System.currentTimeMillis();
                // NOTE(review): getCount() followed by resetCount() is not atomic;
                // records counted in between are dropped from the next figure.
                double speed = CalcSpeed.getCount() / (double) timeRange;
                System.out.println(simpleDateFormat.format(new Date(currentTime)) + " 处理速度:" + (int) speed + " 条/秒");

                CalcSpeed.resetCount();

            }, triggerContext -> new CronTrigger(calcQuarter).nextExecutionTime(triggerContext));
        }
    }
    View Code

    CalcSpeed,用于记录数据处理速度:

    package com.suncreat.kafkacollector.utils;
    
    /**
     * Process-wide record counter used to measure throughput.
     * Every access goes through a single private lock, so increments from the
     * consumer threads and reads/resets from the reporting task are safe.
     */
    public class CalcSpeed {
        private static final Object LOCK = new Object();
        private static int count = 0;

        /** Increments the counter by one. */
        public static void addCount() {
            synchronized (LOCK) {
                count++;
            }
        }

        /** Resets the counter to zero. */
        public static void resetCount() {
            synchronized (LOCK) {
                count = 0;
            }
        }

        /** Returns the current counter value. */
        public static int getCount() {
            synchronized (LOCK) {
                return count;
            }
        }
    }
    View Code
  • 相关阅读:
    CSS3中新增的对文本和字体的设置
    PAT1107:Social Clusters
    Git的一些操作
    PAT1029:Median
    PAT1024:Palindromic Number
    PAT1028:List Sorting
    PAT1035: Password
    PAT1133:Splitting A Linked List
    PAT1017:Queueing at Bank
    PAT1105:Spiral Matrix
  • 原文地址:https://www.cnblogs.com/s0611163/p/14610942.html
Copyright © 2011-2022 走看看