zoukankan      html  css  js  c++  java
  • 使用原生方法从kafka消费消息

        Kafka 最早是 LinkedIn 开发的一套高性能的类队列系统,具有发布—订阅功能,现在是 Apache 的项目之一。它支持多种客户端从中进行 consume,网上也有许多第三方客户端(注1),但下面我们只使用 Kafka 自带包中的方法来进行 consume。这个例子是从一个 servlet 中调用 Kafka 的 Consumer 相关类,读取远端 Kafka 中的 message。

    代码如下:

        /**
         * Handles a GET request by consuming the currently available messages of the
         * {@code "test"} topic and writing their payloads to the response as a bracketed,
         * comma-separated list.
         *
         * <p>Payloads are appended verbatim (no quoting or escaping), so the output has the
         * shape {@code [m1,m2,...]}. When no message was consumed the output is {@code []}.
         *
         * @param request  the incoming HTTP request (not inspected)
         * @param response the response whose writer receives the list
         * @throws ServletException declared by the method signature; not thrown directly here
         * @throws IOException if the response writer cannot be obtained or written
         */
        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
            String topic = "test";
            List<Message> list = new KafkaHttpConsumer().consume(topic);

            StringBuilder builder = new StringBuilder("[");
            for (int i = 0; i < list.size(); i++) {
                // Put the separator before every element except the first, so an empty
                // list never needs a trailing character removed (which used to eat the "[").
                if (i > 0) {
                    builder.append(",");
                }
                builder.append(list.get(i).message);
            }
            builder.append("]");

            response.getWriter().append(builder.toString());
        }
    
    

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import com.fasterxml.jackson.annotation.JsonInclude;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerTimeoutException;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;


    /**
     * Reads a batch of messages from Kafka through the consumer connector classes imported
     * from the {@code kafka.consumer} / {@code kafka.javaapi.consumer} packages.
     *
     * <p>Consumer settings are loaded from the classpath resource
     * {@code /kafka-http.properties}. Every call to {@link #consume(String)} creates its own
     * connector and shuts it down before returning.
     */
    public class KafkaHttpConsumer {

        /** Classpath location of the consumer settings. */
        private static final String CONFIG_RESOURCE = "/kafka-http.properties";

        /** Charset used to decode message keys and payloads; looked up once instead of per message. */
        private static final Charset UTF_8 = Charset.forName("utf-8");

        /**
         * Consumes messages of the given topic from a single stream until the stream
         * iteration ends with a {@link ConsumerTimeoutException}.
         *
         * <p>NOTE(review): termination relies on the consumer being configured with a
         * timeout (presumably {@code consumer.timeout.ms} in the properties file); without
         * one the iteration may block indefinitely - confirm against the Kafka documentation.
         *
         * @param topic the topic to read from
         * @return the messages read during this call, in iteration order; never {@code null}
         * @throws IllegalStateException if the properties resource is missing or unreadable
         */
        public List<Message> consume(String topic) {
            ConsumerConfig config = new ConsumerConfig(loadProperties());
            ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
            List<Message> messages = new ArrayList<>();
            try {
                // Stream creation is inside the try so the connector is shut down even when
                // it fails.
                Map<String, Integer> streamCounts = Collections.singletonMap(topic, 1);
                Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                        connector.createMessageStreams(streamCounts);
                KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);

                for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
                    messages.add(new Message(messageAndMetadata));
                }
            } catch (ConsumerTimeoutException ignored) {
                // Expected: the timeout is how this loop learns that no further message
                // arrived, so it is treated as the normal end of the batch.
            } finally {
                try {
                    connector.commitOffsets();
                } finally {
                    // Always release the connector, even if committing offsets fails.
                    connector.shutdown();
                }
            }
            return messages;
        }

        /**
         * Manual check: loads the properties resource and prints each configured value.
         *
         * @param args ignored
         */
        public static void main(String[] args) {
            Properties prop = loadProperties();
            for (Object key : prop.keySet()) {
                System.out.println("value:" + prop.getProperty((String) key));
            }
        }

        /**
         * Loads the consumer settings from {@link #CONFIG_RESOURCE}.
         *
         * @return the loaded properties
         * @throws IllegalStateException if the resource is not on the classpath or cannot be read
         */
        private static Properties loadProperties() {
            Properties prop = new Properties();
            // try-with-resources closes the stream; a null resource is tolerated by the
            // construct and reported explicitly below instead of as a bare NPE from load().
            try (java.io.InputStream in = KafkaHttpConsumer.class.getResourceAsStream(CONFIG_RESOURCE)) {
                if (in == null) {
                    throw new IllegalStateException("Classpath resource not found: " + CONFIG_RESOURCE);
                }
                prop.load(in);
            } catch (IOException e) {
                throw new IllegalStateException("Failed to read " + CONFIG_RESOURCE, e);
            }
            return prop;
        }

        /**
         * Plain data holder for one consumed message: its topic, optional key, payload,
         * partition and offset. Key and payload are decoded as UTF-8 text.
         */
        public static class Message {
            /** Topic the message was read from. */
            public String topic;

            /** Decoded message key, or {@code null} when the message has no key. */
            @JsonInclude(JsonInclude.Include.NON_NULL)
            public String key;

            /** Decoded message payload, or {@code null} when the message has no payload. */
            public String message;

            /** Partition the message was read from. */
            public int partition;

            /** Offset of the message as reported by the consumer. */
            public long offset;

            /**
             * Copies topic, key, payload, partition and offset out of the consumer record.
             *
             * @param message the record handed out by the stream iteration
             */
            public Message(MessageAndMetadata<byte[], byte[]> message) {
                byte[] rawKey = message.key();
                byte[] rawPayload = message.message();

                this.topic = message.topic();
                this.key = rawKey != null ? new String(rawKey, UTF_8) : null;
                // Guard the payload the same way as the key so a null payload does not
                // abort the whole batch with a NullPointerException.
                this.message = rawPayload != null ? new String(rawPayload, UTF_8) : null;
                this.partition = message.partition();
                this.offset = message.offset();
            }
        }
    }

     kafka-http.properties

    # Settings used by KafkaHttpConsumer when reading from Kafka.
    # ZooKeeper host:port the consumer connects to.
    zookeeper.connect=192.20.34.144:2181
    # Consumer group id.
    group.id=group
    # NOTE(review): presumably makes a group without a committed offset start from the
    # earliest available message - confirm against the Kafka consumer configuration docs.
    auto.offset.reset=smallest
    # Timeout in milliseconds. NOTE(review): this appears to be what makes the stream
    # iteration throw ConsumerTimeoutException so that consume() can return - confirm.
    consumer.timeout.ms=500

    注1:https://cwiki.apache.org/confluence/display/KAFKA/Clients

  • 相关阅读:
    CentOS 6.x 系统安装选项说明
    MySQL表的操作
    6月13号
    6月11号
    6月10号
    6月9号
    6月6
    day27
    day 28
    day 29
  • 原文地址:https://www.cnblogs.com/lyhero11/p/5186850.html
Copyright © 2011-2022 走看看