zoukankan      html  css  js  c++  java
  • flume客户端模拟数据发送并记录在mysql数据库

    这里只是做了简单的demo,并未深研

    1.编写PolluteSink

      1.1 maven创建项目(pom.xml)

    <dependencies>
    <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.6.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-configuration</artifactId>
    <version>1.6.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sdk</artifactId>
    <version>1.6.0</version>
    </dependency>
    <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.6.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-support</artifactId>
    <version>3.0.0-M2</version>
    </dependency>
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>6.0.6</version>
    </dependency>

    </dependencies>

    <build>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.1</version>
    <configuration>
    <source>1.8</source>
    <target>1.8</target>
    </configuration>
    </plugin>
    </plugins>
    </build>

      1.2 自定义sink

    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    package com.example.demo1.pollute;

    import com.example.demo1.entity.Pollute;
    import com.example.demo1.utils.FormatDataUtil;
    import com.google.common.base.Preconditions;
    import com.google.common.collect.Lists;
    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.nio.charset.Charset;
    import java.sql.*;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    /**
    * 污染源在线自动监控(监测)系统数据传输标准(国标)
    *
    * @author chenxiaokang
    * @date 2019/10/14 13:53
    */
    public class PolluteSink extends AbstractSink implements Configurable {

    private Logger LOG = LoggerFactory.getLogger(PolluteSink.class);

    private String tableName;
    private String user;
    private String password;
    private String hostname;
    private String port;
    private Connection conn;
    private String databaseName;
    private PreparedStatement preparedStatement;

    @Override
    public void configure(Context context) {
    databaseName = context.getString("databaseName");
    Preconditions.checkNotNull(databaseName, "databaseName must be set!!");
    tableName = context.getString("tableName");
    Preconditions.checkNotNull(tableName, "tableName must be set!!");
    user = context.getString("user");
    Preconditions.checkNotNull(user, "user must be set!!");
    password = context.getString("password");
    Preconditions.checkNotNull(password, "password must be set!!");
    hostname = context.getString("hostname");
    Preconditions.checkNotNull(hostname, "host must be set!!");
    port = context.getString("port");
    Preconditions.checkNotNull(port, "port must be set!!");
    }

    @Override
    public Status process() throws EventDeliveryException {
    Status result = Status.READY;
    Channel channel = getChannel();
    List<String> actions = Lists.newArrayList();
    Transaction transaction = null;
    try {
    transaction = channel.getTransaction();
    Event event;
    String content;
    transaction.begin();
    event = channel.take();
    LOG.info("event:{}",event);
    if (event != null) {
    System.out.println("content++++:"+event.getBody());
    content = new String(event.getBody(), Charset.forName("UTF-8"));
    System.out.println("content"+content);
    actions.add(content);
    } else {
    result = Status.BACKOFF;
    }
    Map<String, String> map = null;
    System.out.println("actions.size() :"+actions.size() );
    if (actions.size() > 0) {
    for (String temp : actions) {
    LOG.info("--- content : " + temp);
    System.out.println(temp);
    map = FormatDataUtil.getMapByData(temp);
    List<Pollute> pollutes = new ArrayList<>();
    for (Map.Entry<String, String> entry : map.entrySet()) {
    Pollute pollute = new Pollute();
    pollute.setKey(entry.getKey());
    pollute.setValue(entry.getValue());
    pollutes.add(pollute);
    }
    if (pollutes.size() > 0) {
    preparedStatement.clearBatch();
    for (Pollute pollute : pollutes) {
    preparedStatement.setString(1, pollute.getKey());
    preparedStatement.setString(2, pollute.getValue());
    preparedStatement.setTimestamp(3,new Timestamp(System.currentTimeMillis()));
    preparedStatement.addBatch();
    }
    preparedStatement.executeBatch();
    conn.commit();
    }
    }
    }
    transaction.commit();
    } catch (Exception e) {
    e.printStackTrace();
    LOG.error("", e);
    } finally {
    if (transaction != null) {
    transaction.close();
    }

    }
    return result;
    }

    @Override
    public void start() {
    super.start();
    try {
    //调用Class.forName()方法加载驱动程序
    Class.forName("com.mysql.jdbc.Driver");
    } catch (ClassNotFoundException e) {
    e.printStackTrace();
    }

    String url = "jdbc:mysql://"+hostname+":3306/"+databaseName+"?allowMultiQueries=true&useUnicode=true&" +
    "characterEncoding=UTF8&zeroDateTimeBehavior=convertToNull";
    //调用DriverManager对象的getConnection()方法,获得一个Connection对象

    try {
    LOG.info("user:{},password:{}",user,password);
    conn = DriverManager.getConnection(url, user, password);
    conn.setAutoCommit(false);
    //创建一个Statement对象
    preparedStatement = conn.prepareStatement("insert into bs_pollute(`key`,`value`,date_add) values (?,?,?)");

    } catch (SQLException e) {
    e.printStackTrace();
    }

    }

    @Override
    public void stop() {
    super.stop();
    if (conn != null) {
    try {
    conn.close();
    } catch (SQLException e) {
    e.printStackTrace();
    }
    }
    if (preparedStatement != null) {
    try {
    preparedStatement.close();
    } catch (SQLException e) {
    e.printStackTrace();
    }
    }

    if (conn != null) {
    try {
    conn.close();
    } catch (SQLException e) {
    e.printStackTrace();
    }
    }

    }

    }
    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    package com.example.demo1.utils;

    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;

    /**
    * @author chenxiaokang
    * @date 2019/10/14 11:25
    */
    public class FormatDataUtil {

    private static String arg = "";
    private static String arg1 = ";";
    private static String arg2 = ",";
    private static String arg3 = "=";
    private static String arg4 = "&&";
    private static String arg6 = "##";
    private static String arg7 = "DataTime";

    public static void main(String[] args) throws IOException {
    String data = "QN=20040516010101001;ST=32;CN=1072;PW=123456;MN=88888880000001;Flag=3;CP=&&PW=654321&&";
    FileUtils.writeFile("polluteTxt",getMapByData(data));
    }

    /**
    * 将接收到的数据转为map
    * @param data
    * @return
    */
    public static Map<String, String> getMapByData(String data) {
    System.out.println("data:"+data);
    if (isEmpty(data)) {
    return null;
    }
    Map<String, String> map = new HashMap<String, String>();
    data = data.replaceAll(arg6, "");
    if(isContainsChars(data,arg1)){
    String[] temp = getArrayByChar(data, arg1);
    for (String s : temp) {
    // 如果有多个参数用逗号分割
    if (isContainsChars(s, arg2)) {
    String[] temp1 = getArrayByChar(s, arg2);
    for (String s1 : temp1) {
    intoMap(map, s1);
    }
    } else {
    intoMap(map,s);
    }
    }
    }
    return map;
    }

    /**
    * 因为字段中存在特殊的(CP=&&DataTime=20040516020111)格式,不光是以(key=value)格式
    * 所以需要对其进行处理
    * @param map
    * @param var
    */
    public static void intoMap(Map<String, String> map, String var) {
    // 数据中是否存在CP=&&DataTime=20040516020111;格式 如果有按&&分割
    if (isContainsChars(var, arg4)) {
    String[] temp2 = getArrayByChar(var, arg4);
    String[] temp3;
    if(temp2.length == 1){
    temp3 = getArrayByChar(temp2[0], arg3);
    }else{
    temp3 = getArrayByChar(temp2[1], arg3);
    }
    if (temp3.length == 2) {
    if(arg7.equals(temp3[0])){
    map.put(temp3[0],formatDateStr(temp3[1]));
    }else{
    map.put(temp3[0], temp3[1]);
    }
    }
    } else {
    // 数据直接是key=value格式
    String[] temp4 = getArrayByChar(var, arg3);
    if (temp4.length == 2) {
    if(arg7.equals(temp4[0])){
    map.put(temp4[0],formatDateStr(temp4[1]));
    }else{
    map.put(temp4[0], temp4[1]);
    }
    }
    }
    }

    /**
    * 根据指定字符分割字符串
    *
    * @param datastr
    * @param s
    * @return
    */
    public static String[] getArrayByChar(String datastr, String s) {
    if (isEmpty(datastr) || !isContainsChars(datastr, s)) {
    return null;
    }
    return datastr.trim().replaceAll("\s*", arg).split(s);
    }

    /**
    * 判断字符串是否包含指定字符串
    *
    * @param str
    * @return
    */
    public static boolean isContainsChars(String str, String arg) {
    if (isEmpty(str) || isEmpty(arg)) {
    return false;
    }
    return str.contains(arg);
    }
    /**
    * 判断字符串是否为空
    *
    * @param str
    * @return
    */
    public static boolean isEmpty(String str) {
    return str == null || arg.equals(str);
    }

    /**
    * 格式化时间字符串
    * @param datestr
    * @return
    */
    public static String formatDateStr(String datestr){
    if(isEmpty(datestr)){
    return arg;
    }
    SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
    try {
    Date date = format.parse(datestr);
    return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
    } catch (ParseException e) {
    e.printStackTrace();
    }
    return datestr;
    }

    }

    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    包结构如下

      1.3 创建entity

    package com.example.demo1.entity;

    import java.util.Date;

    /**
    * 数据
    * @author chenxiaokang
    * @date 2019/10/15 10:26
    */
    public class Pollute {
    private String key;
    private String value;
    private Date dateAdd;

    public Date getDateAdd() {
    return dateAdd;
    }

    public void setDateAdd(Date dateAdd) {
    this.dateAdd = dateAdd;
    }

    public String getValue() {
    return value;
    }

    public void setValue(String value) {
    this.value = value;
    }

    public String getKey() {
    return key;
    }

    public void setKey(String key) {
    this.key = key;
    }
    }

      1.4 将项目打包成jar,并拷贝到flume中lib(需要将mysql的jar也要考进去)

    2.flume中conf配置(模拟数据需要将agent.sources.ri.type 改为 avro)并启动flume 命令/opt/flume/bin/flume-ng agent --conf-file /opt/flume/conf/flume-conf.properties -n agent -c /opt/flume/conf/  -Dflume.root.logger=DEBUG,console

    3.编写模拟发送数据类并启动

    package com.example.demo1.utils;

    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import java.nio.charset.Charset;

    /**
    * @author chenxiaokang
    * @date 2019/10/14 18:32
    */
    public class CouScoketUtil {

    public static void main(String[] args) throws Exception {
    MyRpcClientFacade client = new MyRpcClientFacade();
    // Initialize client with the remote Flume agent's host and port
    client.init("服务器IP地址", 7001);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with an AvroSource.
    // String sampleData = "ERROR";
    String sampleData = "ST=91;CN=9011;PW=123456;MN=88888880000001;Flag=0;CP=&&QN=20040516010101001;QnRtn=1&&";
    System.out.println("发送数据");
    client.sendDataToFlume(sampleData);
    client.cleanUp();
    }
    }

    class MyRpcClientFacade {
    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
    // Setup the RPC connection
    this.hostname = hostname;
    this.port = port;
    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    System.out.println("建立连接");
    // Use the following method to create a thrift client (instead of the
    // above line):
    // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }

    public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
    try {
    client.append(event);
    } catch (EventDeliveryException e) {
    // clean up and recreate the client
    client.close();
    client = null;
    client = RpcClientFactory.getDefaultInstance(hostname, port);
    // Use the following method to create a thrift client (instead of
    // the above line):
    // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }
    }

    public void cleanUp() {
    // Close the RPC connection
    System.out.println("断开连接");
    client.close();
    }
    }

    4.数据保存成功

  • 相关阅读:
    mysql改为mysqli几项注意
    修改链接服务器地址提高下载速度
    果然最适合码农的博客还是博客园
    mysql
    php 检测字符集
    Internet Download Manager has been registered with a fake Serial Number
    SVN图标不见了
    理解createTrackbar函数
    程序块结构
    数组初始化
  • 原文地址:https://www.cnblogs.com/xiaokangk/p/11678008.html
Copyright © 2011-2022 走看看