zoukankan      html  css  js  c++  java
  • 简单封装kafka相关的api




    import kafka.admin.AdminUtils;
    import kafka.admin.TopicCommand;
    import kafka.api.TopicMetadata;
    import kafka.tools.ConsumerOffsetChecker;
    import kafka.utils.ZKStringSerializer$;
    import kafka.utils.ZkUtils;
    import org.I0Itec.zkclient.ZkClient;
    import org.apache.commons.exec.CommandLine;
    import org.apache.commons.exec.DefaultExecutor;
    import org.apache.commons.exec.ExecuteWatchdog;
    import org.apache.commons.io.FileUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.tools.ant.taskdefs.Execute;
    public static class KafkaUtils {
        private static Logger LOGGER = LoggerFactory.getLogger(KafkaUtils.class);
        private static AutoZkClient zkClient;
        public static AutoZkClient getZkClient() {
            return zkClient;
        public static void setZkClient(AutoZkClient zkClient) {
            KafkaUtils.zkClient = zkClient;
        public static boolean topicExists(String topic) {
            Assert.notNull(zkClient, "zkclient is null");
            return AdminUtils.topicExists(zkClient, topic);
        public static void topicChangeConfig(String topic, Properties properties) {
            Assert.notNull(zkClient, "zkclient is null");
            AdminUtils.changeTopicConfig(zkClient, topic, properties);
        public static void topicAlterPartitions(String topic, int partitions) {
            Assert.notNull(zkClient, "zkclient is null");
            TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient);
            int curPartitions = topicMetadata.partitionsMetadata().size();
            if (curPartitions == partitions) {
            if (curPartitions > partitions) {
                LOGGER.info(String.format("curPartitions=%d,不能修改partitions=%d,请确保大与当前分区数", curPartitions, partitions));
            String[] args = {
                    "--zookeeper", zkClient.zkServers,
                    "--partitions", String.valueOf(partitions),
                    "--topic", topic
            TopicCommand.TopicCommandOptions alterOpt = new TopicCommand.TopicCommandOptions(args);
            TopicCommand.alterTopic(zkClient, alterOpt);
        public static void topicDescribe(String topic) {
            Assert.notNull(zkClient, "zkclient is null");
            String[] args = {
                    "--zookeeper", zkClient.zkServers,
                    "--topic", topic
            TopicCommand.TopicCommandOptions describeOpt = new TopicCommand.TopicCommandOptions(args);
            TopicCommand.describeTopic(zkClient, describeOpt);
        public static void topicOverrideConfig(String topic, Properties properties) {
            Assert.notNull(zkClient, "zkclient is null");
            Properties oldProperties = KafkaUtils.topicConfig(topic);
            AdminUtils.changeTopicConfig(zkClient, topic, oldProperties);
        public static void topicCreate(TopicConfig topicConfig) {
            Assert.notNull(zkClient, "zkclient is null");
            int brokerSize = ZkUtils.getSortedBrokerList(zkClient).size();
            if (topicConfig.getReplicationFactor() > brokerSize) {
                LOGGER.info(String.format("broker-size=%d < replication-factor=%d, 所以设置replication-factor大小为broker-size大小"
                        , brokerSize, topicConfig.getReplicationFactor()));
                    , topicConfig.getName()
                    , topicConfig.getPartitions()
                    , topicConfig.getReplicationFactor()
                    , topicConfig.getProperties());
        public static void topicDelete(String topic) {
            Assert.notNull(zkClient, "zkclient is null");
            AdminUtils.deleteTopic(zkClient, topic);
        public static List<String> topicsList() {
            Assert.notNull(zkClient, "zkclient is null");
            return seqAsJavaList(ZkUtils.getAllTopics(zkClient));
        public static Properties topicConfig(String topic) {
            Assert.notNull(zkClient, "zkclient is null");
            return AdminUtils.fetchTopicConfig(zkClient, topic);
        public static Map<String, Properties> topicsConfig() {
            Assert.notNull(zkClient, "zkclient is null");
            return mapAsJavaMap(AdminUtils.fetchAllTopicConfigs(zkClient));
        public static void consumerDetail(String topic, String group){
            String[] args = {
                    "--zookeeper", zkClient.getZkServers(),
                    "--group", group,
                    "--topic", topic
        public static Map<String, List<String>> getConsumersPerTopic(String group) {
            return mapAsJavaMap(ZkUtils.getConsumersPerTopic(zkClient, group, false)).entrySet()
                    .stream().collect(Collectors.toMap(entry -> entry.getKey(), entry -> JavaConversions.seqAsJavaList(entry.getValue())
                            .stream().map(consumerThreadId -> consumerThreadId.consumer())
        public static List<String> getConsumersInGroup(String group){
            return JavaConversions.seqAsJavaList(ZkUtils.getConsumersInGroup(zkClient, group));
        public static String executeCommond(String commond) {
            LOGGER.info("begin to execute commond: " + commond);
            File tmpFileDir = Files.createTempDir();
            String tmpFileName = UUID.randomUUID().toString() + ".txt";
            String fileSavePath = tmpFileDir.getAbsolutePath() + tmpFileName;
            CommandLine oCmdLine = CommandLine.parse(commond + " > " + fileSavePath);
            DefaultExecutor executor = new DefaultExecutor();
            ExecuteWatchdog watchdog = new ExecuteWatchdog(20000);
            int[] exitValues = {0, 1};
            try {
                if (Execute.isFailure(executor.execute(oCmdLine))) {
                    LOGGER.error("远程命令执行失败... commond=" + commond);
                } else {
                    try (Stream<String> lines = java.nio.file.Files.lines(new File(fileSavePath).toPath(), Charset.defaultCharset())) {
                        List<String> fileLines = lines.collect(toCollection(LinkedList::new));
                        StringBuilder result = new StringBuilder();
                        fileLines.forEach(line -> result.append(line).append(System.lineSeparator()));
                        return result.toString();
                    } finally {
            } catch (Exception e) {
                LOGGER.error("execute command error happened... commond=" + commond, e);
            return StringUtils.EMPTY;


    import com.alibaba.fastjson.JSON;
    import com.cmos.common.annotation.CompatibleOutput;
    import com.cmos.core.logger.Logger;
    import com.cmos.core.logger.LoggerFactory;
    import com.cmos.wmhopenapi.web.config.KafkaMessageConfig;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.MediaType;
    import org.springframework.web.bind.annotation.*;
    import scala.Console;
    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;
    import java.util.List;
    import static com.cmos.wmhopenapi.web.config.KafkaMessageConfig.KafkaUtils;
     * @author hujunzheng
     * @create 2018-07-16 10:20
    public class MessageCenterStateController {
        private static Logger LOGGER = LoggerFactory.getLogger(MessageCenterStateController.class);
        private KafkaMessageConfig.NoAckConsumer noAckConsumer;
        private KafkaMessageConfig.AckConsumer ackConsumer;
         * 获取topic描述
         * @param topic
        public String topicDescribe(@RequestParam String topic) {
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
                //scala 输出流重定向
                Console.setOut(new PrintStream(bos));
                String result = bos.toString();
                return String.format("%s%s%s", "<pre>", result, "</pre>");
            } catch (Exception e) {
                LOGGER.error("获取topic描述异常", e);
            return StringUtils.EMPTY;
         * 获取全部topic
        @GetMapping(value = "/topics-all", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
        public String topicAll() {
            String result = JSON.toJSONString(KafkaUtils.topicsList());
            return result;
         * 获取topic配置
         * @param topic
        @GetMapping(value = "/topic-config", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
        public String topicConfig(@RequestParam String topic) {
            String result = JSON.toJSONString(KafkaUtils.topicConfig(topic));
            return result;
         * 获取所有topic的配置
        @GetMapping(value = "/topics-configs", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
        public String topicsConfigs() {
            String result = JSON.toJSONString(KafkaUtils.topicsConfig());
            return result;
         * 展示在某个分组中的consumer
         * @param group
        @GetMapping(value = "/consumers-in-group", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
        public String consumersInGroup(@RequestParam String group) {
            String result = JSON.toJSONString(KafkaUtils.getConsumersInGroup(group));
            return result;
         * 展示在某个分组中的consumer,按照topic划分
         * @param group
        @GetMapping(value = "/consumers-per-topic", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
        public String consumersPerTopic(@RequestParam String group) {
            String result = JSON.toJSONString(KafkaUtils.getConsumersPerTopic(group));
            return result;
         * 展示消费者消费详情
         * @param topic
         * @param group
        public String consumerDetail(@RequestParam String topic, @RequestParam String group) {
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
                //scala 输出流重定向
                Console.setOut(new PrintStream(bos));
                KafkaUtils.consumerDetail(topic, group);
                String result = bos.toString();
                return String.format("%s%s%s", "<pre>", result, "</pre>");
            } catch (Exception e) {
                LOGGER.error("获取消费详情", e);
            return StringUtils.EMPTY;
         * 消费消息并展示消息
         * @param topic
         * @param group
         * @param size 消费消息数量
         * @param ack 消费的消息是否需要进行ack操作
        public String consumerConsume(@RequestParam String topic
                , @RequestParam(required = false, defaultValue = "default") String group
                , @RequestParam(required = false, defaultValue = "1") int size
                , @RequestParam(required = false, defaultValue = "false") boolean ack) {
            List<String> messages;
            if (ack) {
                messages = ackConsumer.consume(topic, group, size);
            } else {
                messages = noAckConsumer.consume(topic, group, size);
            return JSON.toJSONString(messages);
         * 运行kafka相关命令
         * @param sshRemote 连接远程主机命令(ssh user@host)
         * @param sshCommond kafka相关命令 (kafka-consumer.sh ...)
        public String commondExecute(
                @RequestParam(required = false) String sshRemote
                , @RequestParam String sshCommond
        ) {
            String commond = sshCommond + StringUtils.EMPTY;
            if (StringUtils.isNotBlank(sshRemote)) {
                commond = String.format("%s "%s"", sshRemote, commond);
            String result = KafkaUtils.executeCommond(commond);
            return String.format("%s%s%s", "<pre>", result, "</pre>");



    package com.mochasoft.latte.data.kafka.consumer;
    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;
    import org.apache.commons.lang3.StringUtils;
    public class KafkaConsumerConfig
      private String zkConnect;
      private String zkSessionTimeoutMs;
      private String zkSyncTimeMs;
      private String autoCommitIntervalMs;
      private String groupId = "default";
      static enum OffSet
        smallest,  largest;
        private OffSet() {}
      private OffSet offset = OffSet.largest;
      private Properties properties;
      public KafkaConsumerConfig()
        this.properties = new Properties();
      public KafkaConsumerConfig(String zkConnect, String zkSessionTimeoutMs, String zkSyncTimeMs, String autoCommitIntervalMs)
        this.zkConnect = zkConnect;
        this.zkSessionTimeoutMs = zkSessionTimeoutMs;
        this.zkSyncTimeMs = zkSyncTimeMs;
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.properties = new Properties();
      public String getZkConnect()
        return this.zkConnect;
      public void setZkConnect(String zkConnect)
        this.zkConnect = zkConnect;
      public String getZkSessionTimeoutMs()
        return this.zkSessionTimeoutMs;
      public void setZkSessionTimeoutMs(String zkSessionTimeoutMs)
        this.zkSessionTimeoutMs = zkSessionTimeoutMs;
      public String getZkSyncTimeMs()
        return this.zkSyncTimeMs;
      public void setZkSyncTimeMs(String zkSyncTimeMs)
        this.zkSyncTimeMs = zkSyncTimeMs;
      public String getAutoCommitIntervalMs()
        return this.autoCommitIntervalMs;
      public void setAutoCommitIntervalMs(String autoCommitIntervalMs)
        this.autoCommitIntervalMs = autoCommitIntervalMs;
      public String getGroupId()
        return this.groupId;
      public void setGroupId(String groupId)
        if (StringUtils.isNotBlank(groupId)) {
          this.groupId = groupId;
      public OffSet getOffset()
        return this.offset;
      public void setOffset(OffSet offset)
        this.offset = offset;
      public ConsumerConfig getConsumerConfig()
        return new ConsumerConfig(getProperties());
      public Properties getProperties()
        if (StringUtils.isBlank(this.zkConnect)) {
          throw new IllegalArgumentException("Blank zkConnect");
        if (StringUtils.isNotBlank(this.zkSessionTimeoutMs)) {
          this.properties.put("zookeeper.session.timeout.ms", this.zkSessionTimeoutMs);
        if (StringUtils.isNotBlank(this.zkSyncTimeMs)) {
          this.properties.put("zookeeper.sync.time.ms", this.zkSyncTimeMs);
        if (StringUtils.isNotBlank(this.autoCommitIntervalMs)) {
          this.properties.put("auto.commit.interval.ms", this.autoCommitIntervalMs);
        if (StringUtils.isNotBlank(this.offset.name())) {
          this.properties.put("auto.offset.reset", this.offset.name());
        this.properties.put("group.id", getGroupId());
        this.properties.put("zookeeper.connect", this.zkConnect);
        return this.properties;

    static final class NoAckConsumer extends TheConsumer { public NoAckConsumer(KafkaConsumerConfig kafkaConsumerConfig) { super(kafkaConsumerConfig, false); this.consumerConfigProperties.setProperty("auto.commit.enable", "false"); } } public static final class AckConsumer extends TheConsumer { public AckConsumer(KafkaConsumerConfig kafkaConsumerConfig) { super(kafkaConsumerConfig, true); this.consumerConfigProperties.setProperty("auto.commit.enable", "true"); } } public static class TheConsumer { protected Properties consumerConfigProperties; private boolean ack; private StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); private StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); public TheConsumer(KafkaConsumerConfig kafkaConsumerConfig, boolean ack) { this.ack = ack; this.consumerConfigProperties = new Properties(); this.consumerConfigProperties.putAll(kafkaConsumerConfig.getProperties()); } /** * @param topic 主题 * @param group 分组 * @param size 消费数量 **/ public List<String> consume(String topic, String group, int size) { if (StringUtils.isNotBlank(group)) { this.consumerConfigProperties.setProperty("group.id", group); } ConsumerConnector consumerConnector = null; try { consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(this.consumerConfigProperties)); Map<String, Integer> topics = new HashMap<>(1); topics.put(topic, 1); Map<String, List<KafkaStream<String, String>>> streams = consumerConnector.createMessageStreams(topics, keyDecoder, valueDecoder); if (!(CollectionUtils.isEmpty(streams) || CollectionUtils.isEmpty(streams.get(topic)))) { List<String> messages = new ArrayList<>(); KafkaStream<String, String> messageStream = streams.get(topic).get(0); for (ConsumerIterator<String, String> it = messageStream.iterator(); it.hasNext(); ) { MessageAndMetadata<String, String> messageAndMetadata = it.next(); messages.add(messageAndMetadata.message()); if (this.ack) { consumerConnector.commitOffsets(); } if (size <= messages.size()) { break; } } return messages; } } catch (Exception e) { LOGGER.error(String.format("%s ack consume has errors. topic=%s, group=%s, size=%d.", this.ack ? "" : "no", topic, group, size), e); } finally { if (consumerConnector != null) { consumerConnector.shutdown(); } } return Collections.EMPTY_LIST; } }


    public class KafkaTest extends BaseUnitTest {
        private static Logger LOGGER = LoggerFactory.getLogger(KafkaTest.class);
        private String zkConnect;
        private static final AutoZkClient zkClient = new AutoZkClient("");
        private static final String TEST_TOPIC = "message-center-biz-expiration-reminder-topic";
        private static final String TEST_GROUP = "hjz-group";
        private NoAckConsumer noAckConsumer;
        private AckConsumer ackConsumer;
        private KafkaProducer kafkaProducer;
        private CountDownLatch finishCountDownLatch = new CountDownLatch(20);
        public void testNoAckConsume() throws InterruptedException {
            class ConsumeRun implements Callable<List<String>> {
                private TheConsumer consumer;
                private CountDownLatch countDownLatch;
                public ConsumeRun(TheConsumer consumer, CountDownLatch countDownLatch) {
                    this.consumer = consumer;
                    this.countDownLatch = countDownLatch;
                public List<String> call() {
                    try {
                    } catch (InterruptedException e) {
                    List<String> messages = consumer.consume(TEST_TOPIC, TEST_GROUP, 8);
                    return messages;
            ExecutorService executorService = Executors.newFixedThreadPool(20);
            CountDownLatch countDownLatch = new CountDownLatch(1);
            List<Future<List<String>>> noAckConsumeFutures = new ArrayList<>(), ackConsumeFutures = new ArrayList<>();
            for (int i = 0; i < 10; ++i) {
                ConsumeRun consumeRun = new ConsumeRun(this.noAckConsumer, countDownLatch);
            for (int i = 0; i < 10; ++i) {
                ConsumeRun consumeRun = new ConsumeRun(this.ackConsumer, countDownLatch);
            System.out.println("no ack consumers response....");
            noAckConsumeFutures.forEach(future -> {
                try {
                } catch (Exception e){
    ack consumers response....");
            ackConsumeFutures.forEach(future -> {
                try {
                } catch (Exception e) {
        public void testProduce() {
            for (int i = 0; i < 100; ++i) {
                kafkaProducer.send(TEST_TOPIC, String.valueOf(i), "message " + i);
            KafkaUtils.consumerDetail(TEST_TOPIC, TEST_GROUP);
        public static void createTopic() {
            MessageCenterConstants.TopicConfig topicConfig = new MessageCenterConstants.TopicConfig();
        public static void testKafka() {
            Properties properties = new Properties();
            properties.put("min.cleanable.dirty.ratio", "0.3");
            KafkaMessageConfig.KafkaUtils.topicChangeConfig(TEST_TOPIC, properties);
            KafkaUtils.topicAlterPartitions(TEST_TOPIC, 7);
        public static void testTopicDescribe() {
            new MessageCenterStateController().topicDescribe("message-center-recharge-transaction-push-topic");
        public static void testConsumerDescribe() {
            String[] args = {
                    "--zookeeper", zkClient.getZkServers(),
                    "--group", "",
                    "--topic", "message-center-recharge-transaction-push-topic"
        public static void testConsumerList() {
            String[] args = {
                    "--broker-list", zkClient.getZkServers(),
                    "--topic", "message-center-recharge-transaction-push-topic",
        public static void main(String[] args) {

      测试no ack 以及 ack的消费结果

    no ack consumers response....
    [message 8, message 14, message 23, message 32, message 41, message 50, message 8, message 14]
    [message 14, message 23, message 32, message 41, message 50, message 12, message 21, message 30]
    [message 17, message 26, message 35, message 44, message 53, message 62, message 71, message 80]
    [message 19, message 28, message 37, message 46, message 55, message 64, message 73, message 82]
    [message 89, message 98, message 89, message 98, message 19, message 28, message 37, message 46]
    [message 0, message 39, message 48, message 57, message 66, message 75, message 84, message 93]
    [message 1, message 49, message 58, message 67, message 76, message 85, message 94, message 77]
    [message 8, message 14, message 23, message 32, message 41, message 50, message 89, message 98]
    [message 17, message 26, message 35, message 44, message 53, message 62, message 71, message 80]
    [message 2, message 59, message 68, message 77, message 86, message 95, message 0, message 39]
    ack consumers response....
    [message 7, message 13, message 22, message 31, message 40, message 5, message 11, message 20]
    [message 17, message 26, message 35, message 44, message 53, message 62, message 71, message 80]
    [message 77, message 86, message 95, message 67, message 76, message 85, message 94, message 0]
    [message 9, message 15, message 24, message 33, message 42, message 51, message 60, message 6]
    [message 4, message 10, message 79, message 88, message 97, message 2, message 59, message 68]
    [message 29, message 38, message 47, message 56, message 65, message 74, message 83, message 92]
    [message 16, message 25, message 34, message 43, message 52, message 61, message 70, message 8]
    [message 18, message 27, message 36, message 45, message 54, message 63, message 72, message 81]
    [message 3, message 69, message 78, message 87, message 96, message 1, message 49, message 58]
    [message 14, message 23, message 32, message 41, message 50, message 89, message 98, message 12]

      消费测试结果分析:no ack的consumer可以实现消息的窥探。










  • 相关阅读:
    query and join operation after sharding
    Windows Phone中的几种集合控件
    什么是SOPA SOPA的危害
    自动刷新人人网API session_key方法
    Windows Phone XNA创建简单局域网游戏
    static 修饰MySqlConnection引发的异常
    $Dsu$ $on$ $Tree$ 复习
    $NOIP2018$ 暴踩全场计划实施方案
    $NOIP2018$ 爆踩全场记
  • 原文地址:https://www.cnblogs.com/hujunzheng/p/9327927.html
Copyright © 2011-2022 走看看