  • Setting Kafka offsets programmatically
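The utility class below fetches and commits consumer-group offsets by talking to a broker directly over a `BlockingChannel`, using the legacy Scala-client request classes (`kafka.javaapi.OffsetFetchRequest` / `OffsetCommitRequest`). Version 1 of these requests, introduced with Kafka 0.8.2's broker-side offset storage, reads and writes offsets in Kafka's internal `__consumer_offsets` topic rather than ZooKeeper. Note that this old client API was deprecated and eventually removed from Kafka; a much shorter equivalent using the modern Java client is sketched after the listing.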

    import kafka.common.*;
    import kafka.common.OffsetAndMetadata;
    import kafka.javaapi.*;
    import kafka.network.BlockingChannel;
    import org.apache.kafka.common.TopicPartition;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.*;
    import java.util.Map.Entry;

    public class KafkaUtilsV2 {
        private static final Logger LOG = LoggerFactory.getLogger(KafkaUtilsV2.class);
        private static final int correlationId = 2;
        private static final String clientId = "internalConsumer";

        // Connect a BlockingChannel to the first reachable broker in a
        // comma-separated "host:port" bootstrap list.
        public static BlockingChannel createBlockingChannel(String bootstrapServers) {
            List<String> hosts = new ArrayList<>();
            int port = 9092;
            BlockingChannel channel = null;
            String servers = bootstrapServers.replaceAll(" ", "");
            if (!servers.isEmpty()) {
                // Collect the hosts; the port of the last entry is used for
                // every connection attempt.
                for (String hostAndPort : servers.split(",")) {
                    hosts.add(hostAndPort.split(":")[0]);
                    port = Integer.parseInt(hostAndPort.split(":")[1]);
                }

                // Try each host in turn until a channel connects.
                for (int j = 0; j < hosts.size() && (channel == null || !channel.isConnected()); j++) {
                    try {
                        channel = new BlockingChannel(hosts.get(j).trim(), port,
                                BlockingChannel.UseDefaultBufferSize(),
                                BlockingChannel.UseDefaultBufferSize(), 5000);
                        channel.connect();
                    } catch (Exception e) {
                        LOG.info("###>: channel connect failed with exception {}", e.getMessage());
                    }
                }
            } else {
                LOG.info("###>: bootstrapServers is empty, so the BlockingChannel cannot be created");
            }
            return channel;
        }
        // Fetch the committed offset of each listed partition for a consumer group.
        // versionId (short) 1 reads offsets stored in Kafka (0.8.2+), not ZooKeeper.
        public static Map<Integer, Long> getOffsetFromKafka(String bootstrapServers, String groupId,
                                                            String topic, List<Integer> partitionIds) {
            Map<Integer, Long> offsetMap = new HashMap<>();
            BlockingChannel channel = createBlockingChannel(bootstrapServers);
            if (channel != null && channel.isConnected()) {
                List<TopicAndPartition> partitions = new ArrayList<>();
                for (Integer i : partitionIds) {
                    partitions.add(new TopicAndPartition(topic, i));
                }
                OffsetFetchRequest fetchRequest = new OffsetFetchRequest(groupId, partitions,
                        (short) 1, correlationId, clientId);
                try {
                    channel.send(fetchRequest.underlying());
                    OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload());
                    Map<TopicAndPartition, OffsetMetadataAndError> result = fetchResponse.offsets();
                    for (Entry<TopicAndPartition, OffsetMetadataAndError> entry : result.entrySet()) {
                        offsetMap.put(entry.getKey().partition(), entry.getValue().offset());
                    }
                } catch (Exception e) {
                    LOG.warn("###>: offset fetch request failed with exception: {}", e.getMessage(), e);
                } finally {
                    channel.disconnect();
                }
            } else {
                LOG.info("###>: BlockingChannel is not connected!");
            }
            return offsetMap;
        }
        // Same fetch, but for an arbitrary set of TopicAndPartition keys; results are
        // returned keyed by the new client's TopicPartition type.
        public static Map<TopicPartition, Long> getOffsetFromKafkaByTopicAndMetadata(String bootstrapServers, String groupId,
                                                                                     Set<TopicAndPartition> topicPartitions) {
            Map<TopicPartition, Long> topicPartitionLongMap = new HashMap<>();
            BlockingChannel channel = createBlockingChannel(bootstrapServers);
            if (channel != null && channel.isConnected()) {
                List<TopicAndPartition> partitions = new ArrayList<>(topicPartitions);
                OffsetFetchRequest fetchRequest = new OffsetFetchRequest(groupId, partitions,
                        (short) 1, correlationId, clientId);
                try {
                    channel.send(fetchRequest.underlying());
                    OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload());
                    for (Entry<TopicAndPartition, OffsetMetadataAndError> entry : fetchResponse.offsets().entrySet()) {
                        TopicAndPartition tap = entry.getKey();
                        topicPartitionLongMap.put(new TopicPartition(tap.topic(), tap.partition()),
                                entry.getValue().offset());
                    }
                } catch (Exception e) {
                    LOG.warn("###>: offset fetch request failed with exception: {}", e.getMessage(), e);
                } finally {
                    channel.disconnect();
                }
            } else {
                LOG.info("###>: BlockingChannel is not connected!");
            }
            return topicPartitionLongMap;
        }

        // Same fetch for partitions 0 .. partitionsNum-1 of a single topic.
        public static Map<Integer, Long> getOffsetFromKafkaByPartitionNum(String bootstrapServers, String groupId,
                                                                          String topic, int partitionsNum) {
            List<Integer> partitionIds = new ArrayList<>();
            for (int i = 0; i < partitionsNum; i++) {
                partitionIds.add(i);
            }
            return getOffsetFromKafka(bootstrapServers, groupId, topic, partitionIds);
        }

        // Commit the given offsets for a consumer group. versionId (short) 1 stores
        // offsets in Kafka rather than ZooKeeper.
        public static void commitOffsetToKafka(String bootstrapServers, String groupId,
                                               Map<TopicAndPartition, OffsetAndMetadata> offsets) {
            BlockingChannel channel = createBlockingChannel(bootstrapServers);
            if (channel != null && channel.isConnected()) {
                OffsetCommitRequest commitRequest = new OffsetCommitRequest(groupId, offsets, correlationId, clientId, (short) 1);
                try {
                    LOG.debug("###testbug: begin to send OffsetCommitRequest");
                    channel.send(commitRequest.underlying());
                    OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().payload());
                    if (commitResponse.hasError()) {
                        for (Entry<TopicAndPartition, Object> entry : commitResponse.errors().entrySet()) {
                            short code = (Short) entry.getValue();
                            if (code == ErrorMapping.OffsetMetadataTooLargeCode()) {
                                // reduce the size of the metadata and retry the commit
                            } else if (code == ErrorMapping.NotCoordinatorForConsumerCode()
                                    || code == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                                // the offset manager has moved: rediscover the coordinator and retry
                            } else {
                                // log and retry the commit
                            }
                        }
                    }
                } catch (Exception e) {
                    LOG.info("###>: commit offset request failed with exception {}", e.getMessage());
                } finally {
                    channel.disconnect();
                }
            } else {
                LOG.info("###>: BlockingChannel is not connected!");
            }
        }

        // Convert new-client offsets into the legacy kafka.common types expected by
        // commitOffsetToKafka; the commit is stamped to expire six hours from now.
        public static Map<TopicAndPartition, OffsetAndMetadata> convertToCommon(
                Map<TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets) {
            Map<TopicAndPartition, OffsetAndMetadata> convertedOffsets = new HashMap<>();
            for (Entry<TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata> offset : offsets.entrySet()) {
                TopicAndPartition topicAndPartition = new TopicAndPartition(offset.getKey().topic(), offset.getKey().partition());
                OffsetMetadata offsetMetadata = new OffsetMetadata(offset.getValue().offset(), Integer.toString(offset.getKey().partition()));
                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offsetMetadata,
                        System.currentTimeMillis(), System.currentTimeMillis() + 6 * 3600 * 1000L);
                convertedOffsets.put(topicAndPartition, offsetAndMetadata);
            }
            return convertedOffsets;
        }

        public static void main(String[] args) {
            // Commit offset 80 for partition 0 of topic "fieldcompact02" ...
            Map<TopicAndPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            TopicAndPartition topicAndPartition = new TopicAndPartition("fieldcompact02", 0);
            OffsetMetadata offsetMetadata = new OffsetMetadata(80L, "0");
            toCommit.put(topicAndPartition, new OffsetAndMetadata(offsetMetadata,
                    System.currentTimeMillis(), System.currentTimeMillis() + 2 * 60 * 60 * 1000L));
            commitOffsetToKafka("hdh153:9092", "connectors-lsh-008", toCommit);

            // ... then read committed offsets back for another group.
            Set<TopicAndPartition> topicAndPartitionSet = new HashSet<>();
            topicAndPartitionSet.add(new TopicAndPartition("fieldcompact02", 0));
            Map<TopicPartition, Long> offsets = getOffsetFromKafkaByTopicAndMetadata(
                    "hdh153:9092", "connectors-lsh-018", topicAndPartitionSet);
            for (Entry<TopicPartition, Long> offset : offsets.entrySet()) {
                LOG.info("###> partition = {}, offset = {}", offset.getKey().partition(), offset.getValue());
            }
        }
    }
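For comparison, here is a minimal sketch of the same round trip (commit an explicit offset for a group, then read it back) using the modern `org.apache.kafka.clients.consumer.KafkaConsumer` API. The broker address, group id, and topic name below are placeholders, not values from the original post.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ModernOffsetExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("group.id", "my-group");                // placeholder group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // assign() rather than subscribe(): we manage this one partition ourselves.
                consumer.assign(Collections.singletonList(tp));

                // Set the group's offset explicitly (the equivalent of commitOffsetToKafka above).
                consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(80L)));

                // Read the committed offset back (the equivalent of getOffsetFromKafka above).
                OffsetAndMetadata committed = consumer.committed(tp);
                System.out.println("committed offset = " + (committed == null ? "none" : committed.offset()));
            }
        }
    }

Note that committing offsets for a group that still has active subscribing members may be rejected by the group coordinator, so this pattern is typically run while the group's consumers are stopped.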
  • Original article: https://www.cnblogs.com/jinniezheng/p/6379639.html