[ETL] Flume 理論與demo(Taildir Source & Hdfs Sink)
一、Flume簡介
1. Flume概述
Flume是Cloudera提供的一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸?shù)南到y(tǒng),F(xiàn)lume支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù);同時,F(xiàn)lume提供對數(shù)據(jù)進(jìn)行簡單處理,并寫到各種數(shù)據(jù)接受方(可定制)的能力。
2. Flume系統(tǒng)功能
- 日志收集
Flume最早是Cloudera提供的日志收集系統(tǒng),目前是Apache下的一個孵化項目,F(xiàn)lume支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù)。
- 數(shù)據(jù)處理
Flume提供對數(shù)據(jù)進(jìn)行簡單處理,并寫到各種數(shù)據(jù)接受方(可定制)的能力, Flume提供了從console(控制臺)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系統(tǒng),支持TCP和UDP2種模式),exec(命令執(zhí)行)等數(shù)據(jù)源上收集數(shù)據(jù)的能力。
3. Flume的工作方式
Flume-og采用了多Master的方式。為了保證配置數(shù)據(jù)的一致性,F(xiàn)lume引入了ZooKeeper,用于保存配置數(shù)據(jù),ZooKeeper本身可保證配置數(shù)據(jù)的一致性和高可用,另外,在配置數(shù)據(jù)發(fā)生變化時,ZooKeeper可以通知Flume Master節(jié)點(diǎn)。Flume Master間使用gossip協(xié)議同步數(shù)據(jù)。
Flume-ng最明顯的改動就是取消了集中管理配置的 Master 和 Zookeeper,變?yōu)橐粋€純粹的傳輸工具。Flume-ng另一個主要的不同點(diǎn)是讀入數(shù)據(jù)和寫出數(shù)據(jù)現(xiàn)在由不同的工作線程處理(稱為 Runner)。 在 Flume-og 中,讀入線程同樣做寫出工作(除了故障重試)。如果寫出慢的話(不是完全失敗),它將阻塞 Flume 接收數(shù)據(jù)的能力。這種異步的設(shè)計使讀入線程可以順暢的工作而無需關(guān)注下游的任何問題。
4. Flume的安裝
如果使用Apache-Flume的話只要上傳安裝包到服務(wù)器,然后解壓,再配置一下環(huán)境變量即可。本文使用CDH-5.12.2安裝Flume。
二、Flume工作原理及組件詳解
1. Flume工作原理

Flume的主要工作就是啟動一個Agent,一個Agent由三個部分組成,Source、Sink和Channel。Source表示數(shù)據(jù)從哪里來,Sink表示數(shù)據(jù)收集到哪里去,Channel是一個緩沖區(qū)。更詳細(xì)介紹可以參看官方文檔。
2. Source組件
對于Source和Sink則根據(jù)不同的需求有很多種寫法。例如Source可以直接從一個文件/目錄中去取數(shù)據(jù),也可以別人直接給你傳數(shù)據(jù)。所以,總的來說Source只有兩類,一類是主動的Source、一類是被動的Source。
主動的Source就是將Source配置成主動的到別人那里去拿,主動的Source與被動的Source不同,它不是一個服務(wù)。主動的source有Exec Source、Spooling Directory Source、Taildir Source、Kafka Source等。
被動的Source就是別人給發(fā)過來,前提是Flume機(jī)器的source一定是一個服務(wù),可以是Http協(xié)議、tcp協(xié)議、Rcp協(xié)議的服務(wù),規(guī)定是哪種協(xié)議的服務(wù),B機(jī)器就按那種協(xié)議去發(fā)。總之,被動的Source就是一種服務(wù),至于使用什么協(xié)議就看實(shí)際需求。被動的source有Avro Source、Thrift Source、JMS Source、NetCat Source等。
3. Sink組件
一般來說,Sink沒有主動和被動之分。如果要將收集到的數(shù)據(jù)放到HDFS上的話,那么Sink就是Hdfs的客戶端;如果放到Kafka中,那么Sink就是Kafka的客戶端。總之,Sink一般都是做客戶端的。
4. Channel組件
對于Channel。只有兩種情況,一種緩沖在內(nèi)存中,一種緩沖在磁盤中。
三、Demo(Taildir Source & Hdfs Sink)
1. Taildir Source

相比于Spooldir Source,Taildir Source做了一些優(yōu)化。Spooldir Source讀取目錄時,文件在很短的時間內(nèi)不能修改,否則會報錯,導(dǎo)致Flume終止。而我們經(jīng)常需要上傳較大文件,當(dāng)文件達(dá)到幾MB或者十幾MB,F(xiàn)lume就會報錯。當(dāng)然,可以對Flume的源代碼進(jìn)行修改,來解決這個問題(可參見Flume Spooldir 源的一些問題)。Flume 1.7之后增加了Taildir Source,這個Source也可以解決這個問題。
其中,channels,type,filegroups,filegroups.<filegroupName>是必配屬性。type=TAILDIR;filegroups是給若干個目錄取別名,例如 g1 g2;filegroups.<filegroupName>是設(shè)置對應(yīng)目錄(g1或g2)下的文件匹配規(guī)則。
2. HDFS Sink

- 屬性的配置
配置基本屬性。其中,channel、type、hdfs.path是必配屬性。type=hdfs;hdfs.path=hdfs://namenode:rpc端口/path。
設(shè)置寫文件的方式。hdfs.rollInterval,間隔時間,每間隔多少秒寫一個文件;hdfs.rollSize,寫入一個文件的最大大小;hdfs.rollCount,設(shè)置往一個文件中寫數(shù)據(jù)的最大次數(shù)。HDFS Sink寫數(shù)據(jù)到HDFS中,有三種不同的方式來寫文件。第一種是按設(shè)置的hdfs.rollInterval間隔時間來寫,達(dá)到這么長時間就寫一個文件;第二種是根據(jù)文件的大小來生成文件,當(dāng)文件達(dá)到hdfs.rollSize設(shè)置的大小之后重新寫一個文件;第三種是按照設(shè)置的hdfs.rollCount每次寫的次數(shù),當(dāng)達(dá)到這個極限時關(guān)閉流,生成一個文件。如果三個都配了,無論哪個屬性先滿足設(shè)置,就會關(guān)閉流,生成一個文件。如果不考慮某種寫文件的方式,就將其屬性值設(shè)置為0。另外,hdfs.idleTimeout,設(shè)置超時時間,可以設(shè)置超過多少秒都沒有數(shù)據(jù)過來,無論是否滿足寫文件三個方式的設(shè)置,都會關(guān)閉流。
配置生成的文件名屬性。hdfs.filePrefix,生成的文件的前綴。hdfs.fileSuffix,生成文件的后綴。hdfs.inUsePrefix,是否使用用戶的前綴。
配置動態(tài)目錄屬性。需要用到的屬性為hdfs.rund,hdfs.roundValue,hdfs.roundUnit,hdfs.useLocalTimeStamp等。例如,本文的flume.conf最后四行配置,可以在hdfs上動態(tài)生成目錄,每隔10分鐘生成一個。
- 變量的使用
如下圖所示,可以使用這里的變量來設(shè)置一些動態(tài)的sink目錄,例如按照不同的時間(日期)來作為不同的sink目錄。
3. 配置flume.conf文件
a1.sources=s a1.channels=c a1.sinks=k a1.sources.s.type=TAILDIR a1.sources.s.filegroups=g1 a1.sources.s.filegroups.g1=/data/.*csv.* a1.sources.s.positionFile=/tmp/myflume/taildir_position.json a1.sources.s.channels=c a1.sources.s.fileHeader=true a1.channels.c.type=memory a1.channels.c.capacity=100 a1.channels.c.transactionCapacity=100 a1.channels.c.keep-alive=10 a1.channels.c.byteCapacity=0 a1.sinks.k.type=hdfs a1.sinks.k.channel=c a1.sinks.k.hdfs.path=hdfs://cdh01:8020/flume/%Y-%m-%d/%H%M a1.sinks.k.hdfs.rollInterval=60 a1.sinks.k.hdfs.rollSize=20971520 a1.sinks.k.hdfs.rollCount=0 a1.sinks.k.hdfs.idleTimeout=60 a1.sinks.k.hdfs.fileType=DataStream a1.sinks.k.hdfs.round=true a1.sinks.k.hdfs.roundValue=10 a1.sinks.k.hdfs.roundUnit=minute a1.sinks.k.hdfs.useLocalTimeStamp=true

浙公網(wǎng)安備 33010602011771號