软件环境
操作系统:Ubuntu Server 18.04
JDK:1.8.0
* 先登录root账号再进行以下操作
1.安装JDK
https://www.cnblogs.com/live41/p/14235891.html
2.下载安装包
假定下载到home目录
cd /home
http://flume.apache.org/download.html
下载apache-flume-1.9.0-bin.tar.gz文件
* 其中1.9.0是版本号,如果有新版就下载新版
3.解压安装包
tar -xvf apache-flume-1.9.0-bin.tar.gz
4.改名
mv apache-flume-1.9.0-bin flume cd /home/flume/conf mv flume-conf.properties.example flume-conf
5.配置系统环境变量
vim ~/.bashrc
添加以下内容:
export PATH=$PATH:home/flume/bin
更新环境变量
source ~/.bashrc
6.修改配置文件
cd /home/flume/conf
vim flume-conf
添加以下内容:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
这里的a1是agent名,也就是启动实例名,对应启动命令的agent名(--name)。
常修改的有2个属性
(1) a1.sources.r1.type
接收的数据源类型,官方说明文档:
http://flume.apache.org/FlumeUserGuide.html#flume-sources
* 常用的有:
netcat - 接收的是原始数据,一般是telnet测试时使用 avro - 只能接收用avro序列化后的数据,一般是实际生产时使用 spooling - 监听本地硬盘的指定目录的文件 kafka - 从kafka的topic获取数据
(2) a1.sinks.k1.type
接收数据的目的地类型(就是保存到哪里),官方说明文档:
http://flume.apache.org/FlumeUserGuide.html#flume-sinks
* 常用的有:
logger - 输出到log4j配置的地方 file_roll - 输出到指定目录 kafka - 输出数据到kafka的指定topic,大数据场景常用 hbase - 存入hbase hive - 存入hive elasticsearch - 输出到elasticsearch,检索类大数据场景常用 avro - 输出为以avro序列化的数据,一般是多级flume时使用。例如,flume1接收原始数据后,序列化为avro数据,再发送给flume2
7.启动
flume-ng agent --conf conf --conf-file /home/flume/conf/flume-conf --name a1 -Dflume.root.logger=INFO,console
也可以用缩写参数,下面语句跟上面的同样效果:
flume-ng agent -c conf -f /home/flume/conf/flume-conf -n a1 -Dflume.root.logger=INFO,console
* 这里的a1必须跟flume-conf里面的agent名一致
8.命令测试
由于上面的flume启动是用前台运行,独占了终端不能继续输入命令,所以如果要测试,有2个方法:
a.新开窗口来输入测试命令
b.使用&或nohup在后台运行flume,使用后只能用kill来关闭进程
flume-ng agent -c conf -f /home/flume/conf/flume-conf -n a1 -Dflume.root.logger=INFO,console & 或 nohup flume-ng agent -c conf -f /home/flume/conf/flume-conf -n a1 -Dflume.root.logger=INFO,console 或 nohup flume-ng agent -c conf -f /home/flume/conf/flume-conf -n a1 -Dflume.root.logger=INFO,console &
使用telnet命令进行测试:
telnet localhost 44444
然后输入任意内容,看到运行flume的窗口显示对应消息,就是部署成功了。
9.代码测试
测试之前要先把a1.sources.r1.type = netcat的改成a1.sources.r1.type = avro,否则会报错(因为API默认是发送avro序列化数据)。
public class FlumeSender { public static void main(String[] args) { String hostname = "localhost"; int port = 44444; RpcClient client = null; try { client = RpcClientFactory.getDefaultInstance(hostname, port); String data = "Hello Flume! times: "; for (int i = 0; i < 3; i++) { Event event = EventBuilder.withBody(data + i, Charset.forName("UTF-8")); client.append(event); } } catch (Exception e) { System.out.println(e); } finally { if (client != null) { client.close(); } } } }