Flume简介
Flume是一个分布式的,可靠的,可用的,可以非常有效率的对大数据的日志数据进行收集、聚集、转移。
每一个flume进程都有一个agent层,agent层包含有source、channel、sink:
- source:采集日志数据,并发送给channel
- channel:管道,用于连接source和sink,它是一个缓冲区,从source传来的数据会以event的形式在channel中排成队列,然后被sink取走。
- sink:获取channel中的数据,并存储到目标源中,目标源可以是HDFS和Hbase
安装与配置
下载 Flume 1.8.0
解压缩,在flume-env.sh
配置java环境变量,复制一份默认的flume-conf.properties
查看flume的版本
1 | bin/flume-ng version |
通过设置agent的配置文件,可以进行不同类型的数据收集,配置文件格式:
1 | list the sources, sinks and channels for the agent |
OK,可以运行实例了
发送一个文件给Flume
新建配置文件 avro.conf
1 | a1.sources = r1 |
启动Flume代理
1 | bin/flume-ng agent -c /Users/lvshuo/bigdata/flume/conf/ -f /Users/lvshuo/bigdata/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console |
最后控制台上出现 Avro source r1 started.
表示agent a1启动成功。
使用avro-client发送文件
首先创建一个log文件
1 | echo "Hello World" > test.log |
发送这个文件到上面配置文件设置的localhost:4141
1 | bin/flume-ng avro-client -c /Users/lvshuo/bigdata/flume/conf/ -H localhost -p 4141 -F /Users/lvshuo/test.log |
Flume控制台接受数据
发送文件之后,可以在控制台上看到刚才创建文件的内容:
Spool监测配置目录中的新文件
Spool监测配置的目录下新增的文件,并将文件中的数据读取出来。
需要注意两点:
- 1) 拷贝到spool目录下的文件不可以再打开编辑。
- 2) spool目录下不可包含相应的子目录
创建agent的配置文件 spool.conf
新建一个文件夹,存放被监测的log文件
1 | a1.sources = r1 |
启动代理 agent a1
1 | bin/flume-ng agent -c /Users/lvshuo/bigdata/flume/conf/ -f /Users/lvshuo/bigdata/flume/conf/spool.conf -n a1 -Dflume.root.logger=INFO,console |
模拟产生log文件
追加文件到被监测的文件夹中(/Users/lvshuo/bigdata/logs)
1 | cp /Users/lvshuo/test.log /Users/lvshuo/bigdata/logs |
Flume控制台接受数据
观察到控制台输出log中的内容:
Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED
Flume整合Kafka
上面的案例里面,Flume将监测到的Log文件内容输出到了控制台。在实际的项目中,经常使用Kafka作为数据中间件,来同时支持离线批处理和在线流式计算
对于Kafka来说,Flume既能作为生产者,又能作为消费者
Flume作为Kafka消费者:
Flume作为Kafka生产者:
下面的例子,Flume作为Kafka的生产者,将监控到的Log文件写入到Kafka的相关Topic中
Flume传输Log到Kafka中
下面是Flume的配置,
1 | Name component of agent |
相关Flume配置参考
启动Flume代理a1
1 | bin/flume-ng agent -c /Users/lvshuo/bigdata/flume/conf/ -f /Users/lvshuo/bigdata/flume/conf/flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console |
查看FlumeLogTopic中的数据,发现log中的数据已经发送到Topic中,可以供下游消费者进行消费
1 | kafka-console-consumer.sh --zookeeper localhost:2181 --topic flumeLogTopic --from-beginning |