  • Kafka notes -- specifying the partition rule for messages

    Parameter settings: reference material

    A good reference: http://blog.csdn.net/honglei915/article/details/37697655

    http://developer.51cto.com/art/201501/464491.htm

    Note: the configuration file server.properties sets the number of partitions via num.partitions. This is the total number of partitions for a single topic; with multiple brokers the partitions may be spread across different nodes, and the partitions on all brokers together add up to num.partitions.
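
    For illustration, a minimal server.properties excerpt (the value 4 is an assumption for this sketch, not taken from the original setup):

    # server.properties (excerpt) -- illustrative values only
    # total number of partitions created for a topic; with several brokers
    # these partitions end up spread across the nodes
    num.partitions=4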

    In 0.7, a few of the producer configuration options are mutually exclusive: once one is set, the other cannot be.
    For example:
      broker.list and zk.connect cannot both be set
      broker.list and partitioner.class cannot both be set
    Doing so still compiles, but an exception is thrown at runtime.
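
    A minimal sketch of the two mutually exclusive configuration styles (fragment only; the hosts and the partitioner class are the ones used elsewhere in this post):

    // Style A: connect to the brokers directly -- no zk.connect, no partitioner.class
    Properties direct = new Properties();
    direct.put("broker.list", "0:10.10.10.10:9092");

    // Style B: discover brokers through ZooKeeper and plug in a custom partitioner
    Properties viaZk = new Properties();
    viaZk.put("zk.connect", "10.10.10.10:2181");
    viaZk.put("partitioner.class", "com.kafka.myparitioner.CidPartitioner");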

    1. Specifying the broker

    props.put("broker.list", "0:10.10.10.10:9092");// connect to Kafka directly
    Once this is set, partitioner.class can no longer be set. However, when I ran it this way, all the data was sent to all 4 partitions on 10.10.10.10 rather than to a single partition. Switching to SyncProducer's send(topic, partitionId, list) made no difference either.

    2. Specifying the partition
    props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner");
    props.put("zk.connect", "10.10.10.10:2181");// connect to ZooKeeper

    com.kafka.myparitioner.CidPartitioner above is a class you implement yourself; note that the fully qualified class name, including the package, must be given.
    CidPartitioner implements the Partitioner interface; its partition method defines how the partition is computed from the key.

    package com.kafka.myparitioner;
    
    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;
    
    // Defines how the partition for a message is chosen based on its key
    public class CidPartitioner implements Partitioner {
        public CidPartitioner(VerifiableProperties props) {  
            // Note: the constructor body is empty, but the constructor itself must be present
        }  
        
        @Override
        public int partition(Object key, int numPartitions) {
            try {
                // numeric key: take the key value modulo the partition count
                long partitionNum = Long.parseLong((String) key);
                return (int) Math.abs(partitionNum % numPartitions);
            } catch (Exception e) {
                // non-numeric key: fall back to the key's hashCode
                return Math.abs(key.hashCode() % numPartitions);
            }
        }
    }
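
    A quick way to check what the partitioner does, as a hedged sketch: the demo class and the sample keys below are not part of the original post, and null is passed to the constructor only because, as noted above, the constructor body is empty.

    package com.kafka.myparitioner;
    
    public class CidPartitionerDemo {
        public static void main(String[] args) {
            CidPartitioner partitioner = new CidPartitioner(null);
            int numPartitions = 4;
            // numeric key: 13580000000 % 4 = 0, so this prints 0
            System.out.println(partitioner.partition("13580000000", numPartitions));
            // non-numeric key: falls back to Math.abs(hashCode() % numPartitions)
            System.out.println(partitioner.partition("not-a-number", numPartitions));
        }
    }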

    To have partitions assigned based on the key, a key must be supplied when the message is sent.

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.Properties;
    import java.util.regex.Pattern;
    
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    // Differs from KafkaReceiverLTELogSocket in that it specifies the partition assignment rule for the messages
    public class KafkaReceiveLTELogSocketPartition extends Thread{
        // Sends LTE signaling data at a fixed time interval
        String regEx = "[^0-9.\\+\\-\\s+\\,E]";
        Pattern p = Pattern.compile(regEx); 
            
        // The first type parameter is the key type, the second is the message type
        private final kafka.javaapi.producer.Producer<String, String> producer;
        private final String topic;
        private final Properties props = new Properties();
        
        private final int port = 12345; 
        
        public KafkaReceiveLTELogSocketPartition(String topic) {
            props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "192.168.1.164:9093"); // Kafka broker list (host:port)
            props.put("partitioner.class","com.kafka.myparitioner.CidPartitioner");
        //props.put("zk.connect", "192.168.1.164:2181");// connect to ZooKeeper; apparently no longer needed in newer versions
            
            producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
            this.topic = topic;
        }
        
        public void receiveAndWrite2(String outputFileName , int port) throws IOException{
            ServerSocket serverSocket = new ServerSocket(port);
            Socket socket = serverSocket.accept();
            StringBuilder sb = new StringBuilder();
            try{
                while(true){                
                    InputStream istream = socket.getInputStream();
                    // busy-wait until some bytes are available, then read them all at once
                    int count = 0;
                    while (count == 0) {
                        count = istream.available();
                    }
                    byte[] b = new byte[count];
                    istream.read(b);
                    for (int i = 0; i < count; i++) {
                        if (b[i] == '\n') { // a newline in the stream marks the end of one complete record; send it
                            String str = sb.toString();
                            
                            // extract the key (key_cid_str) from the record
                            String key_cid_str = str.substring(str.indexOf(":") + 1, str.indexOf(","));
                            
                            System.out.println("Received length: " + str.length());
                            System.out.println(str);
                            // the first type parameter is the key type, the second is the message type
                            producer.send(new KeyedMessage<String, String>(topic, key_cid_str, str));
                            
                            sb = new StringBuilder();
                        } else {
                            sb.append(Character.toChars(b[i]));
                        }
                    }
                }
                
            }finally{
                // Close the sockets here rather than inside the while loop; otherwise the sender would have to re-establish the connection every time
                socket.close();
                serverSocket.close();
            }
        }
        
        @Override
        public void run() {
            String filename = "JSON1_Yanming_DriveTesting_09-04.16-17.16-27_TIME.json";
            String outputFileName  = ""+filename;
            
            try {
                receiveAndWrite2(outputFileName,port);
            } catch (IOException e) {    
                e.printStackTrace();
            }        
        }    
        public static void main(String[] args) {
            String topic = "kafka_flume_topic";
            new KafkaReceiveLTELogSocketPartition(topic).start();
        }
    }

    Output the messages with a KafkaConsumer (the high-level consumer is used here)

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;
    
    
    public class KafkaConsumer extends Thread {
        private final ConsumerConnector consumer;
        private final String topic;
    
        public KafkaConsumer(String topic) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
            this.topic = topic;
        }
    
        private static ConsumerConfig createConsumerConfig() {
            Properties props = new Properties();
            props.put("zookeeper.connect", "192.168.1.164:2181"); // ZooKeeper address
            props.put("group.id", "group2"); // consumer group ID
    
            // ZooKeeper session timeout / sync time and the offset auto-commit interval
            props.put("zookeeper.session.timeout.ms", "40000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            
            return new ConsumerConfig(props);
        }
    
        @Override
        public void run() {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            // how many threads (streams) to use for each topic
            topicCountMap.put(topic, new Integer(1));
            
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> message = it.next();  
                String topic = message.topic();  
                int partition = message.partition();  
                long offset = message.offset();  
                String key = new String(message.key());  
                String msg = new String(message.message());  
                // Process the message here; this example simply prints it
                // If consumption fails, the information above can be written to a log, or sent as an SMS/email alert, for later handling
                System.out.println( " thread : " + Thread.currentThread().getName()  
                        + ", topic : " + topic + ", partition : " + partition + ", offset : " + offset + " , key : "  
                        + key + " , mess : " + msg);              
            }
        }
    }
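
    The consumer class above has no entry point of its own; a minimal launcher sketch (the topic name kafka_flume_topic matches the one used by the producer above; the wrapper class is only for illustration):

    public class KafkaConsumerMain {
        public static void main(String[] args) {
            // start the high-level consumer thread for the topic the producer writes to
            new KafkaConsumer("kafka_flume_topic").start();
        }
    }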

    Appendix: a Kafka low-level (Simple) consumer

    package com.cuicui.kafkademon;  
      
      
    import java.nio.ByteBuffer;  
    import java.util.Collections;  
    import java.util.HashMap;  
    import java.util.List;  
    import java.util.Map;  
      
      
    import kafka.api.FetchRequest;  
    import kafka.api.FetchRequestBuilder;  
    import kafka.api.PartitionOffsetRequestInfo;  
    import kafka.cluster.Broker;  
    import kafka.common.TopicAndPartition;  
    import kafka.javaapi.FetchResponse;  
    import kafka.javaapi.OffsetRequest;  
    import kafka.javaapi.OffsetResponse;  
    import kafka.javaapi.PartitionMetadata;  
    import kafka.javaapi.TopicMetadata;  
    import kafka.javaapi.TopicMetadataRequest;  
    import kafka.javaapi.TopicMetadataResponse;  
    import kafka.javaapi.consumer.SimpleConsumer;  
    import kafka.javaapi.message.ByteBufferMessageSet;  
    import kafka.message.Message;  
    import kafka.message.MessageAndOffset;  
      
      
    /**
     * Offsets are maintained by the caller; the target topic and partition are also chosen by the caller.
     *
     * @author <a href="mailto:leicui001@126.com">崔磊</a>
     * @date 2015-11-04 11:44:15
     *
     */
    public class MySimpleConsumer {  
      
      
        public static void main(String[] args) {  
            new MySimpleConsumer().consume();  
        }  
      
      
        /**
         * Consume messages.
         */
        public void consume() {  
            int partition = 0;  
      
      
            // find the leader broker for the partition
            Broker leaderBroker = findLeader(KafkaProperties.BROKER_CONNECT, KafkaProperties.TOPIC, partition);  
      
      
            // consume from the leader
            SimpleConsumer simpleConsumer =
                    new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), 20000, 10000, "mySimpleConsumer");
            long startOffset = 1;
            int fetchSize = 1000;

            while (true) {
                long offset = startOffset;
                // addFetch specifies the target topic, partition, starting offset and fetch size (in bytes); multiple fetches can be added
                FetchRequest req =
                        new FetchRequestBuilder().addFetch(KafkaProperties.TOPIC, partition, startOffset, fetchSize).build();
      
      
                // fetch the messages
                FetchResponse fetchResponse = simpleConsumer.fetch(req);  
      
      
                ByteBufferMessageSet messageSet = fetchResponse.messageSet(KafkaProperties.TOPIC, partition);  
                for (MessageAndOffset messageAndOffset : messageSet) {  
                    Message mess = messageAndOffset.message();  
                    ByteBuffer payload = mess.payload();  
                    byte[] bytes = new byte[payload.limit()];  
                    payload.get(bytes);  
                    String msg = new String(bytes);  
      
      
                    offset = messageAndOffset.offset();  
                    System.out.println("partition : " + partition + ", offset : " + offset + "  mess : " + msg);
                }  
                // continue with the next batch
                startOffset = offset + 1;
            }  
        }  
      
      
        /**
         * Find the leader broker for the given partition.
         *
         * @param brokerHosts broker addresses, in the form "host1:port1,host2:port2,host3:port3"
         * @param topic topic
         * @param partition partition
         * @return the leader broker
         */
        public Broker findLeader(String brokerHosts, String topic, int partition) {  
            Broker leader = findPartitionMetadata(brokerHosts, topic, partition).leader();  
            System.out.println(String.format("Leader for topic %s, partition %d is %s:%d", topic, partition, leader.host(),
                    leader.port()));  
            return leader;  
        }  
      
      
        /**
         * Find the metadata for the given partition.
         *
         * @param brokerHosts broker addresses, in the form "host1:port1,host2:port2,host3:port3"
         * @param topic topic
         * @param partition partition
         * @return the partition metadata
         */
        private PartitionMetadata findPartitionMetadata(String brokerHosts, String topic, int partition) {  
            PartitionMetadata returnMetaData = null;  
            for (String brokerHost : brokerHosts.split(",")) {  
                SimpleConsumer consumer = null;  
                String[] splits = brokerHost.split(":");  
                consumer = new SimpleConsumer(splits[0], Integer.valueOf(splits[1]), 100000, 64 * 1024, "leaderLookup");  
                List<String> topics = Collections.singletonList(topic);  
                TopicMetadataRequest request = new TopicMetadataRequest(topics);  
                TopicMetadataResponse response = consumer.send(request);  
                List<TopicMetadata> topicMetadatas = response.topicsMetadata();  
                for (TopicMetadata topicMetadata : topicMetadatas) {  
                    for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                        if (partitionMetadata.partitionId() == partition) {
                            returnMetaData = partitionMetadata;
                        }  
                    }  
                }  
                if (consumer != null)  
                    consumer.close();  
            }  
            return returnMetaData;  
        }  
      
      
        /**
         * Find the offset for a client at a given timestamp.
         *
         * @param consumer SimpleConsumer
         * @param topic topic
         * @param partition partition
         * @param clientID client ID
         * @param whichTime timestamp
         * @return offset
         */
        public long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientID, long whichTime) {  
            TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);  
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =  
                    new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();  
            requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));  
            OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientID);  
            OffsetResponse response = consumer.getOffsetsBefore(request);  
            long[] offsets = response.offsets(topic, partition);  
            return offsets[0];  
        }  
    }  
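
    Note that getLastOffset is defined above but never called from consume(), which starts from the hard-coded offset 1. A hedged sketch of how it could be used instead, relying on the special timestamps exposed by kafka.api.OffsetRequest:

    // inside consume(), once simpleConsumer has been created:
    long startOffset = getLastOffset(simpleConsumer, KafkaProperties.TOPIC, partition,
            "mySimpleConsumer", kafka.api.OffsetRequest.EarliestTime());
    // use kafka.api.OffsetRequest.LatestTime() instead to begin with the newest messages
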
  • Original article: https://www.cnblogs.com/gnivor/p/5318319.html