  • Writing Kafka data to HDFS

    Problems encountered

      (1) Thread safety: since this runs on a single node, guarding the write with a lock in code is enough; a minimal sketch follows after these notes.

      (2) The consumer writes to HDFS as a stream, and deciding when to close that stream is the hard part. fsDataOutputStream.hsync() is introduced here.

    hsync() ensures that data being written to HDFS is visible to new readers and has been persisted to the DataNodes' disks.
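    A minimal sketch of that lock (not from the original post; the lock object and helper name are illustrative), serializing writes to the shared FSDataOutputStream and calling hsync() after each record:

// Illustrative sketch: serialize writes from multiple threads to the shared
// FSDataOutputStream, then hsync() so the bytes reach the DataNodes before
// the lock is released. WRITE_LOCK and writeRecord are hypothetical names.
private static final Object WRITE_LOCK = new Object();

private static void writeRecord(FSDataOutputStream out, long offset,
                                String key, String value) throws IOException {
    synchronized (WRITE_LOCK) {
        out.write((offset + "," + key + "," + value + "\n").getBytes());
        out.hsync(); // persist to DataNode disks; visible to new readers
    }
}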

    Producer

package com.xuliugen.kafka.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerDemo {

    // Topic to write to
    private static final String topic = "tangsonghuai";

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.15.140:9092");
        props.put("acks", "0");
        props.put("retries", "0");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        int i = 1;

        // Send business messages; in practice these could come from a file,
        // an in-memory database, or a socket.
        while (i < 50) {
            Thread.sleep(100);
            producer.send(new ProducerRecord<String, String>(topic, "key:" + i, "value:" + i));
            System.out.println("key:" + i + " " + "value:" + i);
            i++;
        }

        // Flush any pending sends before exiting
        producer.close();
    }
}

    Consumer

package com.xuliugen.kafka.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
    private static final String topic = "tangsonghuai";

    public static void main(String[] args) throws IOException {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.15.140:9092");
        props.put("group.id", "1111");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList(topic));
        int i = 0;
        String uri = "hdfs://192.168.15.140:9000/";
        Configuration configuration = new Configuration();
        // On a single-node cluster there is no spare DataNode to swap into the
        // append pipeline on failure, so disable the replacement policy.
        configuration.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");

        FileSystem fs = FileSystem.get(URI.create(uri), configuration);
        final String pathString = "/d1/tangsonghuai";
        // Append to an existing HDFS file
        final FSDataOutputStream fsDataOutputStream = fs.append(new Path(pathString));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

                fsDataOutputStream.write((record.offset() + "," + record.key() + "," + record.value() + "\n").getBytes());
                // Flush the written bytes to the DataNodes so they are durable
                // and visible to new readers.
                fsDataOutputStream.hsync();
                i++;
                if (i == 70) {
                    fsDataOutputStream.close();
                    consumer.close();
                    return; // stop after 70 records; otherwise the loop would poll a closed consumer
                }

                // Alternative: copy the value via IOUtils
                // IOUtils.copyBytes(new ByteArrayInputStream(record.value().getBytes()),
                //         fsDataOutputStream, configuration, true);
            }
        }
    }
}
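    Note that fs.append() requires the target file to already exist on HDFS and throws FileNotFoundException otherwise. A minimal sketch (not from the original post; the helper name is hypothetical) that creates the file on the first run and appends on later runs:

// Hypothetical helper: create the HDFS file if it is missing, otherwise
// open it for append, since fs.append() fails on a non-existent path.
private static FSDataOutputStream openForAppend(FileSystem fs, String pathString) throws IOException {
    Path path = new Path(pathString);
    if (!fs.exists(path)) {
        return fs.create(path); // new empty file, stream positioned at offset 0
    }
    return fs.append(path);
}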

    pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xuliugen.kafka</groupId>
    <artifactId>kafka.demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.5</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.5</version>
        </dependency>

    </dependencies>

</project>
  • Original post: https://www.cnblogs.com/tangsonghuai/p/10558635.html