zoukankan      html  css  js  c++  java
  • 从kafka读数据写到ES

    http://kafka.apache.org/documentation.html#newconsumerconfigs

    kafka是一款基于发布与订阅的消息系统,它一般被称为“分布式提交日志”或者“分布式流平台”。kafka的数据是按照一定顺序持久化保存的,可以按需读取。

     核心概念:

    消费者与消费组

    Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费topic时,每个消费者会收到不同分区的消息。假设有一个topic T1,该topic有4个分区;同时我们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息。如果我们增加新的消费者C2到消费组G1,那么每个消费者将会分别收到两个分区的消息,但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息。所以,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建topic时使用较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。

    Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组

    当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)。重平衡是Kafka一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。所以,尽量避免重平衡。

    消费者通过定期发送心跳(hearbeat)到一个作为组协调者(group coordinator)的broker来保持在消费组内存活。这个broker不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。

    在0.10.1版本,Kafka对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的Kafka支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock,指应用没有故障但是由于某些原因不能进一步消费)

    注意:消费者对象不是线程安全的,也就是不能够多个线程同时使用一个消费者对象,也不能够一个线程有多个消费者对象,即一个线程一个消费者,如果需要多个消费者那么请用多线程来进行一一对应。

    max.poll.records与session.timeout.ms参数问题

    例如:consumer.properties配置中max.poll.records=40  (一次最多拉取40条数据)  session.timeout.ms=30000    (会话时间)

    假设kafka此时一次拉取了40条数据,但在处理第31条的时候抛出了异常,就会导致本次offset不会提交,完了这40条消息都会在接下来的某刻被再次消费,这其中就包含了其实已经消费了的30条数据;

    另一种情况是,如果poll下来数据后,处理这些数据的时间比 session.timeout.ms配置的时间要长,从而导致 rebalanced

    所以,session.timeout.ms和小max.poll.records 具体配置为多少,得看你处理一条消息花费多长时间 x,需要满足 x乘以max.poll.records < session.timeout.ms

      读取kafka消息只需要创建一个kafka消费者(可以认为一个group是一个“订阅者”):

    import com.alibaba.fastjson.JSONObject;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.UUID;
    
    public class KafkaPoolData {
        private KafkaConsumer<String, Map<String, Object>> consumer;
        private String topic;
       
      public KafkaPoolData(String topic) {
            Properties consumerConfig = new Properties();
     
            consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "*.dct-*.com:9092");
            consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "xtt-source-group1"); //消费者组,保证要跟其他的不一样
            consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 这个参数指定了当消费者第一次读取分区或者上一次的位置太老(比如消费者下线时间太久)时的行为,可以取值为latest(读取消息队列中最新的消息)或者earliest(从最老的消息开始消费)
     /**kafka根据key值确定消息发往哪个分区(如果分区被指定则发往指定的分区),具有相同key的消息被发往同一个分区,如果key为NONE则随机选择分区,可以使用key_serializer参数序列化为字节类型
            value为要发送的消息值,必须为bytes类型,如果这个值为空,则必须有对应的key值,并且空值被标记为删除。可以通过配置value_serializer参数序列化为字节类型*/
            consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.apache.kafka.common.serialization.StringDeserializer"); 
            consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  //用来做反序列化的,也就是将字节数组转换成对象
            consumer = new KafkaConsumer<String, Map<String, Object>>(consumerConfig); //创建一个消费者
            this.topic = topic;
        }
    
     public void testConsumer(long offset,int number) {
            consumer.poll(1);//我们不断调用poll拉取数据,如果停止拉取,那么Kafka会认为此消费者已经死亡并进行重平衡。参数值是一个超时时间,指明线程如果没有数据时等待多长时间,0表示不等待立即返回
            consumer.seekToBeginning();//在给消费者分配分区的时候将消息偏移量跳转到起始位置 。
            List<TopicPartition> partitions = new ArrayList<TopicPartition>();
           //与subscirbe方法不同,assign方法由用户直接手动consumer实例消费哪些具体分区,assign的consumer不会拥有kafka的group management机制,也就是当group内消费者数量变化的时候不会有reblance行为发生。assign的方法不能和subscribe方法同时使用。
            for (int i = 0; i < 10; i++) {
                TopicPartition partition = new TopicPartition(topic, i);//i是指定分区partition
                partitions.add(partition);
                consumer.assign(partitions);
                consumer.seek(partition, offset);// 指定从这个topic和partition的哪个位置获取
            }  //consumer要消费0-9分区
           //consumer.close(); 主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期
            boolean result = true;
            int count = 0;
            System.out.println("
    Consumer begin!
    ");
            while (result) {
                System.out.println("
    Consumer data begin!
    ");
                ConsumerRecords<String, Map<String, Object>> records = consumer.poll(1000); //拉取消费记录
    
                for (ConsumerRecord<String, Map<String, Object>> record : records) {
                    JSONObject json = new JSONObject();
                    Map<String, Object> map = record.value();
                    //System.out.println("
    Parse data!
    ");
                    for (Map.Entry<?, ?> entry : map.entrySet()) {
                        String filedName = String.valueOf(entry.getKey());
                        if (filedName.equals("image_data")) {
    
                        }else {
                            json.put(filedName, String.valueOf(entry.getValue())); //将非图片信息保存到json
                        }
                    }
                     if(json.containsKey("time_stamp")) {
                        if (json.get("time_stamp") != null && (json.get("time_stamp").toString().compareTo("2018-01-19 10:30:00")) > 0 && json.get("time_stamp").toString().compareTo("2018-01-19 11:00:00") < 0) {
                            /*SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            String timeStamp = sdf.format(json.getString("time_stamp"));
                            json.put("time_stamp",timeStamp);*/
                            System.out.println("put to es");
                            //生成es索引的文档id
                            UUID uuid = UUID.randomUUID();
                            String id = uuid.toString();
                            PutDataToEs.PutToEs(json,id); 
                        }
                    }
                    count++;
                    if (count % 100 == 0) {
                        System.out.println("parse data number : " + count); //打印写数据的数量
                    }
                    if (count == number) {
                        result = false;
                        break;
                    }
                }
            }
            System.out.println("
    Parse data Finished! count=" + count + "
    ");
            System.out.println("
    Consumer Finished!
    ");
        }
    
    public static void main(String[] args) {
            //初始化ES
            PutDataToEs.initClient();
            //topic名称
            String topic = "*-analysis-v1-1-production";
            //String topic = args[0];
            long offset =110000; //从哪里开始
           // long offset = Long.parseLong(args[0]);
            int number=1000000;  //需要拉取多少数据
           // int number = Integer.parseInt(args[1]);
            KafkaPoolData consumer = new KafkaPoolData(topic);
            consumer.testConsumer(offset,number);
            PutDataToEs.closeClient();
        }
    }

     写入ES:

    public class PutDataToEs {
    
        private static TransportClient client = null;
    
        private static long randomNum(long begin, long end)
        {
            long rtn = begin + (long)(Math.random() * (end - begin));
            if (rtn == begin || rtn == end)
            {
                return randomNum(begin,end);
            }
            return rtn;
        }
    
        public static void initClient() {
            try {
                // on startup
                Settings settings = Settings.builder()
                        .put("cluster.name", "*-*.com-es").build();
                client = new PreBuiltTransportClient(settings)
                        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.45.*.*"), 9300));
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
        }
    
    
        public static void closeClient() {
            client.close();
        }
    
        public static void PutToEs(JSONObject json,String id) {
    
            try {
                String msgType="";
                String resultType="";
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
                if(json.containsKey("msg_type")){
                    msgType = json.getString("msg_type");
                }
                if(json.containsKey("result_type")){
                    resultType = json.getString("result_type");
                }
    
              BulkRequestBuilder bulkRequest = client.prepareBulk();
    
              bulkRequest.add(client.prepareIndex("kafka_to_es_test", "kafkadata", String.valueOf(id)).setSource(jsonBuilder()
                                .startObject()
                                .field("time_stamp", sdf.parse(timeStamp))
                                .field("msg_type", msgType)
                                .endObject()
                        )
                );
              BulkResponse bulkResponse = bulkRequest.get();
                 if (bulkResponse.hasFailures()) {
                    // process failures by iterating through each bulk response item
                    System.out.println(bulkRequest.toString());
                 }
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
  • 相关阅读:
    Linux文件和目录的属性及权限
    chkconfig原理
    Linux启动过程
    正则表达式(grep,awk,sed)和通配符
    Linux系统目录结构:目录层次标准、常用目录和文件
    Linux系统目录结构
    虚拟机快照和克隆
    Linux系统的基础优化
    Linux系统应用管理:增加普通用户(密码管理等)
    [译]java9新特性:在接口中用pirvate方法让default(java8接口特性)更简练
  • 原文地址:https://www.cnblogs.com/zling/p/10450259.html
Copyright © 2011-2022 走看看