zoukankan      html  css  js  c++  java
  • 通过 flume 上传数据到hive

    目标:  通过接受 1084端口的http请求信息, 存储到 hive数据库中,
    osgi为hive中创建的数据库名称
    periodic_report6 为创建的数据表 (注意: 下文第 2 步的建表语句和 Java 代码中使用的表名是 period_data, 请确保 hdfs.path 中的表目录与实际使用的表名一致),

    flume配置如下:

    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1

    a1.sources.r1.type = http
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 1084
    a1.sources.r1.handler=jkong.test.PlainJSONHandler2

    #a1.sources.r1.interceptors=i1 i2
    #a1.sources.r1.interceptors.i1.type=regex_filter
    #a1.sources.r1.interceptors.i1.regex=\{.*\}
    #a1.sources.r1.interceptors.i2.type=timestamp

    a1.channels.c1.type=memory
    a1.channels.c1.capacity=10000
    a1.channels.c1.transactionCapacity=1000
    a1.channels.c1.keep-alive=30

    a1.sinks.k1.type=hdfs
    a1.sinks.k1.channel=c1
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    a1.sinks.k1.hdfs.path=hdfs://hadoop:9000/user/hive/warehouse/osgi.db/periodic_report6/day=%y-%m-%d/mf=%{manufacture}/sn=%{deviceId}
    a1.sinks.k1.hdfs.fileType=DataStream
    a1.sinks.k1.hdfs.writeFormat=Text
    a1.sinks.k1.hdfs.rollInterval=0
    a1.sinks.k1.hdfs.rollSize=67108864
    a1.sinks.k1.hdfs.rollCount=0
    a1.sinks.k1.hdfs.idleTimeout=60

    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1

     2.  数据表创建: 格式化 json 需要多加 2 个 jar 包: json-serde-1.3.8-jar-with-dependencies.jar 和 json-udf-1.3.8-jar-with-dependencies.jar (具体参考 "flume 存储数据到 hive"); hive 安装参考 "程序安装" 中的 hive 安装

    链接:https://pan.baidu.com/s/1suPzGJmtJlsROC6SVpcztQ 密码:zlgg

     create table period_data(deviceId STRING,actualTime STRING, manufacture STRING, information STRING) partitioned by (day string, mf string, sn string) row format serde "org.openx.data.jsonserde.JsonSerDe" WITH SERDEPROPERTIES("deviceId"="$.deviceId","actualTime"="$.actualTime","manufacture"="$.manufacture","information"="$.information");

      2.1  将数据表中的字段也同样拆分成数据字段的创表语句(还没有试验, 暂时不用)

    create table periodic_report4(id BIGINT, deviceId STRING,report_time STRING,information STRUCT<actualTime:BIGINT,dpiVersionInfo:STRING,subDeviceInfo:STRING,wanTrafficData:STRING,ponInfo:STRING,eventType:STRING,potsInfo:STRING,deviceInfo:STRING,deviceStatus:STRING>) row format serde "org.openx.data.jsonserde.JsonSerDe" WITH SERDEPROPERTIES("input.invalid.ignore"="true","id"="$.id","deviceId"="$.deviceId","report_time"="$.report_time","requestParams.actualTime"="$.requestParams.actualTime","requestParams.dpiVersionInfo"="$.requestParams.dpiVersionInfo","requestParams.subDeviceInfo"="$.requestParams.subDeviceInfo","requestParams.wanTrafficData"="$.requestParams.wanTrafficData","requestParams.ponInfo"="$.requestParams.ponInfo","requestParams.eventType"="$.requestParams.eventType","requestParams.potsInfo"="$.requestParams.potsInfo","requestParams.deviceInfo"="$.requestParams.deviceInfo","requestParams.deviceStatus"="$.requestParams.deviceStatus"); 

    3. 启动flume语句:flume 根目录

    bin/flume-ng agent --conf ./conf/ -f ./conf/flume.conf --name a1 -Dflume.root.logger=DEBUG,console  # 带 log 启动

    nohup ./flume-ng agent --conf .././conf/ -f .././conf/flume.conf1 --name a1 &     # 后台启动

    4. 启动hive语句: hive bin目录

    ./hive    #启动 hive 客户端
    ./hive -hiveconf hive.root.logger=DEBUG,console  #带log信息启动

    ./hiveserver2 #启动 hive2 服务器
    nohup ./hiveserver2 &  # 后台启动 hive2 服务器

    5.  flume 数据过滤类: 连接 hive 创建 partition, 需要将 jar 包拷贝到 flume 的 lib 目录 (链接:https://pan.baidu.com/s/1GR1xbmXwFT_-t7rJJcPvgA 密码:nbv9)

    package jkong.test;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.PrintWriter;
    import java.nio.charset.Charset;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;
    import org.apache.flume.source.http.BidirectionalHTTPSourceHandler;
    import org.apache.flume.source.http.JSONHandler;
    import org.json.JSONObject;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class PlainJSONHandler2 implements BidirectionalHTTPSourceHandler{
        private static final Logger LOG = LoggerFactory.getLogger(JSONHandler.class);
        private static int data_number = 0;
        @Override
        public void configure(Context cont) {
            data_number = 0;
        }
    
        @Override
        public List<Event> getEvents(HttpServletRequest request, HttpServletResponse respose) {
            String readLine = null;
            String deviceSN = null;
            String actualTime = null;
            Map<String, String> headers = null;
            try {
                if(data_number > 65536)
                    data_number = 0;
                
                if(data_number%800 != 0){
                    return null;
                }
                
                BufferedReader reader = request.getReader();
                String charset = request.getCharacterEncoding();
                
                if (charset != null) {
                  LOG.debug("Charset is " + charset);
                  charset = "UTF-8";
                }
                
                readLine = reader.readLine();
                
                headers = new HashMap<String, String>();
                
                if(readLine != null){
                    int start = readLine.indexOf("deviceId");
                    deviceSN = readLine.substring(start+11, start+23);
                    start = readLine.indexOf("actualTime");
                    actualTime = readLine.substring(start+12, start+25);
                    String manufacture = deviceSN.substring(0, 3);
                      headers.put("deviceId", deviceSN);
                    headers.put("manufacture", manufacture);
                    
                    MyRunnable R1 = new MyRunnable(deviceSN);
                    R1.start();
                    
                    
                    JSONObject json = new JSONObject();
                    json.put("deviceId", deviceSN);
                    json.put("actualTime", actualTime);
                    json.put("manufacture", manufacture);
                    json.put("information", readLine);
                    readLine = json.toString();
                }
                String result = getResult(deviceSN);
                PrintWriter writer = respose.getWriter();
                writer.println(result);
                writer.flush();
                writer.close();
                result = "";
            } catch (IOException e) {
                e.printStackTrace();
            }
            return getSimpleEvents(readLine, headers);
        }
        
        public String getResult(String deviceSN){
    //         long currentTime = System.currentTimeMillis();
    //            Date curDate = new Date(currentTime);
                String result = new String("{"result": 0,"timeStamp": "2018-08-14","periodConfigParameter": {"uploadConfig": {"msgreportInterval": "36000"}}}");
    //            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
    //            String showTime = formatter.format(curDate);
                return result;
        }
    
        @Override
        public void onChannelException(HttpServletRequest request, HttpServletResponse response, Exception ex) {
            
        }
    
        @Override
        public void onSuccessfulCommit(HttpServletRequest request, HttpServletResponse response) {
            
        }
        
        private List<Event> getSimpleEvents(String events, Map<String, String> headers) {
            if(events == null)
                return null;
              List<Event> newEvents = new ArrayList<Event>();
            newEvents.add(EventBuilder.withBody(events, Charset.forName("UTF-8"), headers));
            System.out.println("info: " + newEvents.toString());
            return newEvents;
      }
    }
    
    
    /**
     * Background task that makes sure the Hive partition
     * {@code day=<today>/mf=<manufacturer>/sn=<serial>} exists on the {@code period_data} table.
     *
     * <p>Each {@link #addPartition(String, String, String)} call opens its own Hive JDBC
     * connection and closes it before returning.
     */
    class MyRunnable implements Runnable {
        private Thread t;
        private String deviceSN;

        private String connUrl = "jdbc:hive2://localhost:10000/osgi";
        private String userName = "hive";
        private String passWord = "hive";
        private String tableName = "period_data";

        /**
         * @param deviceSN device serial number; its first three characters are used as the
         *                 manufacturer code
         */
        MyRunnable(String deviceSN) {
            this.deviceSN = deviceSN;
        }

        /** Adds the partition for today's date (formatted {@code yy-MM-dd}) and this device. */
        public void run() {
            if (deviceSN == null || deviceSN.length() < 3) {
                System.err.println("Cannot derive a manufacturer code from serial: " + deviceSN);
                return;
            }
            // SimpleDateFormat is not thread-safe, so a fresh instance is created per run.
            String day = new SimpleDateFormat("yy-MM-dd").format(new Date());
            String manufacture = deviceSN.substring(0, 3);
            addPartition(day, manufacture, deviceSN);
        }

        /** Starts this task on a new thread named after the serial number; later calls are no-ops. */
        public void start() {
            if (t == null) {
                t = new Thread(this, deviceSN);
                t.start();
            }
        }

        /**
         * Creates the partition if it does not exist yet.
         *
         * <p>The values are embedded in a DDL statement, so anything other than ASCII letters,
         * digits, {@code -} and {@code _} is rejected. Failures are reported on stderr and do not
         * propagate to the caller.
         *
         * @param day         partition value for {@code day}
         * @param manufacture partition value for {@code mf}
         * @param deviceSN    partition value for {@code sn}
         */
        public void addPartition(String day, String manufacture, String deviceSN) {
            if (!isSafePartitionValue(day) || !isSafePartitionValue(manufacture) || !isSafePartitionValue(deviceSN)) {
                System.err.println("Refusing to add partition with unsafe values: day=" + day
                        + ", mf=" + manufacture + ", sn=" + deviceSN);
                return;
            }

            // IF NOT EXISTS lets Hive do the existence check atomically instead of scanning
            // "show partitions" first, which was racy between concurrent requests.
            String addPartition = "alter table " + tableName + " add if not exists partition (day='"
                    + day + "', mf='" + manufacture + "', sn='" + deviceSN + "')";

            Connection connection = getConnect(userName, passWord, connUrl);
            if (connection == null) {
                // getConnect has already reported why no connection is available.
                return;
            }

            Statement stmt = null;
            try {
                stmt = connection.createStatement();
                System.out.println(addPartition);
                stmt.execute(addPartition);
            } catch (SQLException e) {
                report("Failed to execute: " + addPartition, e);
            } finally {
                if (stmt != null) {
                    try {
                        stmt.close();
                    } catch (SQLException e) {
                        report("Failed to close Hive statement", e);
                    }
                }
                try {
                    connection.close();
                } catch (SQLException e) {
                    report("Failed to close Hive connection", e);
                }
            }
        }

        /**
         * Opens a Hive JDBC connection.
         *
         * @param userName Hive user
         * @param passWord Hive password
         * @param connUrl  JDBC URL of the HiveServer2 instance
         * @return the connection, or {@code null} when the driver is missing or the connection
         *         attempt fails; the caller is responsible for closing it
         */
        public Connection getConnect(String userName, String passWord, String connUrl) {
            String driverName = "org.apache.hive.jdbc.HiveDriver";
            Connection conn = null;
            try {
                Class.forName(driverName);
                conn = DriverManager.getConnection(connUrl, userName, passWord);
            } catch (ClassNotFoundException e) {
                System.out.println("没有找到驱动类");
                report("Hive JDBC driver not found: " + driverName, e);
            } catch (SQLException e) {
                report("Failed to connect to " + connUrl, e);
            }
            return conn;
        }

        /** Returns whether the value is non-empty and limited to ASCII letters, digits, '-' and '_'. */
        private static boolean isSafePartitionValue(String value) {
            if (value == null || value.length() == 0) {
                return false;
            }
            for (int i = 0; i < value.length(); i++) {
                char c = value.charAt(i);
                boolean ok = (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
                        || c == '-' || c == '_';
                if (!ok) {
                    return false;
                }
            }
            return true;
        }

        /** Prints a context message followed by the stack trace of the failure. */
        private static void report(String message, Exception cause) {
            System.err.println(message);
            cause.printStackTrace();
        }
    }

    package jkong.test;
    import java.io.BufferedReader;import java.io.IOException;import java.io.PrintWriter;import java.nio.charset.Charset;import java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.Statement;import java.text.SimpleDateFormat;import java.util.ArrayList;import java.util.Date;import java.util.HashMap;import java.util.List;import java.util.Map;
    import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;
    import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.event.EventBuilder;import org.apache.flume.source.http.BidirectionalHTTPSourceHandler;import org.apache.flume.source.http.JSONHandler;import org.json.JSONObject;import org.slf4j.Logger;import org.slf4j.LoggerFactory;
    public class PlainJSONHandler2 implements BidirectionalHTTPSourceHandler{private static final Logger LOG = LoggerFactory.getLogger(JSONHandler.class);private static int data_number = 0;@Overridepublic void configure(Context cont) {data_number = 0;}
    @Overridepublic List<Event> getEvents(HttpServletRequest request, HttpServletResponse respose) {String readLine = null;String deviceSN = null;String actualTime = null;Map<String, String> headers = null;try {if(data_number > 65536)data_number = 0;if(data_number%800 != 0){return null;}BufferedReader reader = request.getReader();String charset = request.getCharacterEncoding();if (charset != null) {  LOG.debug("Charset is " + charset);  charset = "UTF-8";}readLine = reader.readLine();headers = new HashMap<String, String>();if(readLine != null){    int start = readLine.indexOf("deviceId");    deviceSN = readLine.substring(start+11, start+23);    start = readLine.indexOf("actualTime");    actualTime = readLine.substring(start+12, start+25);    String manufacture = deviceSN.substring(0, 3);  headers.put("deviceId", deviceSN);headers.put("manufacture", manufacture);MyRunnable R1 = new MyRunnable(deviceSN);R1.start();JSONObject json = new JSONObject();json.put("deviceId", deviceSN);json.put("actualTime", actualTime);json.put("manufacture", manufacture);json.put("information", readLine);readLine = json.toString();}String result = getResult(deviceSN);PrintWriter writer = respose.getWriter();    writer.println(result);    writer.flush();    writer.close();    result = "";} catch (IOException e) {e.printStackTrace();}return getSimpleEvents(readLine, headers);}public String getResult(String deviceSN){// long currentTime = System.currentTimeMillis();//Date curDate = new Date(currentTime);String result = new String("{"result": 0,"timeStamp": "2018-08-14","periodConfigParameter": {"uploadConfig": {"msgreportInterval": "36000"}}}");//SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");//String showTime = formatter.format(curDate);    return result;}
    @Overridepublic void onChannelException(HttpServletRequest request, HttpServletResponse response, Exception ex) {}
    @Overridepublic void onSuccessfulCommit(HttpServletRequest request, HttpServletResponse response) {}private List<Event> getSimpleEvents(String events, Map<String, String> headers) {if(events == null)return null;  List<Event> newEvents = new ArrayList<Event>();    newEvents.add(EventBuilder.withBody(events, Charset.forName("UTF-8"), headers));    System.out.println("info: " + newEvents.toString());    return newEvents;  }}

    /**
     * Background task that makes sure the Hive partition
     * {@code day=<today>/mf=<manufacturer>/sn=<serial>} exists on the {@code period_data} table.
     *
     * <p>Each {@link #addPartition(String, String, String)} call opens its own Hive JDBC
     * connection and closes it before returning.
     */
    class MyRunnable implements Runnable {
        private Thread t;
        private String deviceSN;

        private String connUrl = "jdbc:hive2://localhost:10000/osgi";
        private String userName = "hive";
        private String passWord = "hive";
        private String tableName = "period_data";

        /**
         * @param deviceSN device serial number; its first three characters are used as the
         *                 manufacturer code
         */
        MyRunnable(String deviceSN) {
            this.deviceSN = deviceSN;
        }

        /** Adds the partition for today's date (formatted {@code yy-MM-dd}) and this device. */
        public void run() {
            if (deviceSN == null || deviceSN.length() < 3) {
                System.err.println("Cannot derive a manufacturer code from serial: " + deviceSN);
                return;
            }
            // SimpleDateFormat is not thread-safe, so a fresh instance is created per run.
            String day = new SimpleDateFormat("yy-MM-dd").format(new Date());
            String manufacture = deviceSN.substring(0, 3);
            addPartition(day, manufacture, deviceSN);
        }

        /** Starts this task on a new thread named after the serial number; later calls are no-ops. */
        public void start() {
            if (t == null) {
                t = new Thread(this, deviceSN);
                t.start();
            }
        }

        /**
         * Creates the partition if it does not exist yet.
         *
         * <p>The values are embedded in a DDL statement, so anything other than ASCII letters,
         * digits, {@code -} and {@code _} is rejected. Failures are reported on stderr and do not
         * propagate to the caller.
         *
         * @param day         partition value for {@code day}
         * @param manufacture partition value for {@code mf}
         * @param deviceSN    partition value for {@code sn}
         */
        public void addPartition(String day, String manufacture, String deviceSN) {
            if (!isSafePartitionValue(day) || !isSafePartitionValue(manufacture) || !isSafePartitionValue(deviceSN)) {
                System.err.println("Refusing to add partition with unsafe values: day=" + day
                        + ", mf=" + manufacture + ", sn=" + deviceSN);
                return;
            }

            // IF NOT EXISTS lets Hive do the existence check atomically instead of scanning
            // "show partitions" first, which was racy between concurrent requests.
            String addPartition = "alter table " + tableName + " add if not exists partition (day='"
                    + day + "', mf='" + manufacture + "', sn='" + deviceSN + "')";

            Connection connection = getConnect(userName, passWord, connUrl);
            if (connection == null) {
                // getConnect has already reported why no connection is available.
                return;
            }

            Statement stmt = null;
            try {
                stmt = connection.createStatement();
                System.out.println(addPartition);
                stmt.execute(addPartition);
            } catch (SQLException e) {
                report("Failed to execute: " + addPartition, e);
            } finally {
                if (stmt != null) {
                    try {
                        stmt.close();
                    } catch (SQLException e) {
                        report("Failed to close Hive statement", e);
                    }
                }
                try {
                    connection.close();
                } catch (SQLException e) {
                    report("Failed to close Hive connection", e);
                }
            }
        }

        /**
         * Opens a Hive JDBC connection.
         *
         * @param userName Hive user
         * @param passWord Hive password
         * @param connUrl  JDBC URL of the HiveServer2 instance
         * @return the connection, or {@code null} when the driver is missing or the connection
         *         attempt fails; the caller is responsible for closing it
         */
        public Connection getConnect(String userName, String passWord, String connUrl) {
            String driverName = "org.apache.hive.jdbc.HiveDriver";
            Connection conn = null;
            try {
                Class.forName(driverName);
                conn = DriverManager.getConnection(connUrl, userName, passWord);
            } catch (ClassNotFoundException e) {
                System.out.println("没有找到驱动类");
                report("Hive JDBC driver not found: " + driverName, e);
            } catch (SQLException e) {
                report("Failed to connect to " + connUrl, e);
            }
            return conn;
        }

        /** Returns whether the value is non-empty and limited to ASCII letters, digits, '-' and '_'. */
        private static boolean isSafePartitionValue(String value) {
            if (value == null || value.length() == 0) {
                return false;
            }
            for (int i = 0; i < value.length(); i++) {
                char c = value.charAt(i);
                boolean ok = (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
                        || c == '-' || c == '_';
                if (!ok) {
                    return false;
                }
            }
            return true;
        }

        /** Prints a context message followed by the stack trace of the failure. */
        private static void report(String message, Exception cause) {
            System.err.println(message);
            cause.printStackTrace();
        }
    }

  • 相关阅读:
    make输出全部重定向到文件
    python selenium-webdriver 元素定位(三)
    通过vmware 启动cloudera-quickstart-vm-5.10.0-0-vmware.zip镜像无法启动。
    编写第一个python selenium-webdriver程序(二)
    sublime Text3 新建文件时定义模块
    python selenium-webdriver 环境搭建(一)
    gitlab 添加SSH Key
    python 使用 'python -m pip install --upgrade pip'提示PermissionError: [WinError 5] 拒绝访问
    bitnami gitlab 配置域名
    bitnami gitlab 安装
  • 原文地址:https://www.cnblogs.com/redhat0019/p/9055335.html
Copyright © 2011-2022 走看看