zoukankan      html  css  js  c++  java
  • 把HDFS里的json数据转换成csv格式

     

    1. 全景图

     NewImage

     .

    2. 用ListHDFS获取所有文件名

    NewImage

     

    如果想重新再取一次,右健view state:

    NewImage

     

    点击 clear state, 再运行,即可再次采集数据了。

     

    3. 用FetchHDFS 取出json 数据

    NewImage

     

    4. 用ExecuteScript 转换

    NewImage

    import org.apache.commons.io.IOUtils
    import java.nio.charset.*
    import java.text.SimpleDateFormat
    import groovy.json.*

    def flowFile = session.get()

    flowFile = session.write(flowFile, {inputStream, outputStream ->

    def js = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    def data = new JsonSlurper().parseText( js )
    def columns = data.data*.keySet().flatten().unique()

    // Wrap strings in double quotes, and remove nulls
    def encode = { e -> e == null ? '' : e instanceof String ? /"$e"/ : "$e" }

    // Print all the column names
    def columnName = columns.collect { c -> encode( c ) }.join( ',' )

    // Then create all the rows
    def columnData = data.data.collect { row ->
    // A row at a time
    columns.collect { colName -> encode( row[ colName ] ) }.join( ',' )
    }.join( ' ' )

    StringBuilder cd = new StringBuilder()
    cd.append(columnName + " ")
    cd.append(columnData)

    outputStream.write(cd.toString().getBytes(StandardCharsets.UTF_8))
    }as StreamCallback)

    session.transfer(flowFile, REL_SUCCESS)

     

    参考:http://stackoverflow.com/questions/21576162/groovy-code-to-convert-json-to-csv-file

     

    5. 用PutHDFS 插入

    NewImage

     

    问题:

    最近加了cluster,发现listhdfs不能取到数据了:

    NewImage

     

    查看日志:

    NewImage

     

    发现日志里提到了zookeeper导致connection refused

     

    nifi设置成cluster必须走zookeeper来调度资源,所以必须要连上我们的zookeeper server,有一个配置要加

    conf/state-management.xml里面有个配置

    <cluster-provider>
    <id>zk-provider</id>
    <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class>
    <property name="Connect String">wdp.xxx.cn:2181</property>
    <property name="Root Node">/nifi</property>
    <property name="Session Timeout">30 seconds</property>
    <property name="Access Control">CreatorOnly</property>
    <property name="Username">nifi</property>
    <property name="Password">nifi</property>
    </cluster-provider>

     

     

    NIFI 中国社区 QQ群:595034369

     

     

  • 相关阅读:
    第02组 Alpha冲刺(2/6)
    第02组 Alpha冲刺(1/6)
    第02组 团队Git现场编程实战
    第02组 团队项目-需求分析报告
    团队项目-选题报告
    第二次结对编程作业
    第一次结对编程作业
    第2组 团队展示(组长)
    Exchange 2013 中的 OAB (脱机通讯簿)以及如何管理
    vmware esxi 查看网卡、Raid卡驱动
  • 原文地址:https://www.cnblogs.com/fengwenit/p/5598606.html
Copyright © 2011-2022 走看看