成人AV在线无码|婷婷五月激情色,|伊人加勒比二三四区|国产一区激情都市|亚洲AV无码电影|日av韩av无码|天堂在线亚洲Av|无码一区二区影院|成人无码毛片AV|超碰在线看中文字幕

深入剖析美團(tuán)基于Flume的網(wǎng)站日志收集系統(tǒng)

美團(tuán)的日志收集系統(tǒng)負(fù)責(zé)美團(tuán)的所有業(yè)務(wù)日志的收集,并分別給Hadoop平臺(tái)提供離線數(shù)據(jù)和Storm平臺(tái)提供實(shí)時(shí)數(shù)據(jù)流。美團(tuán)的日志收集系統(tǒng)基于Flume設(shè)計(jì)和搭建而成。《基于Flume的美團(tuán)日志收集系統(tǒng)》

美團(tuán)的日志收集系統(tǒng)負(fù)責(zé)美團(tuán)的所有業(yè)務(wù)日志的收集,并分別給Hadoop平臺(tái)提供離線數(shù)據(jù)和Storm平臺(tái)提供實(shí)時(shí)數(shù)據(jù)流。美團(tuán)的日志收集系統(tǒng)基于Flume設(shè)計(jì)和搭建而成。

《基于Flume的美團(tuán)日志收集系統(tǒng)》將分兩部分給讀者呈現(xiàn)美團(tuán)日志收集系統(tǒng)的架構(gòu)設(shè)計(jì)和實(shí)戰(zhàn)經(jīng)驗(yàn)。

第一部分架構(gòu)和設(shè)計(jì),將主要著眼于日志收集系統(tǒng)整體的架構(gòu)設(shè)計(jì),以及為什么要做這樣的設(shè)計(jì)。

第二部分改進(jìn)和優(yōu)化,將主要著眼于實(shí)際部署和使用過程中遇到的問題,對(duì)Flume做的功能修改和優(yōu)化等。

1 日志收集系統(tǒng)簡(jiǎn)介日志收集是大數(shù)據(jù)的基石。

許多公司的業(yè)務(wù)平臺(tái)每天都會(huì)產(chǎn)生大量的日志數(shù)據(jù)。收集業(yè)務(wù)日志數(shù)據(jù),供離線和在線的分析系統(tǒng)使用,正是日志收集系統(tǒng)的要做的事情。高可用性,高可靠性和可擴(kuò)展性是日志收集系統(tǒng)所具有的基本特征。

目前常用的開源日志收集系統(tǒng)有Flume, Scribe等。Flume是Cloudera提供的一個(gè)高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸?shù)南到y(tǒng),目前已經(jīng)是Apache的一個(gè)子項(xiàng)目。Scribe是Facebook開源的日志收集系統(tǒng),它為日志的分布式收集,統(tǒng)一處理提供一個(gè)可擴(kuò)展的,高容錯(cuò)的簡(jiǎn)單方案。

2 常用的開源日志收集系統(tǒng)對(duì)比下面將對(duì)常見的開源日志收集系統(tǒng)Flume和Scribe的各方面進(jìn)行對(duì)比。對(duì)比中Flume將主要采用Apache下的Flume-NG為參考對(duì)象。同時(shí),美團(tuán)將常用的日志收集系統(tǒng)分為三層(Agent層,Collector層和Store層)來進(jìn)行對(duì)比。

3 美團(tuán)日志收集系統(tǒng)架構(gòu)美團(tuán)的日志收集系統(tǒng)負(fù)責(zé)美團(tuán)的所有業(yè)務(wù)日志的收集,并分別給Hadoop平臺(tái)提供離線數(shù)據(jù)和Storm平臺(tái)提供實(shí)時(shí)數(shù)據(jù)流。美團(tuán)的日志收集系統(tǒng)基于Flume設(shè)計(jì)和搭建而成。目前每天收集和處理約T級(jí)別的日志數(shù)據(jù)。

下圖是美團(tuán)的日志收集系統(tǒng)的整體框架圖。

a. 整個(gè)系統(tǒng)分為三層:Agent層,Collector層和Store層。其中Agent層每個(gè)機(jī)器部署一個(gè)進(jìn)程,負(fù)責(zé)對(duì)單機(jī)的日志收集工作;Collector層部署在中心服務(wù)器上,負(fù)責(zé)接收Agent層發(fā)送的日志,并且將日志根據(jù)路由規(guī)則寫到相應(yīng)的Store層中;Store層負(fù)責(zé)提供永久或者臨時(shí)的日志存儲(chǔ)服務(wù),或者將日志流導(dǎo)向其它服務(wù)器。

b. Agent到Collector使用LoadBalance策略,將所有的日志均衡地發(fā)到所有的Collector上,達(dá)到負(fù)載均衡的目標(biāo),同時(shí)并處理單個(gè)Collector失效的問題。

c. Collector層的目標(biāo)主要有三個(gè):SinkHdfs, SinkKafka和SinkBypass。分別提供離線的數(shù)據(jù)到Hdfs,和提供實(shí)時(shí)的日志流到Kafka和Bypass。其中SinkHdfs又根據(jù)日志量的大小分為SinkHdfs_b,SinkHdfs_m和SinkHdfs_s三個(gè)Sink,以提高寫入到Hdfs的性能,具體見后面介紹。

d. 對(duì)于Store來說,Hdfs負(fù)責(zé)永久地存儲(chǔ)所有日志;Kafka存儲(chǔ)最新的7天日志,并給Storm系統(tǒng)提供實(shí)時(shí)日志流;Bypass負(fù)責(zé)給其它服務(wù)器和應(yīng)用提供實(shí)時(shí)日志流。

下圖是美團(tuán)的日志收集系統(tǒng)的模塊分解圖,詳解Agent, Collector和Bypass中的Source, Channel和Sink的關(guān)系。

a. 模塊命名規(guī)則:所有的Source以src開頭,所有的Channel以ch開頭,所有的Sink以sink開頭;

b. Channel統(tǒng)一使用美團(tuán)開發(fā)的DualChannel,具體原因后面詳述;對(duì)于過濾掉的日志使用NullChannel,具體原因后面詳述;

c. 模塊之間內(nèi)部通信統(tǒng)一使用Avro接口;

4 架構(gòu)設(shè)計(jì)考慮下面將從可用性,可靠性,可擴(kuò)展性和兼容性等方面,對(duì)上述的架構(gòu)做細(xì)致的解析。

4.1 可用性(availablity)對(duì)日志收集系統(tǒng)來說,可用性(availablity)指固定周期內(nèi)系統(tǒng)無故障運(yùn)行總時(shí)間。要想提高系統(tǒng)的可用性,就需要消除系統(tǒng)的單點(diǎn),提高系統(tǒng)的冗余度。下面來看看美團(tuán)的日志收集系統(tǒng)在可用性方面的考慮。

4.1.1 Agent死掉Agent死掉分為兩種情況:機(jī)器死機(jī)或者Agent進(jìn)程死掉。

對(duì)于機(jī)器死機(jī)的情況來說,由于產(chǎn)生日志的進(jìn)程也同樣會(huì)死掉,所以不會(huì)再產(chǎn)生新的日志,不存在不提供服務(wù)的情況。

對(duì)于Agent進(jìn)程死掉的情況來說,確實(shí)會(huì)降低系統(tǒng)的可用性。對(duì)此,美團(tuán)有下面三種方式來提高系統(tǒng)的可用性。首先,所有的Agent在supervise的方式下啟動(dòng),如果進(jìn)程死掉會(huì)被系統(tǒng)立即重啟,以提供服務(wù)。其次,對(duì)所有的Agent進(jìn)行存活監(jiān)控,發(fā)現(xiàn)Agent死掉立即報(bào)警。最后,對(duì)于非常重要的日志,建議應(yīng)用直接將日志寫磁盤,Agent使用spooldir的方式獲得最新的日志。

4.1.2 Collector死掉由于中心服務(wù)器提供的是對(duì)等的且無差別的服務(wù),且Agent訪問Collector做了LoadBalance和重試機(jī)制。所以當(dāng)某個(gè)Collector無法提供服務(wù)時(shí),Agent的重試策略會(huì)將數(shù)據(jù)發(fā)送到其它可用的Collector上面。所以整個(gè)服務(wù)不受影響。

4.1.3 Hdfs正常停機(jī)美團(tuán)在Collector的HdfsSink中提供了開關(guān)選項(xiàng),可以控制Collector停止寫Hdfs,并且將所有的events緩存到FileChannel的功能。

4.1.4 Hdfs異常停機(jī)或不可訪問假如Hdfs異常停機(jī)或不可訪問,此時(shí)Collector無法寫Hdfs。由于美團(tuán)使用DualChannel,Collector可以將所收到的events緩存到FileChannel,保存在磁盤上,繼續(xù)提供服務(wù)。當(dāng)Hdfs恢復(fù)服務(wù)以后,再將FileChannel中緩存的events再發(fā)送到Hdfs上。這種機(jī)制類似于Scribe,可以提供較好的容錯(cuò)性。

4.1.5 Collector變慢或者Agent/Collector網(wǎng)絡(luò)變慢如果Collector處理速度變慢(比如機(jī)器load過高)或者Agent/Collector之間的網(wǎng)絡(luò)變慢,可能導(dǎo)致Agent發(fā)送到Collector的速度變慢。同樣的,對(duì)于此種情況,美團(tuán)在Agent端使用DualChannel,Agent可以將收到的events緩存到FileChannel,保存在磁盤上,繼續(xù)提供服務(wù)。當(dāng)Collector恢復(fù)服務(wù)以后,再將FileChannel中緩存的events再發(fā)送給Collector。

4.1.6 Hdfs變慢當(dāng)Hadoop上的任務(wù)較多且有大量的讀寫操作時(shí),Hdfs的讀寫數(shù)據(jù)往往變的很慢。由于每天,每周都有高峰使用期,所以這種情況非常普遍。

對(duì)于Hdfs變慢的問題,美團(tuán)同樣使用DualChannel來解決。當(dāng)Hdfs寫入較快時(shí),所有的events只經(jīng)過MemChannel傳遞數(shù)據(jù),減少磁盤IO,獲得較高性能。當(dāng)Hdfs寫入較慢時(shí),所有的events只經(jīng)過FileChannel傳遞數(shù)據(jù),有一個(gè)較大的數(shù)據(jù)緩存空間。

4.2 可靠性(reliability)對(duì)日志收集系統(tǒng)來說,可靠性(reliability)是指Flume在數(shù)據(jù)流的傳輸過程中,保證events的可靠傳遞。

對(duì)Flume來說,所有的events都被保存在Agent的Channel中,然后被發(fā)送到數(shù)據(jù)流中的下一個(gè)Agent或者最終的存儲(chǔ)服務(wù)中。那么一個(gè)Agent的Channel中的events什么時(shí)候被刪除呢?當(dāng)且僅當(dāng)它們被保存到下一個(gè)Agent的Channel中或者被保存到最終的存儲(chǔ)服務(wù)中。這就是Flume提供數(shù)據(jù)流中點(diǎn)到點(diǎn)的可靠性保證的最基本的單跳消息傳遞語義。

那么Flume是如何做到上述最基本的消息傳遞語義呢?

首先,Agent間的事務(wù)交換。Flume使用事務(wù)的辦法來保證event的可靠傳遞。Source和Sink分別被封裝在事務(wù)中,這些事務(wù)由保存event的存儲(chǔ)提供或者由Channel提供。這就保證了event在數(shù)據(jù)流的點(diǎn)對(duì)點(diǎn)傳輸中是可靠的。在多級(jí)數(shù)據(jù)流中,如下圖,上一級(jí)的Sink和下一級(jí)的Source都被包含在事務(wù)中,保證數(shù)據(jù)可靠地從一個(gè)Channel到另一個(gè)Channel轉(zhuǎn)移。

其次,數(shù)據(jù)流中 Channel的持久性。Flume中MemoryChannel是可能丟失數(shù)據(jù)的(當(dāng)Agent死掉時(shí)),而FileChannel是持久性的,提供類似mysql的日志機(jī)制,保證數(shù)據(jù)不丟失。

4.3 可擴(kuò)展性(scalability)對(duì)日志收集系統(tǒng)來說,可擴(kuò)展性(scalability)是指系統(tǒng)能夠線性擴(kuò)展。當(dāng)日志量增大時(shí),系統(tǒng)能夠以簡(jiǎn)單的增加機(jī)器來達(dá)到線性擴(kuò)容的目的。

對(duì)于基于Flume的日志收集系統(tǒng)來說,需要在設(shè)計(jì)的每一層,都可以做到線性擴(kuò)展地提供服務(wù)。下面將對(duì)每一層的可擴(kuò)展性做相應(yīng)的說明。

4.3.1 Agent層對(duì)于Agent這一層來說,每個(gè)機(jī)器部署一個(gè)Agent,可以水平擴(kuò)展,不受限制。一個(gè)方面,Agent收集日志的能力受限于機(jī)器的性能,正常情況下一個(gè)Agent可以為單機(jī)提供足夠服務(wù)。另一方面,如果機(jī)器比較多,可能受限于后端Collector提供的服務(wù),但Agent到Collector是有Load Balance機(jī)制,使得Collector可以線性擴(kuò)展提高能力。

4.3.2 Collector層對(duì)于Collector這一層,Agent到Collector是有Load Balance機(jī)制,并且Collector提供無差別服務(wù),所以可以線性擴(kuò)展。其性能主要受限于Store層提供的能力。

4.3.3 Store層對(duì)于Store這一層來說,Hdfs和Kafka都是分布式系統(tǒng),可以做到線性擴(kuò)展。Bypass屬于臨時(shí)的應(yīng)用,只對(duì)應(yīng)于某一類日志,性能不是瓶頸。

4.4 Channel的選擇Flume1.4.0中,其官方提供常用的MemoryChannel和FileChannel供大家選擇。其優(yōu)劣如下:

MemoryChannel: 所有的events被保存在內(nèi)存中。優(yōu)點(diǎn)是高吞吐。缺點(diǎn)是容量有限并且Agent死掉時(shí)會(huì)丟失內(nèi)存中的數(shù)據(jù)。FileChannel: 所有的events被保存在文件中。優(yōu)點(diǎn)是容量較大且死掉時(shí)數(shù)據(jù)可恢復(fù)。缺點(diǎn)是速度較慢。上述兩種Channel,優(yōu)缺點(diǎn)相反,分別有自己適合的場(chǎng)景。然而,對(duì)于大部分應(yīng)用來說,美團(tuán)希望Channel可以同提供高吞吐和大緩存?;诖?,美團(tuán)開發(fā)了DualChannel。

DualChannel:基于 MemoryChannel和 FileChannel開發(fā)。當(dāng)堆積在Channel中的events數(shù)小于閾值時(shí),所有的events被保存在MemoryChannel中,Sink從MemoryChannel中讀取數(shù)據(jù); 當(dāng)堆積在Channel中的events數(shù)大于閾值時(shí), 所有的events被自動(dòng)存放在FileChannel中,Sink從FileChannel中讀取數(shù)據(jù)。這樣當(dāng)系統(tǒng)正常運(yùn)行時(shí),美團(tuán)可以使用MemoryChannel的高吞吐特性;當(dāng)系統(tǒng)有異常時(shí),美團(tuán)可以利用FileChannel的大緩存的特性。4.5 和scribe兼容在設(shè)計(jì)之初,美團(tuán)就要求每類日志都有一個(gè)category相對(duì)應(yīng),并且Flume的Agent提供AvroSource和ScribeSource兩種服務(wù)。這將保持和之前的Scribe相對(duì)應(yīng),減少業(yè)務(wù)的更改成本。

4.6 權(quán)限控制在目前的日志收集系統(tǒng)中,美團(tuán)只使用最簡(jiǎn)單的權(quán)限控制。只有設(shè)定的category才可以進(jìn)入到存儲(chǔ)系統(tǒng)。所以目前的權(quán)限控制就是category過濾。

如果權(quán)限控制放在Agent端,優(yōu)勢(shì)是可以較好地控制垃圾數(shù)據(jù)在系統(tǒng)中流轉(zhuǎn)。但劣勢(shì)是配置修改麻煩,每增加一個(gè)日志就需要重啟或者重載Agent的配置。

如果權(quán)限控制放在Collector端,優(yōu)勢(shì)是方便進(jìn)行配置的修改和加載。劣勢(shì)是部分沒有注冊(cè)的數(shù)據(jù)可能在Agent/Collector之間傳輸。

考慮到Agent/Collector之間的日志傳輸并非系統(tǒng)瓶頸,且目前日志收集屬內(nèi)部系統(tǒng),安全問題屬于次要問題,所以選擇采用Collector端控制。

4.7 提供實(shí)時(shí)流美團(tuán)的部分業(yè)務(wù),如實(shí)時(shí)推薦,反爬蟲服務(wù)等服務(wù),需要處理實(shí)時(shí)的數(shù)據(jù)流。因此美團(tuán)希望Flume能夠?qū)С鲆环輰?shí)時(shí)流給Kafka/Storm系統(tǒng)。

一個(gè)非常重要的要求是實(shí)時(shí)數(shù)據(jù)流不應(yīng)該受到其它Sink的速度影響,保證實(shí)時(shí)數(shù)據(jù)流的速度。這一點(diǎn),美團(tuán)是通過Collector中設(shè)置不同的Channel進(jìn)行隔離,并且DualChannel的大容量保證了日志的處理不受Sink的影響。

5 系統(tǒng)監(jiān)控對(duì)于一個(gè)大型復(fù)雜系統(tǒng)來說,監(jiān)控是必不可少的部分。設(shè)計(jì)合理的監(jiān)控,可以對(duì)異常情況及時(shí)發(fā)現(xiàn),只要有一部手機(jī),就可以知道系統(tǒng)是否正常運(yùn)作。對(duì)于美團(tuán)的日志收集系統(tǒng),美團(tuán)建立了多維度的監(jiān)控,防止未知的異常發(fā)生。

5.1 發(fā)送速度,擁堵情況,寫Hdfs速度通過發(fā)送給zabbix的數(shù)據(jù),美團(tuán)可以繪制出發(fā)送數(shù)量、擁堵情況和寫Hdfs速度的圖表,對(duì)于超預(yù)期的擁堵,美團(tuán)會(huì)報(bào)警出來查找原因。

下面是Flume Collector HdfsSink寫數(shù)據(jù)到Hdfs的速度截圖:

下面是Flume Collector的FileChannel中擁堵的events數(shù)據(jù)量截圖:

5.2 flume寫hfds狀態(tài)的監(jiān)控Flume寫入Hdfs會(huì)先生成tmp文件,對(duì)于特別重要的日志,美團(tuán)會(huì)每15分鐘左右檢查一下各個(gè)Collector是否都產(chǎn)生了tmp文件,對(duì)于沒有正常產(chǎn)生tmp文件的Collector和日志美團(tuán)需要檢查是否有異常。這樣可以及時(shí)發(fā)現(xiàn)Flume和日志的異常.

5.3 日志大小異常監(jiān)控對(duì)于重要的日志,美團(tuán)會(huì)每個(gè)小時(shí)都監(jiān)控日志大小周同比是否有較大波動(dòng),并給予提醒,這個(gè)報(bào)警有效的發(fā)現(xiàn)了異常的日志,且多次發(fā)現(xiàn)了應(yīng)用方日志發(fā)送的異常,及時(shí)給予了對(duì)方反饋,幫助他們及早修復(fù)自身系統(tǒng)的異常。

通過上述的講解,美團(tuán)可以看到,基于Flume的美團(tuán)日志收集系統(tǒng)已經(jīng)是具備高可用性,高可靠性,可擴(kuò)展等特性的分布式服務(wù)。

改進(jìn)和優(yōu)化下面,美團(tuán)將會(huì)講述在實(shí)際部署和使用過程中遇到的問題,對(duì)Flume的功能改進(jìn)和對(duì)系統(tǒng)做的優(yōu)化。

1 Flume的問題總結(jié)在Flume的使用過程中,遇到的主要問題如下:

a. Channel水土不服”:使用固定大小的MemoryChannel在日志高峰時(shí)常報(bào)隊(duì)列大小不夠的異常;使用FileChannel又導(dǎo)致IO繁忙的問題;

b. HdfsSink的性能問題:使用HdfsSink向Hdfs寫日志,在高峰時(shí)間速度較慢;

c. 系統(tǒng)的管理問題:配置升級(jí),模塊重啟等;

2 Flume的功能改進(jìn)和優(yōu)化點(diǎn)從上面的問題中可以看到,有一些需求是原生Flume無法滿足的,因此,基于開源的Flume美團(tuán)增加了許多功能,修改了一些Bug,并且進(jìn)行一些調(diào)優(yōu)。下面將對(duì)一些主要的方面做一些說明。

2.1 增加Zabbix monitor服務(wù)一方面,F(xiàn)lume本身提供了http, ganglia的監(jiān)控服務(wù),而美團(tuán)目前主要使用zabbix做監(jiān)控。因此,美團(tuán)為Flume添加了zabbix監(jiān)控模塊,和sa的監(jiān)控服務(wù)無縫融合。

另一方面,凈化Flume的metrics。只將美團(tuán)需要的metrics發(fā)送給zabbix,避免 zabbix server造成壓力。目前美團(tuán)最為關(guān)心的是Flume能否及時(shí)把應(yīng)用端發(fā)送過來的日志寫到Hdfs上, 對(duì)應(yīng)關(guān)注的metrics為:

Source : 接收的event數(shù)和處理的event數(shù)Channel : Channel中擁堵的event數(shù)Sink : 已經(jīng)處理的event數(shù)

2.2 為HdfsSink增加自動(dòng)創(chuàng)建index功能首先,美團(tuán)的HdfsSink寫到hadoop的文件采用lzo壓縮存儲(chǔ)。 HdfsSink可以讀取hadoop配置文件中提供的編碼類列表,然后通過配置的方式獲取使用何種壓縮編碼,美團(tuán)目前使用lzo壓縮數(shù)據(jù)。采用lzo壓縮而非bz2壓縮,是基于以下測(cè)試數(shù)據(jù):

其次,美團(tuán)的HdfsSink增加了創(chuàng)建lzo文件后自動(dòng)創(chuàng)建index功能。Hadoop提供了對(duì)lzo創(chuàng)建索引,使得壓縮文件是可切分的,這樣Hadoop Job可以并行處理數(shù)據(jù)文件。HdfsSink本身lzo壓縮,但寫完lzo文件并不會(huì)建索引,美團(tuán)在close文件之后添加了建索引功能。

Java Code復(fù)制內(nèi)容到剪貼板 /** *RenamebucketPathfilefrom.tmptopermanentlocation. */ privatevoidrenameBucket()throwsIOException,InterruptedException{ if(bucketPath.equals(targetPath)){ return; } finalPathsrcPath=newPath(bucketPath); finalPathdstPath=newPath(targetPath); callWithTimeout(newCallRunner<Object>(){ @Override publicObjectcall()throwsException{ if(fileSystem.exists(srcPath)){//couldblock LOG.info("Renaming"+srcPath+"to"+dstPath); fileSystem.rename(srcPath,dstPath);//couldblock //indexthedstPathlzofile if(codeC!=null&&".lzo".equals(codeC.getDefaultExtension())){ LzoIndexerlzoIndexer=newLzoIndexer(newConfiguration()); lzoIndexer.index(dstPath); } } returnnull; } }); }

2.3 增加HdfsSink的開關(guān)美團(tuán)在HdfsSink和DualChannel中增加開關(guān),當(dāng)開關(guān)打開的情況下,HdfsSink不再往Hdfs上寫數(shù)據(jù),并且數(shù)據(jù)只寫向DualChannel中的FileChannel。以此策略來防止Hdfs的正常停機(jī)維護(hù)。

2.4 增加DualChannelFlume本身提供了MemoryChannel和FileChannel。MemoryChannel處理速度快,但緩存大小有限,且沒有持久化;FileChannel則剛好相反。美團(tuán)希望利用兩者的優(yōu)勢(shì),在Sink處理速度夠快,Channel沒有緩存過多日志的時(shí)候,就使用MemoryChannel,當(dāng)Sink處理速度跟不上,又需要Channel能夠緩存下應(yīng)用端發(fā)送過來的日志時(shí),就使用FileChannel,由此美團(tuán)開發(fā)了DualChannel,能夠智能的在兩個(gè)Channel之間切換。

其具體的邏輯如下:

Java Code復(fù)制內(nèi)容到剪貼板 /*** *putToMemChannelindicateputeventtomemChannelorfileChannel *takeFromMemChannelindicatetakeeventfrommemChannelorfileChannel **/ privateAtomicBooleanputToMemChannel=newAtomicBoolean(true); privateAtomicBooleantakeFromMemChannel=newAtomicBoolean(true); voiddoPut(Eventevent){ if(switchon&&putToMemChannel.get()){ //往memChannel中寫數(shù)據(jù) memTransaction.put(event); if(memChannel.isFull()||fileChannel.getQueueSize()>100){ putToMemChannel.set(false); } }else{ //往fileChannel中寫數(shù)據(jù) fileTransaction.put(event); } } EventdoTake(){ Eventevent=null; if(takeFromMemChannel.get()){ //從memChannel中取數(shù)據(jù) event=memTransaction.take(); if(event==null){ takeFromMemChannel.set(false); } }else{ //從fileChannel中取數(shù)據(jù) event=fileTransaction.take(); if(event==null){ takeFromMemChannel.set(true); putToMemChannel.set(true); } } returnevent; }

2.5 增加NullChannelFlume提供了NullSink,可以把不需要的日志通過NullSink直接丟棄,不進(jìn)行存儲(chǔ)。然而,Source需要先將events存放到Channel中,NullSink再將events取出扔掉。為了提升性能,美團(tuán)把這一步移到了Channel里面做,所以開發(fā)了NullChannel。

2.6 增加KafkaSink為支持向Storm提供實(shí)時(shí)數(shù)據(jù)流,美團(tuán)增加了KafkaSink用來向Kafka寫實(shí)時(shí)數(shù)據(jù)流。其基本的邏輯如下:

Java Code復(fù)制內(nèi)容到剪貼板 publicclassKafkaSinkextendsAbstractSinkimplementsConfigurable{ privateStringzkConnect; privateIntegerzkTimeout; privateIntegerbatchSize; privateIntegerqueueSize; privateStringserializerClass; privateStringproducerType; privateStringtopicPrefix; privateProducer<String,String>producer; publicvoidconfigure(Contextcontext){ //讀取配置,并檢查配置 } @Override publicsynchronizedvoidstart(){ //初始化producer } @Override publicsynchronizedvoidstop(){ //關(guān)閉producer } @Override publicStatusprocess()throwsEventDeliveryException{ Statusstatus=Status.READY; Channelchannel=getChannel(); Transactiontx=channel.getTransaction(); try{ tx.begin(); //將日志按category分隊(duì)列存放 Map<String,List<String>>topic2EventList=newHashMap<String,List<String>>(); //從channel中取batchSize大小的日志,從header中獲取category,生成topic,并存放于上述的Map中; //將Map中的數(shù)據(jù)通過producer發(fā)送給kafka tx.commit(); }catch(Exceptione){ tx.rollback(); thrownewEventDeliveryException(e); }finally{ tx.close(); } returnstatus; } }

2.7 修復(fù)和scribe的兼容問題Scribed在通過ScribeSource發(fā)送數(shù)據(jù)包給Flume時(shí),大于4096字節(jié)的包,會(huì)先發(fā)送一個(gè)Dummy包檢查服務(wù)器的反應(yīng),而Flume的ScribeSource對(duì)于logentry.size()=0的包返回TRY_LATER,此時(shí)Scribed就認(rèn)為出錯(cuò),斷開連接。這樣循環(huán)反復(fù)嘗試,無法真正發(fā)送數(shù)據(jù)?,F(xiàn)在在ScribeSource的Thrift接口中,對(duì)size為0的情況返回OK,保證后續(xù)正常發(fā)送數(shù)據(jù)。

3. Flume系統(tǒng)調(diào)優(yōu)經(jīng)驗(yàn)總結(jié)3.1 基礎(chǔ)參數(shù)調(diào)優(yōu)經(jīng)驗(yàn)HdfsSink中默認(rèn)的serializer會(huì)每寫一行在行尾添加一個(gè)換行符,美團(tuán)日志本身帶有換行符,這樣會(huì)導(dǎo)致每條日志后面多一個(gè)空行,修改配置不要自動(dòng)添加換行符;lc.sinks.sink_hdfs.serializer.appendNewline = false調(diào)大MemoryChannel的capacity,盡量利用MemoryChannel快速的處理能力;調(diào)大HdfsSink的batchSize,增加吞吐量,減少hdfs的flush次數(shù);適當(dāng)調(diào)大HdfsSink的callTimeout,避免不必要的超時(shí)錯(cuò)誤;

3.2 HdfsSink獲取Filename的優(yōu)化HdfsSink的path參數(shù)指明了日志被寫到Hdfs的位置,該參數(shù)中可以引用格式化的參數(shù),將日志寫到一個(gè)動(dòng)態(tài)的目錄中。這方便了日志的管理。例如美團(tuán)可以將日志寫到category分類的目錄,并且按天和按小時(shí)存放:

lc.sinks.sink_hdfs.hdfs.path = /user/hive/work/orglog.db/%{category}/dt=%Y%m%d/hour=%HHdfsS ink中處理每條event時(shí),都要根據(jù)配置獲取此event應(yīng)該寫入的Hdfs path和filename,默認(rèn)的獲取方法是通過正則表達(dá)式替換配置中的變量,獲取真實(shí)的path和filename。因?yàn)榇诉^程是每條event都要做的操作,耗時(shí)很長(zhǎng)。通過美團(tuán)的測(cè)試,20萬條日志,這個(gè)操作要耗時(shí)6-8s左右。

由于美團(tuán)目前的path和filename有固定的模式,可以通過字符串拼接獲得。而后者比正則匹配快幾十倍。拼接定符串的方式,20萬條日志的操作只需要幾百毫秒。

3.3 HdfsSink的b/m/s優(yōu)化在美團(tuán)初始的設(shè)計(jì)中,所有的日志都通過一個(gè)Channel和一個(gè)HdfsSink寫到Hdfs上。美團(tuán)來看一看這樣做有什么問題。

首先,美團(tuán)來看一下HdfsSink在發(fā)送數(shù)據(jù)的邏輯:

Java Code復(fù)制內(nèi)容到剪貼板 //從Channel中取batchSize大小的events for(txnEventCount=0;txnEventCount<batchSize;txnEventCount++){ //對(duì)每條日志根據(jù)categoryappend到相應(yīng)的bucketWriter上; bucketWriter.append(event); } for(BucketWriterbucketWriter:writers){ //然后對(duì)每一個(gè)bucketWriter調(diào)用相應(yīng)的flush方法將數(shù)據(jù)flush到Hdfs上 bucketWriter.flush(); }

假設(shè)美團(tuán)的系統(tǒng)中有100個(gè)category,batchSize大小設(shè)置為20萬。則每20萬條數(shù)據(jù),就需要對(duì)100個(gè)文件進(jìn)行append或者flush操作。

其次,對(duì)于美團(tuán)的日志來說,基本符合80/20原則。即20%的category產(chǎn)生了系統(tǒng)80%的日志量。這樣對(duì)大部分日志來說,每20萬條可能只包含幾條日志,也需要往Hdfs上flush一次。

上述的情況會(huì)導(dǎo)致HdfsSink寫Hdfs的效率極差。下圖是單Channel的情況下每小時(shí)的發(fā)送量和寫hdfs的時(shí)間趨勢(shì)圖。

鑒于這種實(shí)際應(yīng)用場(chǎng)景,美團(tuán)把日志進(jìn)行了大小歸類,分為big, middle和small三類,這樣可以有效的避免小日志跟著大日志一起頻繁的flush,提升效果明顯。下圖是分隊(duì)列后big隊(duì)列的每小時(shí)的發(fā)送量和寫hdfs的時(shí)間趨勢(shì)圖。

4 未來發(fā)展目前,F(xiàn)lume日志收集系統(tǒng)提供了一個(gè)高可用,高可靠,可擴(kuò)展的分布式服務(wù),已經(jīng)有效地支持了美團(tuán)的日志數(shù)據(jù)收集工作。

后續(xù),美團(tuán)將在如下方面繼續(xù)研究:

日志管理系統(tǒng):圖形化的展示和控制日志收集系統(tǒng);跟進(jìn)社區(qū)發(fā)展:跟進(jìn)Flume 1.5的進(jìn)展,同時(shí)回饋社區(qū);

標(biāo)簽: