kafkatopic數(shù)據(jù)保留多久 如何獲取kafka某一topic中最新的offset?
如何獲取kafka某一topic中最新的offset?如果你在0.9版本以上,可以用最新的Consumer client 客戶端,有() / consumer.position() 可以用于得到當(dāng)前最
如何獲取kafka某一topic中最新的offset?
如果你在0.9版本以上,可以用最新的Consumer client 客戶端,有() / consumer.position() 可以用于得到當(dāng)前最新的offset: ${log.dirs}/replication-offset-checkpoint
如何刪除weblogicjmsdelayed狀態(tài)的消息?
消消氣~
你看到的“ Topic htmltrack_error is already marked for deletion”是正常的。Kafka刪除topic的過程實(shí)際上是異步的:命令行僅僅是提交一個(gè)刪除申請(qǐng)給到controller,并標(biāo)記該topic為“待刪除”狀態(tài)。其實(shí)在Kafka服務(wù)器端真正的刪除邏輯應(yīng)該已經(jīng)開啟,正常情況下您需要多等待一段時(shí)間topic自然會(huì)被成功刪除。
spark streaming集群如何做容災(zāi)處理?
spark streaming集群做容災(zāi)處理spark streaming有兩種容錯(cuò)機(jī)制:spark自帶的checkpoint,使用Kafka direct自行維護(hù)offset。
第一種方法中,checkpoint是spark streaming自帶的一種檢查點(diǎn)機(jī)制,可以通過一些特殊配置把輸入數(shù)據(jù)和計(jì)算過程中的數(shù)據(jù)存儲(chǔ)在可靠的文件系統(tǒng)中(比如hdfs或s3)。
它可以保存計(jì)算過程中的狀態(tài),在發(fā)生失敗時(shí)可以控制回溯到什么程度,而不用重新進(jìn)行計(jì)算。驅(qū)動(dòng)器容錯(cuò),在驅(qū)動(dòng)器崩潰重啟后,控制從什么位置繼續(xù)讀取數(shù)據(jù)。
checkpoint有三種應(yīng)用方法元數(shù)據(jù)信息,主要包括:streaming應(yīng)用程序的配置計(jì)算過程中一系列Dstream操作沒有完成的批處理,在運(yùn)行隊(duì)列中的批處理但是沒有完成;消費(fèi)數(shù)據(jù)的偏移量;編譯后的執(zhí)行程序(jar文件)序列化后的二進(jìn)制文件。
第二種方法基于接收者的,采用push,由kafka的topic將數(shù)據(jù)推向spark,使用API:使用的是kafka高級(jí)消費(fèi)者API
效果:kafka將數(shù)據(jù)推到spark執(zhí)行節(jié)點(diǎn)中并儲(chǔ)存起來,然后由Spark Streaming啟動(dòng)作業(yè)來處理這些程序。
效果上,這種不是使用接收器接收,而是saprk每次拉取數(shù)據(jù)先去kafka中獲取上一次拉取的偏移量。根據(jù)偏移量獲取數(shù)據(jù)后,再進(jìn)行處理。
筆者推薦第二種處理方法,自行維護(hù)offset要比spark自帶的checkpoint更好一些,更加靈活、安全,容災(zāi)處理的能原力會(huì)更強(qiáng)。