zoukankan      html  css  js  c++  java
  • kafka消费者实时消费数据存入hdfs java scala 代码

    
    
    hadoop-client依赖很乱,调试了很多次:cdh版本缺少很多jar,改用hadoop 2.7.3就可以了。
    
    
    
       自定义输出流的池子进行流管理
     public void writeLog2HDFS(String path, byte[] log) {
            try {
                //得到我们的装饰流
                FSDataOutputStream out = HDFSOutputStreamPool.getInstance().takeOutputStream(path);
                out.write(log);
                out.write("
    ".getBytes());
                out.hsync();
                out.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    /**
      * Kafka consumer (ZooKeeper-based high-level consumer API) that reads the
      * `TOPIC` topic on a fixed pool of threads and hands every message to
      * `HDFSWriter.writeLog2HDFS`.
      *
      * Each message is parsed with `StringUtil` to obtain a host name and a date
      * string; the raw message bytes are then written to
      * `/spark/eshop/<date>/<hostname>.log`.
      *
      * Created by imp on 2019/3/1.
      */
    object KafkaScalaConsumer {

      /** Writer used by every consumer thread. */
      val write = new HDFSWriter()

      def ZK_CONN     = "192.168.121.12:2181"
      def GROUP_ID    = "1test-consumer-group109"
      def TOPIC       = "eshop"

      /** Number of streams requested from Kafka and size of the worker pool. */
      private val ConsumerThreads = 3

      /**
        * Creates the consumer connector, requests `ConsumerThreads` streams for
        * `TOPIC` and runs one [[ThreadDemo]] per stream on a fixed thread pool.
        *
        * @param args command-line arguments (unused)
        * @throws IllegalStateException if Kafka returns no stream entry for `TOPIC`
        */
      def main(args: Array[String]): Unit = {
        val connector = Consumer.create(createConfig())

        // The topic was created with 3 partitions, so ask for one stream per partition.
        val topicCountMap = new HashMap[String, Int]()
        topicCountMap.put(TOPIC, ConsumerThreads)

        val msgStreams: Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] =
          connector.createMessageStreams(topicCountMap)

        // Look the topic up once and fail with a descriptive message instead of Option.get.
        val streams = msgStreams.getOrElse(
          TOPIC,
          throw new IllegalStateException("no message streams returned for topic " + TOPIC))

        println("# of streams is " + streams.size)

        val threadPool: ExecutorService = Executors.newFixedThreadPool(ConsumerThreads)

        for ((stream, index) <- streams.zipWithIndex) {
          threadPool.execute(new ThreadDemo("consumer_" + index, stream))
        }
      }

      /**
        * Worker that drains one Kafka stream and writes each message to HDFS.
        *
        * @param threadName label for this worker (currently not read by `run`)
        * @param stream     the Kafka stream this worker consumes
        */
      class ThreadDemo(threadName: String, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Runnable {
        override def run(): Unit = {

          val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator()

          while (it.hasNext()) {
            val data: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
            val msg = data.message()
            // NOTE(review): decodes with the platform default charset — confirm the
            // producers' encoding and pass it explicitly if it is known.
            val log = new String(msg)
            val arr = StringUtil.splitLog(log)

            // Skip records that cannot be split; a `return` here would end the
            // whole consumer thread on the first malformed message.
            if (arr != null && arr.length >= 1) {
              // host name
              val hostname = StringUtil.getHostname(arr)
              // date string
              val dateStr = StringUtil.formatYyyyMmDdHhMi(arr)
              // target path
              val rawPath = "/spark/eshop/" + dateStr + "/" + hostname + ".log"

              // write the raw message bytes to HDFS
              System.out.println(log)
              write.writeLog2HDFS(rawPath, msg)
            }
          }
        }
      }

      /**
        * Builds the consumer configuration from the connection constants of this object.
        *
        * @return a `ConsumerConfig` wrapping the assembled properties
        */
      def createConfig(): ConsumerConfig = {
        val props = new Properties()
        props.put("zookeeper.connect", ZK_CONN)
    //    props.put("bootstrap.servers","localhost:9092")
        props.put("group.id", GROUP_ID)
        props.put("zookeeper.session.timeout.ms", "5000")
        props.put("zookeeper.connection.timeout.ms", "10000")
        props.put("auto.offset.reset", "smallest")
        props.put("auto.commit.interval.ms", "300")
        props.put("rebalance.backoff.ms", "2000")
        props.put("rebalance.max.retries", "10")
        new ConsumerConfig(props)
      }
    }
  • 相关阅读:
    Sqlite EF6注册
    C# 等值锁定
    net 4.0+EF6+Sqlite 使用,安装,打包
    C#调用C++函数
    C# 调用.exe文件
    Java继承
    python多线程与threading模块
    Java对象构造
    python多线程与_thread模块
    Linux文件压缩与打包
  • 原文地址:https://www.cnblogs.com/hejunhong/p/10469657.html
Copyright © 2011-2022 走看看