zoukankan      html  css  js  c++  java
  • Kafka使用代码设置offset值

      1 import kafka.common.*;
      2 import kafka.common.OffsetAndMetadata;
      3 import kafka.javaapi.*;
      4 import kafka.network.BlockingChannel;
      5 import org.apache.kafka.common.TopicPartition;
      6 import org.slf4j.Logger;
      7 import org.slf4j.LoggerFactory;
      9 import java.lang.Long;
     10 import java.util.*;
     11 import java.util.Map.Entry;
     13 public class KafkaUtilsV2
     14 {
     15     private static Logger LOG = LoggerFactory.getLogger(KafkaUtilsV2.class);
     16     private static final int correlationId = 2;
     17     private static final String clientId = "internalConsumer";
     19     public static BlockingChannel createBlockingChannel(String bootstrapServers1) {
     20         List<String> hosts = new ArrayList();
     21         int port = 9092;
     22         BlockingChannel channel = null;
     23         String bootstrapServers = bootstrapServers1.replaceAll(" ", "");
     24         if ("" != bootstrapServers) {
     25             String[] hostsAndPort = bootstrapServers.split(",");
     26             for (int i = 0; i < hostsAndPort.length; i++) {
     27                 String host = hostsAndPort[i].split(":")[0];
     28                 port = Integer.parseInt(hostsAndPort[i].split(":")[1]);
     29                 hosts.add(host);
     30             }
     32             String[] hostsArray = new String[hosts.size()];
     33             for (int k = 0; k < hosts.size(); k++) {
     34                 hostsArray[k] = hosts.get(k);
     35             }
     37             for (int j = 0; (j < hostsArray.length) && ((channel == null) || (!channel.isConnected())); j++)
     38                 try {
     39                     //LOG.info("###testbug001: try to create BlockingChannel in {} times", Integer.valueOf(j + 1));
     40                     channel = new BlockingChannel(hostsArray[j].trim(), port,
     41                             BlockingChannel.UseDefaultBufferSize(),
     42                             BlockingChannel.UseDefaultBufferSize(), 5000);
     43                     channel.connect();
     44                 } catch (Exception e) {
     45                     LOG.info("###>:channel connect but failed with the exception {}", e.getMessage());
     46                 }
     47         }
     48         else {
     49             LOG.info("###>: bootstrapServers is null, so can not create blockingChannel");
     50         }
     51         return channel;
     52     }
         //获得offset  54 public static Map<Integer, Long> getOffsetFromKafka(String bootstrapServers, String groupId, String topic, List<Integer> partitionsIds) { 55 Map<Integer, Long> offsetMap = new HashMap(); 56 BlockingChannel channel = createBlockingChannel(bootstrapServers); 57 if (channel.isConnected()) { 58 List partitions = new ArrayList(); 59 for (Integer i : partitionsIds) { 60 partitions.add(new TopicAndPartition(topic, i.intValue())); 61 } 62 OffsetFetchRequest fetchRequest = new OffsetFetchRequest(groupId, partitions, 63 (short) 1, correlationId, clientId); 64 try 65 { 66 channel.send(fetchRequest.underlying()); 67 OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload()); 68 Map<TopicAndPartition, OffsetMetadataAndError> result = fetchResponse.offsets(); 69 for(Entry<TopicAndPartition, OffsetMetadataAndError> entry : result.entrySet()) { 70 TopicAndPartition topicAndPartition = entry.getKey(); 71 OffsetMetadataAndError offsetMetadataAndError = entry.getValue(); 72 int partition = topicAndPartition.partition(); 73 long retriveOffset = offsetMetadataAndError.offset(); 74 offsetMap.put(partition, retriveOffset); 75 } 76 } catch (Exception e) { 77 LOG.warn("###>: send offsetFetchRequest with exception: {}", e.getMessage()); 78 e.printStackTrace(); 79 } finally { 80 channel.disconnect(); 81 } 82 } else { 83 LOG.info("###>: BlockingChannel is not connected!"); 84 } 85 return offsetMap; 86 } 87
    88 public static Map<TopicPartition, Long> getOffsetFromKafkaByTopicAndMetadata(String bootstrapServers, String groupId, 89 Set<TopicAndPartition> topicPartitions) { 90 Map<TopicPartition, Long> topicPartitionLongMap = new HashMap<>(); 91 BlockingChannel channel = createBlockingChannel(bootstrapServers); 92 if (channel.isConnected()) { 93 List partitions = new ArrayList(); 94 partitions.addAll(topicPartitions); 95 OffsetFetchRequest fetchRequest = new OffsetFetchRequest(groupId, partitions, 96 (short) 1, correlationId, clientId); 97 try 98 { 99 channel.send(fetchRequest.underlying()); 100 OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload()); 101 Map<TopicAndPartition, OffsetMetadataAndError> result = fetchResponse.offsets(); 102 for(Entry<TopicAndPartition, OffsetMetadataAndError> entry : result.entrySet()) { 103 TopicAndPartition topicAndPartition = entry.getKey(); 104 TopicPartition topicPartition = new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition()); 105 OffsetMetadataAndError offsetMetadataAndError = entry.getValue(); 106 long retriveOffset = offsetMetadataAndError.offset(); 107 topicPartitionLongMap.put(topicPartition, retriveOffset); 108 } 109 } catch (Exception e) { 110 LOG.warn("###>: send offsetFetchRequest with exception: {}", e.getMessage()); 111 e.printStackTrace(); 112 } finally { 113 channel.disconnect(); 114 } 115 } else { 116 LOG.info("###>: BlockingChannel is not connected!"); 117 } 118 return topicPartitionLongMap; 119 } 120 121 public static Map<Integer, Long> getOffsetFromKafkaByPartitionNum(String bootstrapServers, String groupId, 122 String topic, int partitionsNum) { 123 Map<Integer, Long> offsetMap = new HashMap(); 124 BlockingChannel channel = createBlockingChannel(bootstrapServers); 125 if (channel.isConnected()) { 126 List partitions = new ArrayList(); 127 for (int i = 0; i < partitionsNum; i++) { 128 partitions.add(new TopicAndPartition(topic, i)); 129 } 130 OffsetFetchRequest fetchRequest = new OffsetFetchRequest(groupId, partitions, 131 (short) 1, correlationId, clientId); 132 try 133 { 134 channel.send(fetchRequest.underlying()); 135 OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload()); 136 Map<TopicAndPartition, OffsetMetadataAndError> result = fetchResponse.offsets(); 137 for(Entry<TopicAndPartition, OffsetMetadataAndError> entry : result.entrySet()) { 138 TopicAndPartition topicAndPartition = entry.getKey(); 139 OffsetMetadataAndError offsetMetadataAndError = entry.getValue(); 140 int partition = topicAndPartition.partition(); 141 long retriveOffset = offsetMetadataAndError.offset(); 142 offsetMap.put(partition, retriveOffset); 143 } 144 } catch (Exception e) { 145 LOG.warn("###>: send offsetFetchRequest with exception: {}", e.getMessage()); 146 e.printStackTrace(); 147 } finally { 148 channel.disconnect(); 149 } 150 } else { 151 LOG.info("###>: BlockingChannel is not connected!"); 152 } 153 return offsetMap; 154 } 155 156 public static void commitOffsetToKafka(String bootstrapServers, String groupId, Map<TopicAndPartition, OffsetAndMetadata> offsets) { 157 BlockingChannel channel = createBlockingChannel(bootstrapServers); 158 if (channel.isConnected()) { 159 OffsetCommitRequest commitRequest = new OffsetCommitRequest(groupId, offsets, correlationId, clientId, (short) 1); 160 try { 161 LOG.debug("###testbug: begin to send OffsetCommitRequest"); 162 channel.send(commitRequest.underlying()); 163 OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().payload()); 164 if (commitResponse.hasError()) { 165 Map<TopicAndPartition, Object> result = commitResponse.errors(); 166 for (Entry<TopicAndPartition, Object> entry : result.entrySet()) { 167 if(entry.getValue() == ErrorMapping.OffsetMetadataTooLargeCode()) { 168 169 }else if (entry.getValue() == ErrorMapping.NotCoordinatorForConsumerCode() || 170 entry.getValue() == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) { 171 172 // Go to step 1 (offset manager has moved) and then retry the commit to the new offset manager 173 } else { 174 // log and retry the commit 175 } 176 } 177 } 178 channel.disconnect(); 179 } 180 catch (Exception e) 181 { 182 LOG.info("###>: commit offset request failed with exception {}", e.getMessage()); 183 } 184 } else { 185 LOG.info("###>: BlockingChannel is not connected!"); 186 } 187 } 188 189 public static Map<TopicAndPartition, OffsetAndMetadata> convertToCommon(Map<TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets) { 190 Map<TopicAndPartition, OffsetAndMetadata> convertedOffsets = new HashMap<>(); 191 for(Map.Entry<TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offset : offsets.entrySet()) { 192 TopicAndPartition topicAndPartition = new TopicAndPartition(offset.getKey().topic(), offset.getKey().partition()); 193 OffsetMetadata offsetMetadata = new OffsetMetadata(offset.getValue().offset(), Integer.toString(offset.getKey().partition())); 194 OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offsetMetadata, System.currentTimeMillis(), System.currentTimeMillis() + 6*3600*3600); 195 convertedOffsets.put(topicAndPartition, offsetAndMetadata); 196 } 197 return convertedOffsets; 198 } 199 200 201 public static void main(String[] args) 202 { 203 Map<TopicAndPartition, OffsetAndMetadata> offset = new HashMap<>(); 204 for (int i = 0; i < 1; i++) { 205 TopicAndPartition topicAndPartition = new TopicAndPartition("fieldcompact02", i); 206 Long offset1 = 80L; 207 String metadata = Integer.toString(i); 208 OffsetMetadata offsetMetadata = new OffsetMetadata(offset1, metadata); 209 OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offsetMetadata, System.currentTimeMillis(), System.currentTimeMillis() + 2*60*60*1000); 210 211 offset.put(topicAndPartition, offsetAndMetadata); 212 } 213 commitOffsetToKafka("hdh153:9092", "connectors-lsh-008", offset); 214 215 /*Map<Integer, Long> test = KafkaUtilsV2.getOffsetFromKafkaByPartitionNum("lark001:9092", "hik_mac_info", "hik_mac_info", "test", 10); 216 for(Entry<Integer, Long> entry : test.entrySet()) { 217 Integer key = entry.getKey(); 218 Long value = entry.getValue(); 219 LOG.info("###testbug: key = {},and value = {}", key, value); 220 }*/ 221 222 Map<TopicAndPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> topicPartitions = new HashMap<>(); 223 TopicAndPartition topicPartition = new TopicAndPartition("fieldcompact02", 0); 224 Set<TopicAndPartition> topicAndPartitionSet = new HashSet<>(); 225 topicAndPartitionSet.add(topicPartition); 226 org.apache.kafka.clients.consumer.OffsetAndMetadata offsetAndMetadata = new org.apache.kafka.clients.consumer.OffsetAndMetadata(0); 227 topicPartitions.put(topicPartition, offsetAndMetadata); 228 //Map<Integer, Long> offsets = KafkaUtilsV2.getOffsetFromKafkaByPartitionNum("hdh153:9092", "connectors-lsh-008", "fieldcompact02", 1); 229 Map<TopicPartition, Long> offsets = KafkaUtilsV2.getOffsetFromKafkaByTopicAndMetadata("hdh153:9092", "connectors-lsh-018", topicAndPartitionSet); 230 for(Map.Entry<TopicPartition, Long> offset : offsets.entrySet()) { 231 //System.out.println("###test: topic = " + offset.getKey().topic() + ";partition = " + offset.getKey().partition() + "; offset = " + offset.getValue()); 232 LOG.info("###> partition = {}, offset = {}", offset.getKey().partition(), offset.getValue()); 233 } 234 } 235 }
  • 相关阅读:
    一文搞定 Spring Boot & Shiro 实战
    CPU 到底是怎么认识代码的?涨姿势了!
    Java 可重入锁内存可见性分析
    大牛总结的 Git 使用技巧,写得太好了!
    深入浅出 Java 中 JVM 内存管理
    Optional导致的 java.util.NoSuchElementException: No value present
  • 原文地址:https://www.cnblogs.com/jinniezheng/p/6379639.html
Copyright © 2011-2022 走看看