  • Kafka client operations: AdminClient and Producer

     

    I. Outline

      Build a Kafka client in Java

      Understand the Kafka client types

      Master the basic Kafka client operations

    II. Client Types

    1. The five client types (diagram omitted; the types are listed in the next subsection)

      

    2. Kafka client API types

      AdminClient: manages and inspects topics, brokers, and other Kafka objects

      Producer: publishes messages to topics

      Consumer: subscribes to topics and processes the messages

      Streams: efficiently transforms input streams into output streams

      Connect: pulls data from source systems or applications into Kafka

    III. AdminClient API

    1. Sample program

    package com.jun.kafka.admin;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.jun.kafka.common.utils.JsonUtils;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.common.config.ConfigResource;
    
    import java.util.*;
    import java.util.concurrent.ExecutionException;
    
    /**
     * AdminClient demo.
     */
    @Slf4j
    public class AdminSample {
        private static final String TOPIC_NAME = "jun-topic";
    
        public static void main(String[] args) {
    //        alterConfig();
            describeTopic();
        }
    
        /**
         * Build the AdminClient.
         */
        public static AdminClient adminClient(){
            Properties properties = new Properties();
            // Kafka broker address
            properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
            return AdminClient.create(properties);
        }
    
        /**
         * Create a topic.
         */
        public static void createTopic(){
            AdminClient adminClient = adminClient();
            // replication factor
            short replicationFactor = 1;
            NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, replicationFactor);
            CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
            log.info("{}", JsonUtils.toJsonString(topics));
        }
    
        /**
         * List all topics.
         */
        public static void topicLists(){
            AdminClient adminClient = adminClient();
            ListTopicsResult listTopicsResult = adminClient.listTopics();
            try {
                Set<String> names = listTopicsResult.names().get();
                names.stream().forEach(System.out::println);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    
    
        /**
         * List all topics, including internal topics.
         */
        public static void topicOptionLists(){
            AdminClient adminClient = adminClient();
            ListTopicsOptions options = new ListTopicsOptions();
            options.listInternal(true);
            ListTopicsResult listTopicsResult = adminClient.listTopics(options);
            try {
                Set<String> names = listTopicsResult.names().get();
                names.stream().forEach(System.out::println);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
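
        // With listInternal(true), the result also includes Kafka's internal topics
        // (for example __consumer_offsets), which a plain listTopics() hides by default.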
    
        /**
         * Delete a topic.
         */
        public static void deleteTopic(){
            AdminClient adminClient = adminClient();
            DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
            try {
                // block until the broker acknowledges the deletion
                deleteTopicsResult.all().get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * Describe a topic.
         */
        public static void describeTopic(){
            String NEW_TOPIC_NAME = "caojun-topic";
            AdminClient adminClient = adminClient();
            DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(NEW_TOPIC_NAME));
            try {
                Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
                Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
                entries.stream().forEach(entry->{
                    log.info("key={}", entry.getKey());
                    log.info("value={}", entry.getValue());
                });
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
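
        // The logged TopicDescription includes the topic name, whether it is an
        // internal topic, and per-partition detail: leader, replicas, and ISR.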
    
        /**
         * Describe a topic's configuration.
         */
        public static void describeConfig(){
            String NEW_TOPIC_NAME = "caojun-topic";
            AdminClient adminClient = adminClient();
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, NEW_TOPIC_NAME);
            DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
            try {
                Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
                configResourceConfigMap.entrySet().stream().forEach(configResourceConfigEntry -> {
                    log.info("key={}", configResourceConfigEntry.getKey());
                    log.info("value={}", configResourceConfigEntry.getValue());
                });
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * Alter a topic's configuration.
         */
        public static void alterConfig(){
            String NEW_TOPIC_NAME = "caojun-topic";
            AdminClient adminClient = adminClient();
            Map<ConfigResource, Config> configMap = new HashMap<>();
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, NEW_TOPIC_NAME);
            Config config = new Config(Arrays.asList(new ConfigEntry("preallocate", "true")));
            configMap.put(configResource, config);
            AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMap);
            try {
                // block until the config change is applied
                alterConfigsResult.all().get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
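
        // Note: newer client versions deprecate alterConfigs in favor of
        // incrementalAlterConfigs (AlterConfigOp), which changes entries
        // individually instead of replacing the whole config set.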
    
        /**
         * Increase the topic's partition count.
         */
        public static void incrPartitions(){
            String NEW_TOPIC_NAME = "caojun-topic";
            AdminClient adminClient = adminClient();
            Map<String, NewPartitions> partitionsMap = new HashMap<>();
            NewPartitions newPartitions = NewPartitions.increaseTo(2);
            partitionsMap.put(NEW_TOPIC_NAME, newPartitions);
            CreatePartitionsResult partitions = adminClient.createPartitions(partitionsMap);
            try {
                partitions.all().get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
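
        // Note: Kafka can only increase a topic's partition count; requests to
        // decrease it fail, and existing keyed records are not redistributed.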
    
    
    
    }
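
      One caveat in the sample above: every helper creates a new AdminClient and
      never closes it. A minimal sketch of the safer pattern, assuming the same
      adminClient() helper (AdminClient implements AutoCloseable, so
      try-with-resources shuts down its network thread):

        public static void topicListsSafely() {
            // the client is closed even if the lookup throws
            try (AdminClient adminClient = adminClient()) {
                adminClient.listTopics().names().get().forEach(System.out::println);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }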
    

      

    IV. Producer API

     1. Send modes

      Synchronous send

      Asynchronous send (fire-and-forget)

      Asynchronous send with a callback

    2. Asynchronous send

        /**
         * Asynchronous (fire-and-forget) send.
         */
        public static void producerSend(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
    
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            // the main producer object
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            // the message record
    
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key", "value");
    
            producer.send(record);
            producer.close();
        }
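
      Note that send() returns as soon as the record is handed to the background I/O
      thread; nothing checks the result here, so a failure would go unnoticed. The
      demo still delivers the message because close() blocks until previously sent
      records have been flushed.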
    

      Result: (screenshot omitted)

      

    3. Synchronous send

        /**
         * Synchronous send: block on the Future returned by the async send.
         */
        public static void producerSyncSend(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
    
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            // the main producer object
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            // the message record
    
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-sync", "value-sync");
    
            Future<RecordMetadata> send = producer.send(record);
            try {
                // block until the broker acknowledges the record
                RecordMetadata recordMetadata = send.get();
                log.info("partition:{}, offset:{}", recordMetadata.partition(), recordMetadata.offset());
            } catch (InterruptedException | ExecutionException e) {
                // recordMetadata would be null on failure, so log only the exception
                e.printStackTrace();
            }
            producer.close();
        }
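
      Blocking on each Future gives per-record error handling and strict ordering at
      the cost of throughput: only one record is in flight at a time, so this pattern
      suits low-volume, must-not-lose messages.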
    

      

    4. Asynchronous send with a callback

        /**
         * Asynchronous send with a callback.
         */
        public static void producerSendWithCallback(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
    
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            // the main producer object
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            // the message record
    
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-callback", "value-callback");
    
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        // the send failed; recordMetadata may be null here
                        log.error("send failed", e);
                        return;
                    }
                    log.info("callback fired");
                    log.info("partition:{}, offset:{}", recordMetadata.partition(), recordMetadata.offset());
                }
            });
            producer.close();
        }
    

      Result: (screenshot omitted)
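
      Since Callback is a single-method interface, the same logic reads more
      compactly as a lambda. A sketch, identical in behavior; note the callback runs
      on the producer's I/O thread, so it should stay cheap:

        producer.send(record, (recordMetadata, e) -> {
            if (e != null) {
                log.error("send failed", e);
            } else {
                log.info("partition:{}, offset:{}", recordMetadata.partition(), recordMetadata.offset());
            }
        });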

      

    5. How it works under the hood

      The KafkaProducer constructor (decompiled source):

    KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors interceptors, Time time) {
            ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer));
    
            try {
                Map<String, Object> userProvidedConfigs = config.originals();
                this.producerConfig = config;
                this.time = time;
                String transactionalId = userProvidedConfigs.containsKey("transactional.id") ? (String)userProvidedConfigs.get("transactional.id") : null;
                this.clientId = buildClientId(config.getString("client.id"), transactionalId);
                LogContext logContext;
                if (transactionalId == null) {
                    logContext = new LogContext(String.format("[Producer clientId=%s] ", this.clientId));
                } else {
                    logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", this.clientId, transactionalId));
                }
    
                this.log = logContext.logger(KafkaProducer.class);
                this.log.trace("Starting the Kafka producer");
                Map<String, String> metricTags = Collections.singletonMap("client-id", this.clientId);
            // set up metrics / monitoring
                MetricConfig metricConfig = (new MetricConfig()).samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS).recordLevel(RecordingLevel.forName(config.getString("metrics.recording.level"))).tags(metricTags);
                List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("client.id", this.clientId));
                reporters.add(new JmxReporter("kafka.producer"));
                this.metrics = new Metrics(metricConfig, reporters, time);
            // load the configured partitioner
                this.partitioner = (Partitioner)config.getConfiguredInstance("partitioner.class", Partitioner.class);
                long retryBackoffMs = config.getLong("retry.backoff.ms");
            // initialize the key/value serializers
            if (keySerializer == null) {
                this.keySerializer = (Serializer)config.getConfiguredInstance("key.serializer", Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore("key.serializer");
                this.keySerializer = keySerializer;
            }
    
                if (valueSerializer == null) {
                    this.valueSerializer = (Serializer)config.getConfiguredInstance("value.serializer", Serializer.class);
                    this.valueSerializer.configure(config.originals(), false);
                } else {
                    config.ignore("value.serializer");
                    this.valueSerializer = valueSerializer;
                }
    
                userProvidedConfigs.put("client.id", this.clientId);
                ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
                List<ProducerInterceptor<K, V>> interceptorList = configWithClientId.getConfiguredInstances("interceptor.classes", ProducerInterceptor.class);
                if (interceptors != null) {
                    this.interceptors = interceptors;
                } else {
                    this.interceptors = new ProducerInterceptors(interceptorList);
                }
    
                ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
                this.maxRequestSize = config.getInt("max.request.size");
                this.totalMemorySize = config.getLong("buffer.memory");
                this.compressionType = CompressionType.forName(config.getString("compression.type"));
                this.maxBlockTimeMs = config.getLong("max.block.ms");
                this.transactionManager = configureTransactionState(config, logContext, this.log);
                int deliveryTimeoutMs = configureDeliveryTimeout(config, this.log);
                this.apiVersions = new ApiVersions();
            // initialize the record accumulator (the batching buffer)
                this.accumulator = new RecordAccumulator(logContext, config.getInt("batch.size"), this.compressionType, lingerMs(config), retryBackoffMs, deliveryTimeoutMs, this.metrics, "producer-metrics", time, this.apiVersions, this.transactionManager, new BufferPool(this.totalMemorySize, config.getInt("batch.size"), this.metrics, time, "producer-metrics"));
                List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"), config.getString("client.dns.lookup"));
                if (metadata != null) {
                    this.metadata = metadata;
                } else {
                    this.metadata = new ProducerMetadata(retryBackoffMs, config.getLong("metadata.max.age.ms"), logContext, clusterResourceListeners, Time.SYSTEM);
                    this.metadata.bootstrap(addresses, time.milliseconds());
                }
    
                this.errors = this.metrics.sensor("errors");
            // the sender performs all network I/O
                this.sender = this.newSender(logContext, kafkaClient, this.metadata);
                String ioThreadName = "kafka-producer-network-thread | " + this.clientId;
                this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            // start the sender on a daemon thread
                this.ioThread.start();
                config.logUnused();
                AppInfoParser.registerAppInfo("kafka.producer", this.clientId, this.metrics, time.milliseconds());
                this.log.debug("Kafka producer started");
            } catch (Throwable var23) {
                this.close(Duration.ofMillis(0L), true);
                throw new KafkaException("Failed to construct kafka producer", var23);
            }
        }
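
      In short, the constructor wires up the metrics (plus a JMX reporter), the
      partitioner, the key/value serializers, the interceptor chain, the
      RecordAccumulator (bounded by buffer.memory, batched by batch.size), and the
      cluster metadata, then starts the Sender loop on a daemon KafkaThread named
      kafka-producer-network-thread. All network I/O happens on that thread.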
    

      

      How a send() call flows (interceptors first, then doSend):

        public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
            ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
            return this.doSend(interceptedRecord, callback);
        }
    

      

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
            TopicPartition tp = null;
    
            try {
                this.throwIfProducerClosed();
    
                KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
                try {
                    clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
                } catch (KafkaException var20) {
                    if (this.metadata.isClosed()) {
                        throw new KafkaException("Producer closed while send in progress", var20);
                    }
    
                    throw var20;
                }
    
                long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
                Cluster cluster = clusterAndWaitTime.cluster;
    
                byte[] serializedKey;
                try {
                    serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
                } catch (ClassCastException var19) {
                    throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var19);
                }
    
                byte[] serializedValue;
                try {
                    serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
                } catch (ClassCastException var18) {
                    throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var18);
                }
            // compute which partition the record will go to
                int partition = this.partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                this.setReadOnly(record.headers());
                Header[] headers = record.headers().toArray();
                int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
                this.ensureValidRecordSize(serializedSize);
                long timestamp = record.timestamp() == null ? this.time.milliseconds() : record.timestamp();
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
                }
            // wrap the user callback together with the interceptors
                Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
                if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                    this.transactionManager.failIfNotReadyForSend();
                }
            // append the record to the accumulator (batching)
                RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true);
            // the append was aborted so a new batch could be created:
            // re-pick the partition and retry the append
            if (result.abortForNewBatch) {
                int prevPartition = partition;
                this.partitioner.onNewBatch(record.topic(), cluster, partition);
                partition = this.partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
                }

                interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
                result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false);
            }

            if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                this.transactionManager.maybeAddPartitionToTransaction(tp);
            }

            if (result.batchIsFull || result.newBatchCreated) {
                this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                // the actual network send is done by the daemon sender thread
                this.sender.wakeup();
            }

            return result.future;
        } catch (ApiException var21) {
            this.log.debug("Exception occurred during message send:", var21);
            if (callback != null) {
                callback.onCompletion((RecordMetadata)null, var21);
            }

            this.errors.record();
            this.interceptors.onSendError(record, tp, var21);
            return new KafkaProducer.FutureFailure(var21);
        } catch (InterruptedException var22) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var22);
            throw new InterruptException(var22);
        } catch (BufferExhaustedException var23) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            this.interceptors.onSendError(record, tp, var23);
            throw var23;
        } catch (KafkaException var24) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var24);
            throw var24;
        } catch (Exception var25) {
            this.interceptors.onSendError(record, tp, var25);
            throw var25;
        }
    }

      Flow diagram: (image omitted)
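
      The takeaway from doSend(): the calling thread only serializes, partitions, and
      appends the record to the RecordAccumulator; the daemon sender thread drains
      full batches to the brokers. Two settings drive that batching (a sketch; the
      values are illustrative assumptions, not recommendations):

        // batch.size: target bytes per partition batch before it is considered full
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        // linger.ms: how long to wait for more records before sending a partial batch
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "5");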

      

      

    6. Custom partitioner

    package com.jun.kafka.producer;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    
    import java.util.Map;
    
    /**
     * Custom partitioner: routes keys of the form "key-N" to partition N % 2.
     */
    public class SamplePartition implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /*
            expects keys of the form:
            key-1
            key-2
            key-3
         */
        String keyStr = key + "";
        // strip the "key-" prefix, leaving the numeric suffix
        String keyInt = keyStr.substring(4);
        System.out.println("keyStr : " + keyStr + ", keyInt : " + keyInt);

        int i = Integer.parseInt(keyInt);

        // route to one of the topic's (assumed) two partitions
        return i % 2;
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    
    }
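
      SamplePartition assumes every key looks like "key-N" and that the topic has
      exactly two partitions. A slightly more defensive sketch (still assuming the
      key format) would take the partition count from the cluster metadata instead
      of hard-coding 2:

        // hypothetical variant of partition(); partitionCountForTopic reads metadata
        int numPartitions = cluster.partitionCountForTopic(topic);
        return Integer.parseInt((key + "").substring(4)) % numPartitions;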
    

      Usage:

        /**
         * Send using the custom partitioner.
         */
        public static void producerSendWithPartition(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
            properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
            // register the custom partitioner
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.jun.kafka.producer.SamplePartition");
    
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            // the main producer object
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            // the message record
    
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-2", "value");
    
            producer.send(record);
            producer.close();
        }
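
      With this setup, "key-2" parses to 2 and lands in partition 0, while "key-1"
      would land in partition 1; the topic must already have at least two partitions
      (see incrPartitions() above).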
    

      

    7. Message delivery guarantees

      Kafka provides three delivery guarantees:

        At most once

        At least once

        Exactly once

      The achieved guarantee depends on the producer and the consumer together,

      but it is driven primarily by the producer,

      via this configuration (see the notes after the snippet):

    properties.put(ProducerConfig.ACKS_CONFIG, "all");
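
      How acks relates to the three guarantees (a summary of standard Kafka
      semantics; the idempotence line is an addition beyond this post's scope):

        // acks=0  : fire-and-forget -- at-most-once; lowest latency, loss possible
        // acks=1  : leader acknowledgement only -- a leader crash can still lose
        //           records that were acknowledged but not yet replicated
        // acks=all: wait for all in-sync replicas -- the basis for at-least-once
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // exactly-once additionally needs an idempotent producer (and transactions
        // for atomicity across partitions):
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");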
    

      
