这里只是做了一个简单的demo,并未深入研究。
1.编写PolluteSink
1.1 maven创建项目(pom.xml)
<dependencies>
    <!-- Flume core API: AbstractSink, Channel, Transaction, Event. Declared exactly once;
         Maven warns about (and may later reject) duplicate dependency declarations. -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-configuration</artifactId>
        <version>1.6.0</version>
    </dependency>
    <!-- Flume client SDK: RpcClient / RpcClientFactory used by the test sender. -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-support</artifactId>
        <version>3.0.0-M2</version>
    </dependency>
    <!-- MySQL JDBC driver; this jar must also be copied into Flume's lib directory. -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>6.0.6</version>
    </dependency>
</dependencies>
<!-- Build settings: compile the project with Java 1.8 as both source and target level. -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
1.2 自定义sink
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
package com.example.demo1.pollute;
import com.example.demo1.entity.Pollute;
import com.example.demo1.utils.FormatDataUtil;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Flume sink for messages that follow the national data transmission standard for
 * pollution-source online automatic monitoring systems.
 *
 * <p>Each call to {@link #process()} takes one event from the channel, decodes its body
 * as UTF-8, parses it into key/value pairs with {@code FormatDataUtil.getMapByData} and
 * inserts one row per pair ({@code key}, {@code value}, {@code date_add}) into MySQL.
 *
 * <p>Required configuration properties: {@code databaseName}, {@code tableName},
 * {@code user}, {@code password}, {@code hostname} and {@code port}.
 *
 * @author chenxiaokang
 * @date 2019/10/14 13:53
 */
public class PolluteSink extends AbstractSink implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(PolluteSink.class);
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private String tableName;
    private String user;
    private String password;
    private String hostname;
    private String port;
    private Connection conn;
    private String databaseName;
    private PreparedStatement preparedStatement;

    /**
     * Reads the mandatory connection settings from the sink configuration.
     *
     * @param context sink configuration supplied by Flume
     * @throws NullPointerException if any required property is missing
     */
    @Override
    public void configure(Context context) {
        databaseName = context.getString("databaseName");
        Preconditions.checkNotNull(databaseName, "databaseName must be set!!");
        tableName = context.getString("tableName");
        Preconditions.checkNotNull(tableName, "tableName must be set!!");
        user = context.getString("user");
        Preconditions.checkNotNull(user, "user must be set!!");
        password = context.getString("password");
        Preconditions.checkNotNull(password, "password must be set!!");
        hostname = context.getString("hostname");
        Preconditions.checkNotNull(hostname, "host must be set!!");
        port = context.getString("port");
        Preconditions.checkNotNull(port, "port must be set!!");
    }

    /**
     * Takes at most one event from the channel and stores its key/value pairs in MySQL.
     *
     * <p>The channel transaction is committed only after the database commit succeeded;
     * on any failure it is rolled back so the event stays in the channel for a retry.
     * An event whose body cannot be parsed is logged and dropped, because retrying it
     * could never succeed.
     *
     * @return {@code READY} if an event was consumed, {@code BACKOFF} if the channel was empty
     * @throws EventDeliveryException if the event could not be stored
     */
    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            Event event = channel.take();
            if (event == null) {
                transaction.commit();
                return Status.BACKOFF;
            }
            String content = new String(event.getBody(), UTF_8);
            LOG.info("--- content : {}", content);
            List<Pollute> pollutes = parse(content);
            if (!pollutes.isEmpty()) {
                save(pollutes);
            }
            transaction.commit();
            return Status.READY;
        } catch (Throwable t) {
            // A transaction must be committed or rolled back before close(); otherwise
            // close() itself fails and hides the real cause.
            rollbackQuietly(transaction);
            if (t instanceof Error) {
                throw (Error) t;
            }
            throw new EventDeliveryException("Failed to store event in MySQL", t);
        } finally {
            transaction.close();
        }
    }

    /** Converts a raw message into the rows to insert; an unparsable message yields no rows. */
    private List<Pollute> parse(String content) {
        List<Pollute> pollutes = new ArrayList<>();
        Map<String, String> map;
        try {
            map = FormatDataUtil.getMapByData(content);
        } catch (RuntimeException e) {
            LOG.error("Dropping event that could not be parsed: {}", content, e);
            return pollutes;
        }
        if (map == null) {
            return pollutes;
        }
        for (Map.Entry<String, String> entry : map.entrySet()) {
            Pollute pollute = new Pollute();
            pollute.setKey(entry.getKey());
            pollute.setValue(entry.getValue());
            pollutes.add(pollute);
        }
        return pollutes;
    }

    /** Inserts all rows as one JDBC batch and commits; rolls the JDBC transaction back on failure. */
    private void save(List<Pollute> pollutes) throws SQLException {
        if (conn == null || preparedStatement == null) {
            throw new IllegalStateException("MySQL connection is not initialised; was start() successful?");
        }
        try {
            preparedStatement.clearBatch();
            for (Pollute pollute : pollutes) {
                preparedStatement.setString(1, pollute.getKey());
                preparedStatement.setString(2, pollute.getValue());
                preparedStatement.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
            conn.commit();
        } catch (SQLException e) {
            try {
                conn.rollback();
            } catch (SQLException rollbackFailure) {
                LOG.warn("Failed to roll back JDBC transaction", rollbackFailure);
            }
            throw e;
        }
    }

    /** Rolls back the channel transaction, logging instead of propagating a rollback failure. */
    private void rollbackQuietly(Transaction transaction) {
        try {
            transaction.rollback();
        } catch (RuntimeException e) {
            LOG.error("Failed to roll back channel transaction", e);
        }
    }

    /**
     * Loads the MySQL driver, opens the connection (auto-commit off) and prepares the
     * insert statement. The sink is only marked as started once all of that succeeded.
     *
     * @throws FlumeException if the driver is missing or the connection cannot be opened
     */
    @Override
    public void start() {
        try {
            // Load the JDBC driver class so DriverManager can find it.
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            throw new FlumeException("MySQL JDBC driver not found on the classpath", e);
        }
        // Use the configured port rather than a hard-coded 3306.
        String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName
                + "?allowMultiQueries=true&useUnicode=true&"
                + "characterEncoding=UTF8&zeroDateTimeBehavior=convertToNull";
        try {
            // Never log the password.
            LOG.info("Connecting to {} as user {}", url, user);
            conn = DriverManager.getConnection(url, user, password);
            conn.setAutoCommit(false);
            // tableName comes from the agent's own configuration file, not from event data.
            preparedStatement = conn.prepareStatement(
                    "insert into " + tableName + "(`key`,`value`,date_add) values (?,?,?)");
        } catch (SQLException e) {
            closeJdbcResources();
            throw new FlumeException("Unable to open MySQL connection to " + url, e);
        }
        super.start();
    }

    /** Stops the sink and releases the JDBC statement and connection. */
    @Override
    public void stop() {
        super.stop();
        closeJdbcResources();
    }

    /** Closes the statement first, then the connection; close failures are logged. */
    private void closeJdbcResources() {
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                LOG.warn("Failed to close prepared statement", e);
            }
            preparedStatement = null;
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                LOG.warn("Failed to close MySQL connection", e);
            }
            conn = null;
        }
    }
}
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
package com.example.demo1.utils;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
 * Parses monitoring messages such as
 * {@code QN=20040516010101001;ST=32;CP=&&DataTime=20040516020111&&} into a key/value map.
 *
 * @author chenxiaokang
 * @date 2019/10/14 11:25
 */
public class FormatDataUtil {
    private static final String EMPTY = "";
    private static final String FIELD_SEPARATOR = ";";
    private static final String ITEM_SEPARATOR = ",";
    private static final String KEY_VALUE_SEPARATOR = "=";
    private static final String DATA_AREA_MARK = "&&";
    private static final String FRAME_MARK = "##";
    private static final String DATA_TIME_KEY = "DataTime";
    /** Compiled once; matches every run of whitespace. */
    private static final java.util.regex.Pattern WHITESPACE = java.util.regex.Pattern.compile("\\s+");

    public static void main(String[] args) throws IOException {
        String data = "QN=20040516010101001;ST=32;CN=1072;PW=123456;MN=88888880000001;Flag=3;CP=&&PW=654321&&";
        FileUtils.writeFile("polluteTxt",getMapByData(data));
    }

    /**
     * Converts a received message into a map.
     *
     * <p>Every {@code ##} is removed, the message is split on {@code ;}, each segment is
     * further split on {@code ,}, and every resulting {@code key=value} item is stored.
     * A message without any {@code ;} is treated as a single segment.
     *
     * @param data raw message text, may be {@code null}
     * @return the parsed pairs; an empty map (never {@code null}) when nothing could be parsed
     */
    public static Map<String, String> getMapByData(String data) {
        Map<String, String> map = new HashMap<String, String>();
        if (isEmpty(data)) {
            return map;
        }
        String body = data.replaceAll(FRAME_MARK, EMPTY);
        String[] segments = isContainsChars(body, FIELD_SEPARATOR)
                ? getArrayByChar(body, FIELD_SEPARATOR)
                : new String[] {body};
        for (String segment : segments) {
            // Several parameters inside one segment are separated by commas.
            if (isContainsChars(segment, ITEM_SEPARATOR)) {
                for (String item : getArrayByChar(segment, ITEM_SEPARATOR)) {
                    intoMap(map, item);
                }
            } else {
                intoMap(map, segment);
            }
        }
        return map;
    }

    /**
     * Stores one item in the map.
     *
     * <p>Besides plain {@code key=value}, items may carry the data-area marker, e.g.
     * {@code CP=&&DataTime=20040516020111}. Such an item is split on {@code &&} and every
     * part that is a well-formed {@code key=value} pair is stored; parts like {@code CP=}
     * are skipped. Items that are not well-formed pairs are ignored.
     *
     * @param map target map
     * @param var one item of the message
     */
    public static void intoMap(Map<String, String> map, String var) {
        if (isContainsChars(var, DATA_AREA_MARK)) {
            String[] parts = getArrayByChar(var, DATA_AREA_MARK);
            if (parts == null) {
                return;
            }
            for (String part : parts) {
                putPair(map, part);
            }
        } else {
            // The item is already in plain key=value form.
            putPair(map, var);
        }
    }

    /** Stores {@code pair} if it has exactly one key and one value; DataTime values are reformatted. */
    private static void putPair(Map<String, String> map, String pair) {
        String[] keyValue = getArrayByChar(pair, KEY_VALUE_SEPARATOR);
        if (keyValue == null || keyValue.length != 2) {
            return;
        }
        String value = DATA_TIME_KEY.equals(keyValue[0]) ? formatDateStr(keyValue[1]) : keyValue[1];
        map.put(keyValue[0], value);
    }

    /**
     * Removes all whitespace from the text and splits it on the given separator.
     * The separator is matched literally, consistent with the containment check.
     *
     * @param datastr text to split
     * @param s separator
     * @return the parts, or {@code null} if the text is empty or does not contain the separator
     */
    public static String[] getArrayByChar(String datastr, String s) {
        if (isEmpty(datastr) || !isContainsChars(datastr, s)) {
            return null;
        }
        return WHITESPACE.matcher(datastr).replaceAll(EMPTY).split(java.util.regex.Pattern.quote(s));
    }

    /**
     * Tells whether a string contains another string.
     *
     * @param str text to search in
     * @param arg text to search for
     * @return {@code true} if both are non-empty and {@code str} contains {@code arg}
     */
    public static boolean isContainsChars(String str, String arg) {
        if (isEmpty(str) || isEmpty(arg)) {
            return false;
        }
        return str.contains(arg);
    }

    /**
     * Tells whether a string is {@code null} or has length zero.
     *
     * @param str text to check
     * @return {@code true} for {@code null} or {@code ""}
     */
    public static boolean isEmpty(String str) {
        return str == null || EMPTY.equals(str);
    }

    /**
     * Reformats a {@code yyyyMMddHHmmss} timestamp as {@code yyyy-MM-dd HH:mm:ss}.
     *
     * @param datestr timestamp text
     * @return the reformatted text; {@code ""} for empty input; the input unchanged if it
     *         cannot be parsed
     */
    public static String formatDateStr(String datestr){
        if (isEmpty(datestr)) {
            return EMPTY;
        }
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
        try {
            Date date = format.parse(datestr);
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
        } catch (ParseException ignored) {
            // Deliberate fallback: keep the original text when it is not a timestamp.
            return datestr;
        }
    }
}
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
包结构如下
1.3 创建entity
package com.example.demo1.entity;
import java.util.Date;
/**
 * One key/value pair taken from a monitoring message, plus the time it was added.
 *
 * @author chenxiaokang
 * @date 2019/10/15 10:26
 */
public class Pollute {

    /** Field name taken from the message. */
    private String key;

    /** Field value taken from the message. */
    private String value;

    /** Time the pair was added. */
    private Date dateAdd;

    /** @return the field name, or {@code null} if not set */
    public String getKey() {
        return this.key;
    }

    /** @param key the field name */
    public void setKey(final String key) {
        this.key = key;
    }

    /** @return the field value, or {@code null} if not set */
    public String getValue() {
        return this.value;
    }

    /** @param value the field value */
    public void setValue(final String value) {
        this.value = value;
    }

    /** @return the time the pair was added, or {@code null} if not set */
    public Date getDateAdd() {
        return this.dateAdd;
    }

    /** @param dateAdd the time the pair was added */
    public void setDateAdd(final Date dateAdd) {
        this.dateAdd = dateAdd;
    }
}
1.4 将项目打包成jar,并拷贝到flume的lib目录中(mysql驱动的jar也需要一并拷贝进去)
2.修改flume的conf配置(模拟发送数据时需要将agent.sources.ri.type 改为 avro),然后启动flume,命令:/opt/flume/bin/flume-ng agent --conf-file /opt/flume/conf/flume-conf.properties -n agent -c /opt/flume/conf/ -Dflume.root.logger=DEBUG,console
3.编写模拟发送数据类并启动
package com.example.demo1.utils;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;
/**
 * Small command-line sender that pushes one sample monitoring message to a remote
 * Flume agent, which should be configured to listen with an Avro source.
 *
 * @author chenxiaokang
 * @date 2019/10/14 18:32
 */
public class CouScoketUtil {
    public static void main(String[] args) throws Exception {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        client.init("服务器IP地址", 7001);
        try {
            // Send a single sample event to the remote Flume agent.
            String sampleData = "ST=91;CN=9011;PW=123456;MN=88888880000001;Flag=0;CP=&&QN=20040516010101001;QnRtn=1&&";
            System.out.println("发送数据");
            client.sendDataToFlume(sampleData);
        } finally {
            // Always release the RPC connection, even if sending failed.
            client.cleanUp();
        }
    }
}
/**
 * Thin wrapper around a Flume {@code RpcClient}: connect, send text events, disconnect.
 */
class MyRpcClientFacade {
    private RpcClient client;
    private String hostname;
    private int port;

    /**
     * Opens the RPC connection to the Flume agent.
     *
     * @param hostname host of the remote Flume agent
     * @param port port of the remote Flume agent
     */
    public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        System.out.println("建立连接");
        // Use the following method to create a thrift client (instead of the
        // above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }

    /**
     * Sends one UTF-8 encoded event. If delivery fails, the failure is reported on
     * standard error and the connection is recreated for later calls; the failed
     * event itself is not resent.
     *
     * @param data event body
     * @throws IllegalStateException if {@link #init(String, int)} has not been called
     */
    public void sendDataToFlume(String data) {
        if (client == null) {
            throw new IllegalStateException("init() must be called before sendDataToFlume()");
        }
        // Create a Flume Event object that encapsulates the sample data
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
        try {
            client.append(event);
        } catch (EventDeliveryException e) {
            // Report the failure instead of dropping it silently.
            System.err.println("Failed to deliver event to " + hostname + ":" + port + ": " + e);
            // clean up and recreate the client
            client.close();
            // Reset first so a failed reconnect does not leave a closed client behind.
            client = null;
            client = RpcClientFactory.getDefaultInstance(hostname, port);
            // Use the following method to create a thrift client (instead of
            // the above line):
            // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
    }

    /** Closes the RPC connection; safe to call when no connection is open. */
    public void cleanUp() {
        // Close the RPC connection
        System.out.println("断开连接");
        if (client != null) {
            client.close();
            client = null;
        }
    }
}