一文詳解Flink知識(shí)體系
失敗率重啟策略可以在flink-conf.yaml中設(shè)置下面的配置參數(shù)來(lái)啟用:
restart-strategy:failure-rate
配置參數(shù)描述默認(rèn)值restart-strategy.failure-rate.max-failures-per-interval在一個(gè)Job認(rèn)定為失敗之前,最大的重啟次數(shù)1restart-strategy.failure-rate.failure-rate-interval計(jì)算失敗率的時(shí)間間隔1分鐘restart-strategy.failure-rate.delay兩次連續(xù)重啟嘗試之間的時(shí)間間隔akka.a(chǎn)sk.timeout
例子:
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
失敗率重啟策略也可以在程序中設(shè)置:
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每個(gè)測(cè)量時(shí)間間隔最大失敗次數(shù)
Time.of(5, TimeUnit.MINUTES), //失敗率測(cè)量的時(shí)間間隔
Time.of(10, TimeUnit.SECONDS) // 兩次連續(xù)重啟嘗試的時(shí)間間隔
))
4) 無(wú)重啟策略
Job直接失敗,不會(huì)嘗試進(jìn)行重啟
restart-strategy: none
無(wú)重啟策略也可以在程序中設(shè)置
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
5) 案例
需求:輸入五次zhangsan,程序掛掉。
代碼:
import org.a(chǎn)pache.flink.a(chǎn)pi.common.restartstrategy.RestartStrategies
import org.a(chǎn)pache.flink.runtime.state.filesystem.FsStateBackend
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala._
object FixDelayRestartStrategiesDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//如果想要開(kāi)啟重啟策略,就必須開(kāi)啟CheckPoint
env.enableCheckpointing(5000L)
//指定狀態(tài)存儲(chǔ)后端,默認(rèn)就是內(nèi)存
//現(xiàn)在指定的是FsStateBackend,支持本地系統(tǒng)、
//new FsStateBackend要指定存儲(chǔ)系統(tǒng)的協(xié)議:scheme (hdfs://, file://, etc)
env.setStateBackend(new FsStateBackend(args(0)))
//如果程序被cancle,保留以前做的checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//指定以后存儲(chǔ)多個(gè)checkpoint目錄
env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
//指定重啟策略,默認(rèn)的重啟策略是不停的重啟
//程序出現(xiàn)異常是會(huì)重啟,重啟五次,每次延遲5秒,如果超過(guò)了5次,程序退出
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000))
val lines: DataStream[String] = env.socketTextStream(args(1), 8888)
val result = lines.flatMap(_.split(" ").map(word => {
if(word.equals("zhangsan")) {
throw new RuntimeException("zhangsan,程序重啟!");
}
(word, 1)
})).keyBy(0).sum(1)
result.print()
env.execute()
}
}
3) checkpoint 案例
1. 需求:
假定用戶需要每隔1秒鐘需要統(tǒng)計(jì)4秒中窗口中數(shù)據(jù)的量,然后對(duì)統(tǒng)計(jì)的結(jié)果值進(jìn)行checkpoint處理
2. 數(shù)據(jù)規(guī)劃:
使用自定義算子每秒鐘產(chǎn)生大約10000條數(shù)據(jù)。產(chǎn)生的數(shù)據(jù)為一個(gè)四元組(Long,String,String,Integer)—------(id,name,info,count)。數(shù)據(jù)經(jīng)統(tǒng)計(jì)后,統(tǒng)計(jì)結(jié)果打印到終端輸出。打印輸出的結(jié)果為L(zhǎng)ong類型的數(shù)據(jù) 。
3. 開(kāi)發(fā)思路:
source算子每隔1秒鐘發(fā)送10000條數(shù)據(jù),并注入到Window算子中。window算子每隔1秒鐘統(tǒng)計(jì)一次最近4秒鐘內(nèi)數(shù)據(jù)數(shù)量。每隔1秒鐘將統(tǒng)計(jì)結(jié)果打印到終端。每隔6秒鐘觸發(fā)一次checkpoint,然后將checkpoint的結(jié)果保存到HDFS中。
5. 開(kāi)發(fā)步驟:
獲取流處理執(zhí)行環(huán)境
設(shè)置檢查點(diǎn)機(jī)制
自定義數(shù)據(jù)源
數(shù)據(jù)分組
劃分時(shí)間窗口
數(shù)據(jù)聚合
數(shù)據(jù)打印
觸發(fā)執(zhí)行
示例代碼:
//發(fā)送數(shù)據(jù)形式
case class SEvent(id: Long, name: String, info: String, count: Int)
class SEventSourceWithChk extends RichSourceFunction[SEvent]{
private var count = 0L
private var isRunning = true
private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
// 任務(wù)取消時(shí)調(diào)用
override def cancel(): Unit = {
isRunning = false
}
//// source算子的邏輯,即:每秒鐘向流圖中注入10000個(gè)元組
override def run(sourceContext: SourceContext[SEvent]): Unit = {
while(isRunning) {
for (i <- 0 until 10000) {
sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
count += 1L
}
Thread.sleep(1000)
}
}
}
*
該段代碼是流圖定義代碼,具體實(shí)現(xiàn)業(yè)務(wù)流程,另外,代碼中窗口的觸發(fā)時(shí)間使 用了event time。
object FlinkEventTimeAPIChkMain {
def main(args: Array[String]): Unit ={
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/"))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointInterval(6000)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//保留策略:默認(rèn)情況下,檢查點(diǎn)不會(huì)被保留,僅用于故障中恢復(fù)作業(yè),可以啟用外部持久化檢查點(diǎn),同時(shí)指定保留策略
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作業(yè)取消時(shí)保留檢查點(diǎn),注意在這種情況下,您必須在取消后手動(dòng)清理檢查點(diǎn)狀態(tài)
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:當(dāng)作業(yè)被cancel時(shí),刪除檢查點(diǎn),檢查點(diǎn)狀態(tài)僅在作業(yè)失敗時(shí)可用
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
// 應(yīng)用邏輯
val source: DataStream[SEvent] = env.a(chǎn)ddSource(new SEventSourceWithChk)
source.a(chǎn)ssignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
// 設(shè)置watermark
override def getCurrentWatermark: Watermark = {
new Watermark(System.currentTimeMillis())
}
// 給每個(gè)元組打上時(shí)間戳
override def extractTimestamp(t: SEvent, l: Long): Long = {
System.currentTimeMillis()
}
})
.keyBy(0)
.window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)))
.a(chǎn)pply(new WindowStatisticWithChk)
.print()
env.execute()
}
}
//該數(shù)據(jù)在算子制作快照時(shí)用于保存到目前為止算子記錄的數(shù)據(jù)條數(shù)。
// 用戶自定義狀態(tài)
class UDFState extends Serializable{
private var count = 0L
// 設(shè)置用戶自定義狀態(tài)
def setState(s: Long) = count = s
// 獲取用戶自定狀態(tài)
def getState = count
}
//該段代碼是window算子的代碼,每當(dāng)觸發(fā)計(jì)算時(shí)統(tǒng)計(jì)窗口中元組數(shù)量。
class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{
private var total = 0L
// window算子的實(shí)現(xiàn)邏輯,即:統(tǒng)計(jì)window中元組的數(shù)量
override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
var count = 0L
for (event <- input) {
count += 1L
}
total += count
out.collect(count)
}
// 從自定義快照中恢復(fù)狀態(tài)
override def restoreState(state: util.List[UDFState]): Unit = {
val udfState = state.get(0)
total = udfState.getState
}
// 制作自定義狀態(tài)快照
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = {
val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState]
val udfState = new UDFState
udfState.setState(total)
udfList.a(chǎn)dd(udfState)
udfList
}
}
4. 端對(duì)端僅處理一次語(yǔ)義
當(dāng)談及僅一次處理時(shí),我們真正想表達(dá)的是每條輸入消息只會(huì)影響最終結(jié)果一次!(影響應(yīng)用狀態(tài)一次,而非被處理一次)即使出現(xiàn)機(jī)器故障或軟件崩潰,Flink也要保證不會(huì)有數(shù)據(jù)被重復(fù)處理或壓根就沒(méi)有被處理從而影響狀態(tài)。
在 Flink 1.4 版本之前,精準(zhǔn)一次處理只限于 Flink 應(yīng)用內(nèi),也就是所有的 Operator 完全由 Flink 狀態(tài)保存并管理的才能實(shí)現(xiàn)精確一次處理。但 Flink 處理完數(shù)據(jù)后大多需要將結(jié)果發(fā)送到外部系統(tǒng),比如 Sink 到 Kafka 中,這個(gè)過(guò)程中 Flink 并不保證精準(zhǔn)一次處理。
在 Flink 1.4 版本正式引入了一個(gè)里程碑式的功能:兩階段提交 Sink,即 TwoPhaseCommitSinkFunction 函數(shù)。該 SinkFunction 提取并封裝了兩階段提交協(xié)議中的公共邏輯,自此 Flink 搭配特定 Source 和 Sink(如 Kafka 0.11 版)實(shí)現(xiàn)精確一次處理語(yǔ)義(英文簡(jiǎn)稱:EOS,即 Exactly-Once Semantics)。
在 Flink 中需要端到端精準(zhǔn)一次處理的位置有三個(gè):
Flink 端到端精準(zhǔn)一次處理
Source 端:數(shù)據(jù)從上一階段進(jìn)入到 Flink 時(shí),需要保證消息精準(zhǔn)一次消費(fèi)。
Flink 內(nèi)部端:這個(gè)我們已經(jīng)了解,利用 Checkpoint 機(jī)制,把狀態(tài)存盤(pán),發(fā)生故障的時(shí)候可以恢復(fù),保證內(nèi)部的狀態(tài)一致性。不了解的小伙伴可以看下我之前的文章:
Flink可靠性的基石-checkpoint機(jī)制詳細(xì)解析
Sink 端:將處理完的數(shù)據(jù)發(fā)送到下一階段時(shí),需要保證數(shù)據(jù)能夠準(zhǔn)確無(wú)誤發(fā)送到下一階段。
1) Flink端到端精準(zhǔn)一次處理語(yǔ)義(EOS)
以下內(nèi)容適用于 Flink 1.4 及之后版本
對(duì)于 Source 端:Source 端的精準(zhǔn)一次處理比較簡(jiǎn)單,畢竟數(shù)據(jù)是落到 Flink 中,所以 Flink 只需要保存消費(fèi)數(shù)據(jù)的偏移量即可, 如消費(fèi) Kafka 中的數(shù)據(jù),Flink 將 Kafka Consumer 作為 Source,可以將偏移量保存下來(lái),如果后續(xù)任務(wù)出現(xiàn)了故障,恢復(fù)的時(shí)候可以由連接器重置偏移量,重新消費(fèi)數(shù)據(jù),保證一致性。
對(duì)于 Sink 端:Sink 端是最復(fù)雜的,因?yàn)閿?shù)據(jù)是落地到其他系統(tǒng)上的,數(shù)據(jù)一旦離開(kāi) Flink 之后,Flink 就監(jiān)控不到這些數(shù)據(jù)了,所以精準(zhǔn)一次處理語(yǔ)義必須也要應(yīng)用于 Flink 寫(xiě)入數(shù)據(jù)的外部系統(tǒng),故這些外部系統(tǒng)必須提供一種手段允許提交或回滾這些寫(xiě)入操作,同時(shí)還要保證與 Flink Checkpoint 能夠協(xié)調(diào)使用(Kafka 0.11 版本已經(jīng)實(shí)現(xiàn)精確一次處理語(yǔ)義)。
我們以 Flink 與 Kafka 組合為例,Flink 從 Kafka 中讀數(shù)據(jù),處理完的數(shù)據(jù)在寫(xiě)入 Kafka 中。
為什么以Kafka為例,第一個(gè)原因是目前大多數(shù)的 Flink 系統(tǒng)讀寫(xiě)數(shù)據(jù)都是與 Kafka 系統(tǒng)進(jìn)行的。第二個(gè)原因,也是最重要的原因 Kafka 0.11 版本正式發(fā)布了對(duì)于事務(wù)的支持,這是與Kafka交互的Flink應(yīng)用要實(shí)現(xiàn)端到端精準(zhǔn)一次語(yǔ)義的必要條件。
當(dāng)然,Flink 支持這種精準(zhǔn)一次處理語(yǔ)義并不只是限于與 Kafka 的結(jié)合,可以使用任何 Source/Sink,只要它們提供了必要的協(xié)調(diào)機(jī)制。
2) Flink 與 Kafka 組合
Flink 應(yīng)用示例
如上圖所示,Flink 中包含以下組件:
一個(gè) Source,從 Kafka 中讀取數(shù)據(jù)(即 KafkaConsumer)
一個(gè)時(shí)間窗口化的聚會(huì)操作(Window)
一個(gè) Sink,將結(jié)果寫(xiě)入到 Kafka(即 KafkaProducer)
若要 Sink 支持精準(zhǔn)一次處理語(yǔ)義(EOS),它必須以事務(wù)的方式寫(xiě)數(shù)據(jù)到 Kafka,這樣當(dāng)提交事務(wù)時(shí)兩次 Checkpoint 間的所有寫(xiě)入操作當(dāng)作為一個(gè)事務(wù)被提交。這確保了出現(xiàn)故障或崩潰時(shí)這些寫(xiě)入操作能夠被回滾。
當(dāng)然了,在一個(gè)分布式且含有多個(gè)并發(fā)執(zhí)行 Sink 的應(yīng)用中,僅僅執(zhí)行單次提交或回滾是不夠的,因?yàn)樗薪M件都必須對(duì)這些提交或回滾達(dá)成共識(shí),這樣才能保證得到一個(gè)一致性的結(jié)果。Flink 使用兩階段提交協(xié)議以及預(yù)提交(Pre-commit)階段來(lái)解決這個(gè)問(wèn)題。
3) 兩階段提交協(xié)議(2PC)
兩階段提交協(xié)議(Two-Phase Commit,2PC)是很常用的解決分布式事務(wù)問(wèn)題的方式,它可以保證在分布式事務(wù)中,要么所有參與進(jìn)程都提交事務(wù),要么都取消,即實(shí)現(xiàn) ACID 中的 A (原子性)。
在數(shù)據(jù)一致性的環(huán)境下,其代表的含義是:要么所有備份數(shù)據(jù)同時(shí)更改某個(gè)數(shù)值,要么都不改,以此來(lái)達(dá)到數(shù)據(jù)的強(qiáng)一致性。
兩階段提交協(xié)議中有兩個(gè)重要角色,協(xié)調(diào)者(Coordinator)和參與者(Participant),其中協(xié)調(diào)者只有一個(gè),起到分布式事務(wù)的協(xié)調(diào)管理作用,參與者有多個(gè)。
顧名思義,兩階段提交將提交過(guò)程劃分為連續(xù)的兩個(gè)階段:表決階段(Voting)和提交階段(Commit)。
兩階段提交協(xié)議過(guò)程如下圖所示:
兩階段提交協(xié)議
第一階段:表決階段
協(xié)調(diào)者向所有參與者發(fā)送一個(gè) VOTE_REQUEST 消息。
當(dāng)參與者接收到 VOTE_REQUEST 消息,向協(xié)調(diào)者發(fā)送 VOTE_COMMIT 消息作為回應(yīng),告訴協(xié)調(diào)者自己已經(jīng)做好準(zhǔn)備提交準(zhǔn)備,如果參與者沒(méi)有準(zhǔn)備好或遇到其他故障,就返回一個(gè) VOTE_ABORT 消息,告訴協(xié)調(diào)者目前無(wú)法提交事務(wù)。
第二階段:提交階段
協(xié)調(diào)者收集來(lái)自各個(gè)參與者的表決消息。如果所有參與者一致認(rèn)為可以提交事務(wù),那么協(xié)調(diào)者決定事務(wù)的最終提交,在此情形下協(xié)調(diào)者向所有參與者發(fā)送一個(gè) GLOBAL_COMMIT 消息,通知參與者進(jìn)行本地提交;如果所有參與者中有任意一個(gè)返回消息是 VOTE_ABORT,協(xié)調(diào)者就會(huì)取消事務(wù),向所有參與者廣播一條 GLOBAL_ABORT 消息通知所有的參與者取消事務(wù)。
每個(gè)提交了表決信息的參與者等候協(xié)調(diào)者返回消息,如果參與者接收到一個(gè) GLOBAL_COMMIT 消息,那么參與者提交本地事務(wù),否則如果接收到 GLOBAL_ABORT 消息,則參與者取消本地事務(wù)。
4) 兩階段提交協(xié)議在 Flink 中的應(yīng)用
Flink 的兩階段提交思路:
我們從 Flink 程序啟動(dòng)到消費(fèi) Kafka 數(shù)據(jù),最后到 Flink 將數(shù)據(jù) Sink 到 Kafka 為止,來(lái)分析 Flink 的精準(zhǔn)一次處理。
當(dāng) Checkpoint 啟動(dòng)時(shí),JobManager 會(huì)將檢查點(diǎn)分界線(checkpoint battier)注入數(shù)據(jù)流,checkpoint barrier 會(huì)在算子間傳遞下去,如下如所示:
Flink 精準(zhǔn)一次處理:Checkpoint 啟動(dòng)
Source 端:Flink Kafka Source 負(fù)責(zé)保存 Kafka 消費(fèi) offset,當(dāng) Chckpoint 成功時(shí) Flink 負(fù)責(zé)提交這些寫(xiě)入,否則就終止取消掉它們,當(dāng) Chckpoint 完成位移保存,它會(huì)將 checkpoint barrier(檢查點(diǎn)分界線) 傳給下一個(gè) Operator,然后每個(gè)算子會(huì)對(duì)當(dāng)前的狀態(tài)做個(gè)快照,保存到狀態(tài)后端(State Backend)。
對(duì)于 Source 任務(wù)而言,就會(huì)把當(dāng)前的 offset 作為狀態(tài)保存起來(lái)。下次從 Checkpoint 恢復(fù)時(shí),Source 任務(wù)可以重新提交偏移量,從上次保存的位置開(kāi)始重新消費(fèi)數(shù)據(jù),如下圖所示:
Flink 精準(zhǔn)一次處理:checkpoint barrier 及 offset 保存Slink 端:從 Source 端開(kāi)始,每個(gè)內(nèi)部的 transform 任務(wù)遇到 checkpoint barrier(檢查點(diǎn)分界線)時(shí),都會(huì)把狀態(tài)存到 Checkpoint 里。數(shù)據(jù)處理完畢到 Sink 端時(shí),Sink 任務(wù)首先把數(shù)據(jù)寫(xiě)入外部 Kafka,這些數(shù)據(jù)都屬于預(yù)提交的事務(wù)(還不能被消費(fèi)),此時(shí)的 Pre-commit 預(yù)提交階段下 Data Sink 在保存狀態(tài)到狀態(tài)后端的同時(shí)還必須預(yù)提交它的外部事務(wù),如下圖所示:
Flink 精準(zhǔn)一次處理:預(yù)提交到外部系統(tǒng)
當(dāng)所有算子任務(wù)的快照完成(所有創(chuàng)建的快照都被視為是 Checkpoint 的一部分),也就是這次的 Checkpoint 完成時(shí),JobManager 會(huì)向所有任務(wù)發(fā)通知,確認(rèn)這次 Checkpoint 完成,此時(shí) Pre-commit 預(yù)提交階段才算完成。才正式到兩階段提交協(xié)議的第二個(gè)階段:commit 階段。該階段中 JobManager 會(huì)為應(yīng)用中每個(gè) Operator 發(fā)起 Checkpoint 已完成的回調(diào)邏輯。
本例中的 Data Source 和窗口操作無(wú)外部狀態(tài),因此在該階段,這兩個(gè) Opeartor 無(wú)需執(zhí)行任何邏輯,但是 Data Sink 是有外部狀態(tài)的,此時(shí)我們必須提交外部事務(wù),當(dāng) Sink 任務(wù)收到確認(rèn)通知,就會(huì)正式提交之前的事務(wù),Kafka 中未確認(rèn)的數(shù)據(jù)就改為“已確認(rèn)”,數(shù)據(jù)就真正可以被消費(fèi)了,如下圖所示:
Flink 精準(zhǔn)一次處理:數(shù)據(jù)精準(zhǔn)被消費(fèi)
注:Flink 由 JobManager 協(xié)調(diào)各個(gè) TaskManager 進(jìn)行 Checkpoint 存儲(chǔ),Checkpoint 保存在 StateBackend(狀態(tài)后端) 中,默認(rèn) StateBackend 是內(nèi)存級(jí)的,也可以改為文件級(jí)的進(jìn)行持久化保存。
最后,一張圖總結(jié)下 Flink 的 EOS:
Flink 端到端精準(zhǔn)一次處理
此圖建議保存,總結(jié)全面且簡(jiǎn)明扼要,再也不慫面試官!
5) Exactly-Once 案例
Kafka來(lái)實(shí)現(xiàn)End-to-End Exactly-Once語(yǔ)義:
import java.util.Properties
import org.a(chǎn)pache.flink.a(chǎn)pi.common.serialization.SimpleStringSchema
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.CheckpointingMode
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.environment.CheckpointConfig
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala.StreamExecutionEnvironment
import org.a(chǎn)pache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.a(chǎn)pache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
*
* Kafka Producer的容錯(cuò)-Kafka 0.9 and 0.10
* 如果Flink開(kāi)啟了checkpoint,針對(duì)FlinkKafkaProducer09 和FlinkKafkaProducer010 可以提供 at-least-once的語(yǔ)義,還需要配置下面兩個(gè)參數(shù)
* ?setLogFailuresOnly(false)
* ?setFlushOnCheckpoint(true)
*
* 注意:建議修改kafka 生產(chǎn)者的重試次數(shù)
* retries【這個(gè)參數(shù)的值默認(rèn)是0】
*
* Kafka Producer的容錯(cuò)-Kafka 0.11
* 如果Flink開(kāi)啟了checkpoint,針對(duì)FlinkKafkaProducer011 就可以提供 exactly-once的語(yǔ)義
* 但是需要選擇具體的語(yǔ)義
* ?Semantic.NONE
* ?Semantic.AT_LEAST_ONCE【默認(rèn)】
* ?Semantic.EXACTLY_ONCE
object StreamingKafkaSinkScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//隱式轉(zhuǎn)換
import org.a(chǎn)pache.flink.a(chǎn)pi.scala._
//checkpoint配置
env.enableCheckpointing(5000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
env.getCheckpointConfig.setCheckpointTimeout(60000)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
val text = env.socketTextStream("node01", 9001, '')
val topic = "test"
val prop = new Properties()
prop.setProperty("bootstrap.servers", "node01:9092")
//設(shè)置事務(wù)超時(shí)時(shí)間,也可在kafka配置中設(shè)置
prop.setProperty("transaction.timeout.ms",60000*15+"");
//使用至少一次語(yǔ)義的形式
//val myProducer = new FlinkKafkaProducer011(brokerList, topic, new SimpleStringSchema());
//使用支持僅一次語(yǔ)義的形式
val myProducer =
new FlinkKafkaProducer011[String](topic, new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
text.a(chǎn)ddSink(myProducer)
env.execute("StreamingKafkaSinkScala")
}
}
Redis實(shí)現(xiàn)End-to-End Exactly-Once語(yǔ)義:
代碼開(kāi)發(fā)步驟:
獲取流處理執(zhí)行環(huán)境設(shè)置檢查點(diǎn)機(jī)制定義kafkaConsumer數(shù)據(jù)轉(zhuǎn)換:分組,求和數(shù)據(jù)寫(xiě)入redis觸發(fā)執(zhí)行object ExactlyRedisSink {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(5000)
env.setStateBackend(new FsStateBackend("hdfs://node01:8020/check/11"))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(60000)
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
//設(shè)置kafka,加載kafka數(shù)據(jù)源
val properties = new Properties()
properties.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
properties.setProperty("group.id", "test")
properties.setProperty("enable.a(chǎn)uto.commit", "false")
val kafkaConsumer = new FlinkKafkaConsumer011[String]("test2", new SimpleStringSchema(), properties)
kafkaConsumer.setStartFromLatest()
//檢查點(diǎn)制作成功,才開(kāi)始提交偏移量
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
val kafkaSource: DataStream[String] = env.a(chǎn)ddSource(kafkaConsumer)
//數(shù)據(jù)轉(zhuǎn)換
val sumData: DataStream[(String, Int)] = kafkaSource.flatMap(_.split(" "))
.map(_ -> 1)
.keyBy(0)
.sum(1)
val set = new util.HashSet[InetSocketAddress]()
set.a(chǎn)dd(new InetSocketAddress(InetAddress.getByName("node01"),7001))
set.a(chǎn)dd(new InetSocketAddress(InetAddress.getByName("node01"),7002))
set.a(chǎn)dd(new InetSocketAddress(InetAddress.getByName("node01"),7003))
val config: FlinkJedisClusterConfig = new FlinkJedisClusterConfig.Builder()
.setNodes(set)
.setMaxIdle(5)
.setMaxTotal(10)
.setMinIdle(5)
.setTimeout(10)
.build()
//數(shù)據(jù)寫(xiě)入
sumData.a(chǎn)ddSink(new RedisSink(config,new MyRedisSink))
env.execute()
}
}
class MyRedisSink extends RedisMapper[(String,Int)] {
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET,"resink")
}
override def getKeyFromData(data: (String, Int)): String = {
data._1
}
override def getValueFromData(data: (String, Int)): String = {
data._2.toString
}
}
八、Flink SQL
Flink SQL 是 Flink 實(shí)時(shí)計(jì)算為簡(jiǎn)化計(jì)算模型,降低用戶使用實(shí)時(shí)計(jì)算門(mén)檻而設(shè)計(jì)的一套符合標(biāo)準(zhǔn) SQL 語(yǔ)義的開(kāi)發(fā)語(yǔ)言。自 2015 年開(kāi)始,阿里巴巴開(kāi)始調(diào)研開(kāi)源流計(jì)算引擎,最終決定基于 Flink 打造新一代計(jì)算引擎,針對(duì) Flink 存在的不足進(jìn)行優(yōu)化和改進(jìn),并且在 2019 年初將最終代碼開(kāi)源,也就是我們熟知的 Blink。Blink 在原來(lái)的 Flink 基礎(chǔ)上最顯著的一個(gè)貢獻(xiàn)就是 Flink SQL 的實(shí)現(xiàn)。
Flink SQL 是面向用戶的 API 層,在我們傳統(tǒng)的流式計(jì)算領(lǐng)域,比如 Storm、Spark Streaming 都會(huì)提供一些 Function 或者 Datastream API,用戶通過(guò) Java 或 Scala 寫(xiě)業(yè)務(wù)邏輯,這種方式雖然靈活,但有一些不足,比如具備一定門(mén)檻且調(diào)優(yōu)較難,隨著版本的不斷更新,API 也出現(xiàn)了很多不兼容的地方。
在這個(gè)背景下,毫無(wú)疑問(wèn),SQL 就成了我們最佳選擇,之所以選擇將 SQL 作為核心 API,是因?yàn)槠渚哂袔讉(gè)非常重要的特點(diǎn):
SQL 屬于設(shè)定式語(yǔ)言,用戶只要表達(dá)清楚需求即可,不需要了解具體做法;
SQL 可優(yōu)化,內(nèi)置多種查詢優(yōu)化器,這些查詢優(yōu)化器可為 SQL 翻譯出最優(yōu)執(zhí)行計(jì)劃;
SQL 易于理解,不同行業(yè)和領(lǐng)域的人都懂,學(xué)習(xí)成本較低;
SQL 非常穩(wěn)定,在數(shù)據(jù)庫(kù) 30 多年的歷史中,SQL 本身變化較少;
流與批的統(tǒng)一,Flink 底層 Runtime 本身就是一個(gè)流與批統(tǒng)一的引擎,而 SQL 可以做到 API 層的流與批統(tǒng)一。
1. Flink SQL 常用算子
SELECT:
SELECT 用于從 DataSet/DataStream 中選擇數(shù)據(jù),用于篩選出某些列。
示例:
SELECT * FROM Table; // 取出表中的所有列
SELECT name,age FROM Table; // 取出表中 name 和 age 兩列
與此同時(shí) SELECT 語(yǔ)句中可以使用函數(shù)和別名,例如我們上面提到的 WordCount 中:
SELECT word, COUNT(word) FROM table GROUP BY word;
WHERE:
WHERE 用于從數(shù)據(jù)集/流中過(guò)濾數(shù)據(jù),與 SELECT 一起使用,用于根據(jù)某些條件對(duì)關(guān)系做水平分割,即選擇符合條件的記錄。
示例:
SELECT name,age FROM Table where name LIKE ‘% 小明 %’;
SELECT * FROM Table WHERE age = 20;
WHERE 是從原數(shù)據(jù)中進(jìn)行過(guò)濾,那么在 WHERE 條件中,Flink SQL 同樣支持 =、<、>、、>=、<=,以及 AND、OR 等表達(dá)式的組合,最終滿足過(guò)濾條件的數(shù)據(jù)會(huì)被選擇出來(lái)。并且 WHERE 可以結(jié)合 IN、NOT IN 聯(lián)合使用。舉個(gè)例子:
SELECT name, age
FROM Table
WHERE name IN (SELECT name FROM Table2)
DISTINCT:
DISTINCT 用于從數(shù)據(jù)集/流中去重根據(jù) SELECT 的結(jié)果進(jìn)行去重。
示例:
SELECT DISTINCT name FROM Table;
對(duì)于流式查詢,計(jì)算查詢結(jié)果所需的 State 可能會(huì)無(wú)限增長(zhǎng),用戶需要自己控制查詢的狀態(tài)范圍,以防止?fàn)顟B(tài)過(guò)大。
GROUP BY:
GROUP BY 是對(duì)數(shù)據(jù)進(jìn)行分組操作。例如我們需要計(jì)算成績(jī)明細(xì)表中,每個(gè)學(xué)生的總分。
示例:
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
UNION 和 UNION ALL:
UNION 用于將兩個(gè)結(jié)果集合并起來(lái),要求兩個(gè)結(jié)果集字段完全一致,包括字段類型、字段順序。不同于 UNION ALL 的是,UNION 會(huì)對(duì)結(jié)果數(shù)據(jù)去重。
示例:
SELECT * FROM T1 UNION (ALL) SELECT * FROM T2;
JOIN:
JOIN 用于把來(lái)自兩個(gè)表的數(shù)據(jù)聯(lián)合起來(lái)形成結(jié)果表,Flink 支持的 JOIN 類型包括:
JOIN - INNER JOIN
LEFT JOIN - LEFT OUTER JOIN
RIGHT JOIN - RIGHT OUTER JOIN
FULL JOIN - FULL OUTER JOIN
這里的 JOIN 的語(yǔ)義和我們?cè)陉P(guān)系型數(shù)據(jù)庫(kù)中使用的 JOIN 語(yǔ)義一致。
示例:
JOIN(將訂單表數(shù)據(jù)和商品表進(jìn)行關(guān)聯(lián))
SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id
LEFT JOIN 與 JOIN 的區(qū)別是當(dāng)右表沒(méi)有與左邊相 JOIN 的數(shù)據(jù)時(shí)候,右邊對(duì)應(yīng)的字段補(bǔ) NULL 輸出,RIGHT JOIN 相當(dāng)于 LEFT JOIN 左右兩個(gè)表交互一下位置。FULL JOIN 相當(dāng)于 RIGHT JOIN 和 LEFT JOIN 之后進(jìn)行 UNION ALL 操作。
示例:
SELECT * FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id
SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
Group Window:
根據(jù)窗口數(shù)據(jù)劃分的不同,目前 Apache Flink 有如下 3 種 Bounded Window:
Tumble,滾動(dòng)窗口,窗口數(shù)據(jù)有固定的大小,窗口數(shù)據(jù)無(wú)疊加;
Hop,滑動(dòng)窗口,窗口數(shù)據(jù)有固定大小,并且有固定的窗口重建頻率,窗口數(shù)據(jù)有疊加;
Session,會(huì)話窗口,窗口數(shù)據(jù)沒(méi)有固定的大小,根據(jù)窗口數(shù)據(jù)活躍程度劃分窗口,窗口數(shù)據(jù)無(wú)疊加。
Tumble Window:
Tumble 滾動(dòng)窗口有固定大小,窗口數(shù)據(jù)不重疊,具體語(yǔ)義如下:
Tumble 滾動(dòng)窗口對(duì)應(yīng)的語(yǔ)法如下:
SELECT
[gk],
[TUMBLE_START(timeCol, size)],
[TUMBLE_END(timeCol, size)],
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)
其中:
[gk] 決定了是否需要按照字段進(jìn)行聚合;
TUMBLE_START 代表窗口開(kāi)始時(shí)間;
TUMBLE_END 代表窗口結(jié)束時(shí)間;
timeCol 是流表中表示時(shí)間字段;
size 表示窗口的大小,如 秒、分鐘、小時(shí)、天。
舉個(gè)例子,假如我們要計(jì)算每個(gè)人每天的訂單量,按照 user 進(jìn)行聚合分組:
SELECT user,
TUMBLE_START(rowtime, INTERVAL ‘1’ DAY) as wStart,
SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL ‘1’ DAY), user;
Hop Window:
Hop 滑動(dòng)窗口和滾動(dòng)窗口類似,窗口有固定的 size,與滾動(dòng)窗口不同的是滑動(dòng)窗口可以通過(guò) slide 參數(shù)控制滑動(dòng)窗口的新建頻率。因此當(dāng) slide 值小于窗口 size 的值的時(shí)候多個(gè)滑動(dòng)窗口會(huì)重疊,具體語(yǔ)義如下:
Hop 滑動(dòng)窗口對(duì)應(yīng)語(yǔ)法如下:
SELECT
[gk],
[HOP_START(timeCol, slide, size)] ,
[HOP_END(timeCol, slide, size)],
agg1(col1),
...
aggN(colN)
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)
每次字段的意思和 Tumble 窗口類似:
[gk] 決定了是否需要按照字段進(jìn)行聚合;
HOP_START 表示窗口開(kāi)始時(shí)間;
HOP_END 表示窗口結(jié)束時(shí)間;
timeCol 表示流表中表示時(shí)間字段;
slide 表示每次窗口滑動(dòng)的大小;
size 表示整個(gè)窗口的大小,如 秒、分鐘、小時(shí)、天。
舉例說(shuō)明,我們要每過(guò)一小時(shí)計(jì)算一次過(guò)去 24 小時(shí)內(nèi)每個(gè)商品的銷量:
SELECT product,
SUM(amount)
FROM Orders
GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product
Session Window:
會(huì)話時(shí)間窗口沒(méi)有固定的持續(xù)時(shí)間,但它們的界限由 interval 不活動(dòng)時(shí)間定義,即如果在定義的間隙期間沒(méi)有出現(xiàn)事件,則會(huì)話窗口關(guān)閉。
Seeeion 會(huì)話窗口對(duì)應(yīng)語(yǔ)法如下:
SELECT
[gk],
SESSION_START(timeCol, gap) AS winStart,
SESSION_END(timeCol, gap) AS winEnd,
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)
[gk] 決定了是否需要按照字段進(jìn)行聚合;
SESSION_START 表示窗口開(kāi)始時(shí)間;
SESSION_END 表示窗口結(jié)束時(shí)間;
timeCol 表示流表中表示時(shí)間字段;
gap 表示窗口數(shù)據(jù)非活躍周期的時(shí)長(zhǎng)。
例如,我們需要計(jì)算每個(gè)用戶訪問(wèn)時(shí)間 12 小時(shí)內(nèi)的訂單量:
SELECT user,
SESSION_START(rowtime, INTERVAL ‘12’ HOUR) AS sStart,
SESSION_ROWTIME(rowtime, INTERVAL ‘12’ HOUR) AS sEnd,
SUM(amount)
FROM Orders
GROUP BY SESSION(rowtime, INTERVAL ‘12’ HOUR), user
Table API和SQL捆綁在flink-table Maven工件中。必須將以下依賴項(xiàng)添加到你的項(xiàng)目才能使用Table API和SQL:
另外,你需要為Flink的Scala批處理或流式API添加依賴項(xiàng)。對(duì)于批量查詢,您需要添加:
2. Flink SQL 實(shí)戰(zhàn)案例1) 批數(shù)據(jù)SQL
用法:
構(gòu)建Table運(yùn)行環(huán)境將DataSet注冊(cè)為一張表使用Table運(yùn)行環(huán)境的 sqlQuery 方法來(lái)執(zhí)行SQL語(yǔ)句
示例:使用Flink SQL統(tǒng)計(jì)用戶消費(fèi)訂單的總金額、最大金額、最小金額、訂單總數(shù)。
訂單id用戶名訂單日期消費(fèi)金額1Zhangsan2018-10-20 15:30358.5
測(cè)試數(shù)據(jù)(訂單ID、用戶名、訂單日期、訂單金額):
Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
Order(3, "lisi", "2018-10-20 16:30", 127.5),
Order(4, "lisi", "2018-10-20 16:30", 328.5),
Order(5, "lisi", "2018-10-20 16:30", 432.5),
Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
步驟:
獲取一個(gè)批處理運(yùn)行環(huán)境獲取一個(gè)Table運(yùn)行環(huán)境創(chuàng)建一個(gè)樣例類 Order 用來(lái)映射數(shù)據(jù)(訂單名、用戶名、訂單日期、訂單金額)基于本地 Order 集合創(chuàng)建一個(gè)DataSet source使用Table運(yùn)行環(huán)境將DataSet注冊(cè)為一張表使用SQL語(yǔ)句來(lái)操作數(shù)據(jù)(統(tǒng)計(jì)用戶消費(fèi)訂單的總金額、最大金額、最小金額、訂單總數(shù))使用TableEnv.toDataSet將Table轉(zhuǎn)換為DataSet打印測(cè)試
示例代碼:
import org.a(chǎn)pache.flink.a(chǎn)pi.scala.ExecutionEnvironment
import org.a(chǎn)pache.flink.table.a(chǎn)pi.{Table, TableEnvironment}
import org.a(chǎn)pache.flink.table.a(chǎn)pi.scala.BatchTableEnvironment
import org.a(chǎn)pache.flink.a(chǎn)pi.scala._
import org.a(chǎn)pache.flink.types.Row
*
* 使用Flink SQL統(tǒng)計(jì)用戶消費(fèi)訂單的總金額、最大金額、最小金額、訂單總數(shù)。
object BatchFlinkSqlDemo {
//3. 創(chuàng)建一個(gè)樣例類 Order 用來(lái)映射數(shù)據(jù)(訂單名、用戶名、訂單日期、訂單金額)
case class Order(id:Int, userName:String, createTime:String, money:Double)
def main(args: Array[String]): Unit = {
*
* 實(shí)現(xiàn)思路:
* 1. 獲取一個(gè)批處理運(yùn)行環(huán)境
* 2. 獲取一個(gè)Table運(yùn)行環(huán)境
* 3. 創(chuàng)建一個(gè)樣例類 Order 用來(lái)映射數(shù)據(jù)(訂單名、用戶名、訂單日期、訂單金額)
* 4. 基于本地 Order 集合創(chuàng)建一個(gè)DataSet source
* 5. 使用Table運(yùn)行環(huán)境將DataSet注冊(cè)為一張表
* 6. 使用SQL語(yǔ)句來(lái)操作數(shù)據(jù)(統(tǒng)計(jì)用戶消費(fèi)訂單的總金額、最大金額、最小金額、訂單總數(shù))
* 7. 使用TableEnv.toDataSet將Table轉(zhuǎn)換為DataSet
* 8. 打印測(cè)試
//1. 獲取一個(gè)批處理運(yùn)行環(huán)境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//2. 獲取一個(gè)Table運(yùn)行環(huán)境
val tabEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
//4. 基于本地 Order 集合創(chuàng)建一個(gè)DataSet source
val orderDataSet: DataSet[Order] = env.fromElements(
Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
Order(3, "lisi", "2018-10-20 16:30", 127.5),
Order(4, "lisi", "2018-10-20 16:30", 328.5),
Order(5, "lisi", "2018-10-20 16:30", 432.5),
Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
)
//5. 使用Table運(yùn)行環(huán)境將DataSet注冊(cè)為一張表
tabEnv.registerDataSet("t_order", orderDataSet)
//6. 使用SQL語(yǔ)句來(lái)操作數(shù)據(jù)(統(tǒng)計(jì)用戶消費(fèi)訂單的總金額、最大金額、最小金額、訂單總數(shù))
//用戶消費(fèi)訂單的總金額、最大金額、最小金額、訂單總數(shù)。
val sql =
"""
| select
| userName,
| sum(money) totalMoney,
| max(money) maxMoney,
| min(money) minMoney,
| count(1) totalCount
| from t_order
| group by userName
|""".stripMargin //在scala中stripMargin默認(rèn)是“|”作為多行連接符
//7. 使用TableEnv.toDataSet將Table轉(zhuǎn)換為DataSet
val table: Table = tabEnv.sqlQuery(sql)
table.printSchema()
tabEnv.toDataSet[Row](table).print()
}
}
2) 流數(shù)據(jù)SQL
流處理中也可以支持SQL。但是需要注意以下幾點(diǎn):
要使用流處理的SQL,必須要添加水印時(shí)間使用 registerDataStream 注冊(cè)表的時(shí)候,使用 ' 來(lái)指定字段注冊(cè)表的時(shí)候,必須要指定一個(gè)rowtime,否則無(wú)法在SQL中使用窗口必須要導(dǎo)入 import org.a(chǎn)pache.flink.table.a(chǎn)pi.scala._ 隱式參數(shù)SQL中使用 trumble(時(shí)間列名, interval '時(shí)間' sencond) 來(lái)進(jìn)行定義窗口
示例:使用Flink SQL來(lái)統(tǒng)計(jì)5秒內(nèi) 用戶的 訂單總數(shù)、訂單的最大金額、訂單的最小金額。
步驟
獲取流處理運(yùn)行環(huán)境獲取Table運(yùn)行環(huán)境設(shè)置處理時(shí)間為 EventTime創(chuàng)建一個(gè)訂單樣例類 Order ,包含四個(gè)字段(訂單ID、用戶ID、訂單金額、時(shí)間戳)創(chuàng)建一個(gè)自定義數(shù)據(jù)源使用for循環(huán)生成1000個(gè)訂單隨機(jī)生成訂單ID(UUID)隨機(jī)生成用戶ID(0-2)隨機(jī)生成訂單金額(0-100)時(shí)間戳為當(dāng)前系統(tǒng)時(shí)間每隔1秒生成一個(gè)訂單添加水印,允許延遲2秒導(dǎo)入 import org.a(chǎn)pache.flink.table.a(chǎn)pi.scala._ 隱式參數(shù)使用 registerDataStream 注冊(cè)表,并分別指定字段,還要指定rowtime字段編寫(xiě)SQL語(yǔ)句統(tǒng)計(jì)用戶訂單總數(shù)、最大金額、最小金額分組時(shí)要使用 tumble(時(shí)間列, interval '窗口時(shí)間' second) 來(lái)創(chuàng)建窗口使用 tableEnv.sqlQuery 執(zhí)行sql語(yǔ)句將SQL的執(zhí)行結(jié)果轉(zhuǎn)換成DataStream再打印出來(lái)啟動(dòng)流處理程序
示例代碼:
import java.util.UUID
import java.util.concurrent.TimeUnit
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.TimeCharacteristic
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.functions.source.{RichSourceFunction, SourceFunction}
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala.{DataStream, StreamExecutionEnvironment}
import org.a(chǎn)pache.flink.table.a(chǎn)pi.{Table, TableEnvironment}
import org.a(chǎn)pache.flink.a(chǎn)pi.scala._
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.functions.AssignerWithPeriodicWatermarks
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.watermark.Watermark
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.windowing.time.Time
import org.a(chǎn)pache.flink.types.Row
import scala.util.Random
*
* 需求:
* 使用Flink SQL來(lái)統(tǒng)計(jì)5秒內(nèi) 用戶的 訂單總數(shù)、訂單的最大金額、訂單的最小金額
*
* timestamp是關(guān)鍵字不能作為字段的名字(關(guān)鍵字不能作為字段名字)
object StreamFlinkSqlDemo {
*
* 1. 獲取流處理運(yùn)行環(huán)境
* 2. 獲取Table運(yùn)行環(huán)境
* 3. 設(shè)置處理時(shí)間為 EventTime
* 4. 創(chuàng)建一個(gè)訂單樣例類 Order ,包含四個(gè)字段(訂單ID、用戶ID、訂單金額、時(shí)間戳)
* 5. 創(chuàng)建一個(gè)自定義數(shù)據(jù)源
* 使用for循環(huán)生成1000個(gè)訂單
* 隨機(jī)生成訂單ID(UUID)
* 隨機(jī)生成用戶ID(0-2)
* 隨機(jī)生成訂單金額(0-100)
* 時(shí)間戳為當(dāng)前系統(tǒng)時(shí)間
* 每隔1秒生成一個(gè)訂單
* 6. 添加水印,允許延遲2秒
* 7. 導(dǎo)入 import org.a(chǎn)pache.flink.table.a(chǎn)pi.scala._ 隱式參數(shù)
* 8. 使用 registerDataStream 注冊(cè)表,并分別指定字段,還要指定rowtime字段
* 9. 編寫(xiě)SQL語(yǔ)句統(tǒng)計(jì)用戶訂單總數(shù)、最大金額、最小金額
* 分組時(shí)要使用 tumble(時(shí)間列, interval '窗口時(shí)間' second) 來(lái)創(chuàng)建窗口
* 10. 使用 tableEnv.sqlQuery 執(zhí)行sql語(yǔ)句
* 11. 將SQL的執(zhí)行結(jié)果轉(zhuǎn)換成DataStream再打印出來(lái)
* 12. 啟動(dòng)流處理程序
// 3. 創(chuàng)建一個(gè)訂單樣例類`Order`,包含四個(gè)字段(訂單ID、用戶ID、訂單金額、時(shí)間戳)
case class Order(orderId:String, userId:Int, money:Long, createTime:Long)
def main(args: Array[String]): Unit = {
// 1. 創(chuàng)建流處理運(yùn)行環(huán)境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 2. 設(shè)置處理時(shí)間為`EventTime`
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//獲取table的運(yùn)行環(huán)境
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 4. 創(chuàng)建一個(gè)自定義數(shù)據(jù)源
val orderDataStream = env.a(chǎn)ddSource(new RichSourceFunction[Order] {
var isRunning = true
override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
// - 隨機(jī)生成訂單ID(UUID)
// - 隨機(jī)生成用戶ID(0-2)
// - 隨機(jī)生成訂單金額(0-100)
// - 時(shí)間戳為當(dāng)前系統(tǒng)時(shí)間
// - 每隔1秒生成一個(gè)訂單
for (i <- 0 until 1000 if isRunning) {
val order = Order(UUID.randomUUID().toString, Random.nextInt(3), Random.nextInt(101),
System.currentTimeMillis())
TimeUnit.SECONDS.sleep(1)
ctx.collect(order)
}
}
override def cancel(): Unit = { isRunning = false }
})
// 5. 添加水印,允許延遲2秒
val watermarkDataStream = orderDataStream.a(chǎn)ssignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(2)) {
override def extractTimestamp(element: Order): Long = {
val eventTime = element.createTime
eventTime
}
}
)
// 6. 導(dǎo)入`import org.a(chǎn)pache.flink.table.a(chǎn)pi.scala._`隱式參數(shù)
// 7. 使用`registerDataStream`注冊(cè)表,并分別指定字段,還要指定rowtime字段
import org.a(chǎn)pache.flink.table.a(chǎn)pi.scala._
tableEnv.registerDataStream("t_order", watermarkDataStream, 'orderId, 'userId, 'money,'createTime.rowtime)
// 8. 編寫(xiě)SQL語(yǔ)句統(tǒng)計(jì)用戶訂單總數(shù)、最大金額、最小金額
// - 分組時(shí)要使用`tumble(時(shí)間列, interval '窗口時(shí)間' second)`來(lái)創(chuàng)建窗口
val sql =
"""
|select
| userId,
| count(1) as totalCount,
| max(money) as maxMoney,
| min(money) as minMoney
| from
| t_order
| group by
| tumble(createTime, interval '5' second),
| userId
""".stripMargin
// 9. 使用`tableEnv.sqlQuery`執(zhí)行sql語(yǔ)句
val table: Table = tableEnv.sqlQuery(sql)
// 10. 將SQL的執(zhí)行結(jié)果轉(zhuǎn)換成DataStream再打印出來(lái)
table.toRetractStream[Row].print()
env.execute("StreamSQLApp")
}
}
九、Flink CEP
我們?cè)诳粗辈サ臅r(shí)候,不管對(duì)于主播還是用戶來(lái)說(shuō),非常重要的一項(xiàng)就是彈幕文化。為了增加直播趣味性和互動(dòng)性, 各大網(wǎng)絡(luò)直播平臺(tái)紛紛采用彈窗彈幕作為用戶實(shí)時(shí)交流的方式,內(nèi)容豐富且形式多樣的彈幕數(shù)據(jù)中隱含著復(fù)雜的用戶屬性與用戶行為, 研究并理解在線直播平臺(tái)用戶具有彈幕內(nèi)容審核與監(jiān)控、輿論熱點(diǎn)預(yù)測(cè)、個(gè)性化摘要標(biāo)注等多方面的應(yīng)用價(jià)值。
本文不分析彈幕數(shù)據(jù)的應(yīng)用價(jià)值,只通過(guò)彈幕內(nèi)容審核與監(jiān)控案例來(lái)了解下Flink CEP的概念及功能。
在用戶發(fā)彈幕時(shí),直播平臺(tái)主要實(shí)時(shí)監(jiān)控識(shí)別兩類彈幕內(nèi)容:一類是發(fā)布不友善彈幕的用戶 ,一類是刷屏的用戶。
我們先記住上述需要實(shí)時(shí)監(jiān)控識(shí)別的兩類用戶,接下來(lái)介紹Flink CEP的API,然后使用CEP解決上述問(wèn)題。
本文首發(fā)于公眾號(hào)【五分鐘學(xué)大數(shù)據(jù)】,大數(shù)據(jù)領(lǐng)域原創(chuàng)技術(shù)號(hào)
1. Flink CEP 是什么
Flink CEP是一個(gè)基于Flink的復(fù)雜事件處理庫(kù),可以從多個(gè)數(shù)據(jù)流中發(fā)現(xiàn)復(fù)雜事件,識(shí)別有意義的事件(例如機(jī)會(huì)或者威脅),并盡快的做出響應(yīng),而不是需要等待幾天或則幾個(gè)月相當(dāng)長(zhǎng)的時(shí)間,才發(fā)現(xiàn)問(wèn)題。
2. Flink CEP API
CEP API的核心是Pattern(模式) API,它允許你快速定義復(fù)雜的事件模式。每個(gè)模式包含多個(gè)階段(stage)或者我們也可稱為狀態(tài)(state)。從一個(gè)狀態(tài)切換到另一個(gè)狀態(tài),用戶可以指定條件,這些條件可以作用在鄰近的事件或獨(dú)立事件上。
介紹API之前先來(lái)理解幾個(gè)概念:
1) 模式與模式序列
簡(jiǎn)單模式稱為模式,將最終在數(shù)據(jù)流中進(jìn)行搜索匹配的復(fù)雜模式序列稱為模式序列,每個(gè)復(fù)雜模式序列是由多個(gè)簡(jiǎn)單模式組成。
每個(gè)模式必須具有唯一的名稱,我們可以使用模式名稱來(lái)標(biāo)識(shí)該模式匹配到的事件。
2) 單個(gè)模式
一個(gè)模式既可以是單例的,也可以是循環(huán)的。單例模式接受單個(gè)事件,循環(huán)模式可以接受多個(gè)事件。
3) 模式示例:
有如下模式:a b+ c?d
其中a,b,c,d這些字母代表的是模式,+代表循環(huán),b+就是循環(huán)模式;?代表可選,c?就是可選模式;
所以上述模式的意思就是:a后面可以跟一個(gè)或多個(gè)b,后面再可選的跟c,最后跟d。
其中a、c? 、d是單例模式,b+是循環(huán)模式。
一般情況下,模式都是單例模式,可以使用量詞(Quantifiers)將其轉(zhuǎn)換為循環(huán)模式。
每個(gè)模式可以帶有一個(gè)或多個(gè)條件,這些條件是基于事件接收進(jìn)行定義的;蛘哒f(shuō),每個(gè)模式通過(guò)一個(gè)或多個(gè)條件來(lái)匹配和接收事件。
了解完上述概念后,接下來(lái)介紹下案例中需要用到的幾個(gè)CEP API:
4) 案例中用到的CEP API:
Begin:定義一個(gè)起始模式狀態(tài)
用法:start = Pattern.
Next:附加一個(gè)新的模式狀態(tài)。匹配事件必須直接接續(xù)上一個(gè)匹配事件
用法:next = start.next("next");
Where:定義當(dāng)前模式狀態(tài)的過(guò)濾條件。僅當(dāng)事件通過(guò)過(guò)濾器時(shí),它才能與狀態(tài)匹配
用法:patternState.where(_.message == "yyds");
Within: 定義事件序列與模式匹配的最大時(shí)間間隔。如果未完成的事件序列超過(guò)此時(shí)間,則將其丟棄
用法:patternState.within(Time.seconds(10));
Times:一個(gè)給定類型的事件出現(xiàn)了指定次數(shù)
用法:patternState.times(5);
API 先介紹以上這幾個(gè),接下來(lái)我們解決下文章開(kāi)頭提到的案例:
3. 監(jiān)測(cè)用戶彈幕行為案例案例一:監(jiān)測(cè)惡意用戶
規(guī)則:用戶如果在10s內(nèi),同時(shí)輸入 TMD 超過(guò)5次,就認(rèn)為用戶為惡意攻擊,識(shí)別出該用戶。
使用 Flink CEP 檢測(cè)惡意用戶:
import org.a(chǎn)pache.flink.a(chǎn)pi.scala._
import org.a(chǎn)pache.flink.cep.PatternSelectFunction
import org.a(chǎn)pache.flink.cep.scala.{CEP, PatternStream}
import org.a(chǎn)pache.flink.cep.scala.pattern.Pattern
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.TimeCharacteristic
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.windowing.time.Time
object BarrageBehavior01 {
case class LoginEvent(userId:String, message:String, timestamp:Long){
override def toString: String = userId
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 使用IngestionTime作為EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 用于觀察測(cè)試數(shù)據(jù)處理順序
env.setParallelism(1)
// 模擬數(shù)據(jù)源
val loginEventStream: DataStream[LoginEvent] = env.fromCollection(
List(
LoginEvent("1", "TMD", 1618498576),
LoginEvent("1", "TMD", 1618498577),
LoginEvent("1", "TMD", 1618498579),
LoginEvent("1", "TMD", 1618498582),
LoginEvent("2", "TMD", 1618498583),
LoginEvent("1", "TMD", 1618498585)
)
).a(chǎn)ssignAscendingTimestamps(_.timestamp * 1000)
//定義模式
val loginEventPattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("begin")
.where(_.message == "TMD")
.times(5)
.within(Time.seconds(10))
//匹配模式
val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), loginEventPattern)
import scala.collection.Map
val result = patternStream.select((pattern:Map[String, Iterable[LoginEvent]])=> {
val first = pattern.getOrElse("begin", null).iterator.next()
(first.userId, first.timestamp)
})
//惡意用戶,實(shí)際處理可將按用戶進(jìn)行禁言等處理,為簡(jiǎn)化此處僅打印出該用戶
result.print("惡意用戶>>>")
env.execute("BarrageBehavior01")
}
}
案例二:監(jiān)測(cè)刷屏用戶
規(guī)則:用戶如果在10s內(nèi),同時(shí)連續(xù)輸入同樣一句話超過(guò)5次,就認(rèn)為是惡意刷屏。
使用 Flink CEP檢測(cè)刷屏用戶
object BarrageBehavior02 {
case class Message(userId: String, ip: String, msg: String)
def main(args: Array[String]): Unit = {
//初始化運(yùn)行環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//設(shè)置并行度
env.setParallelism(1)
// 模擬數(shù)據(jù)源
val loginEventStream: DataStream[Message] = env.fromCollection(
List(
Message("1", "192.168.0.1", "beijing"),
Message("1", "192.168.0.2", "beijing"),
Message("1", "192.168.0.3", "beijing"),
Message("1", "192.168.0.4", "beijing"),
Message("2", "192.168.10.10", "shanghai"),
Message("3", "192.168.10.10", "beijing"),
Message("3", "192.168.10.11", "beijing"),
Message("4", "192.168.10.10", "beijing"),
Message("5", "192.168.10.11", "shanghai"),
Message("4", "192.168.10.12", "beijing"),
Message("5", "192.168.10.13", "shanghai"),
Message("5", "192.168.10.14", "shanghai"),
Message("5", "192.168.10.15", "beijing"),
Message("6", "192.168.10.16", "beijing"),
Message("6", "192.168.10.17", "beijing"),
Message("6", "192.168.10.18", "beijing"),
Message("5", "192.168.10.18", "shanghai"),
Message("6", "192.168.10.19", "beijing"),
Message("6", "192.168.10.19", "beijing"),
Message("5", "192.168.10.18", "shanghai")
)
)
//定義模式
val loginbeijingPattern = Pattern.begin[Message]("start")
.where(_.msg != null) //一條登錄失敗
.times(5).optional //將滿足五次的數(shù)據(jù)配對(duì)打印
.within(Time.seconds(10))
//進(jìn)行分組匹配
val loginbeijingDataPattern = CEP.pattern(loginEventStream.keyBy(_.userId), loginbeijingPattern)
//查找符合規(guī)則的數(shù)據(jù)
val loginbeijingResult: DataStream[Option[Iterable[Message]]] = loginbeijingDataPattern.select(patternSelectFun = (pattern: collection.Map[String, Iterable[Message]]) => {
var loginEventList: Option[Iterable[Message]] = null
loginEventList = pattern.get("start") match {
case Some(value) => {
if (value.toList.map(x => (x.userId, x.msg)).distinct.size == 1) {
Some(value)
} else {
None
}
}
}
loginEventList
})
//打印測(cè)試
loginbeijingResult.filter(x=>x!=None).map(x=>{
x match {
case Some(value)=> value
}
}).print()
env.execute("BarrageBehavior02)
}
}
4. Flink CEP API
除了案例中介紹的幾個(gè)API外,我們?cè)诮榻B下其他的常用API:
1) 條件 API
為了讓傳入事件被模式所接受,給模式指定傳入事件必須滿足的條件,這些條件由事件本身的屬性或者前面匹配過(guò)的事件的屬性統(tǒng)計(jì)量等來(lái)設(shè)定。比如,事件的某個(gè)值大于5,或者大于先前接受事件的某個(gè)值的平均值。
可以使用pattern.where()、pattern.or()、pattern.until()方法來(lái)指定條件。條件既可以是迭代條件IterativeConditions,也可以是簡(jiǎn)單條件SimpleConditions。
FlinkCEP支持事件之間的三種臨近條件:
next():嚴(yán)格的滿足條件
示例:模式為begin("first").where(_.name='a').next("second").where(.name='b')當(dāng)且僅當(dāng)數(shù)據(jù)為a,b時(shí),模式才會(huì)被命中。如果數(shù)據(jù)為a,c,b,由于a的后面跟了c,所以a會(huì)被直接丟棄,模式不會(huì)命中。
followedBy():松散的滿足條件
示例:模式為begin("first").where(_.name='a').followedBy("second").where(.name='b')當(dāng)且僅當(dāng)數(shù)據(jù)為a,b或者為a,c,b,模式均被命中,中間的c會(huì)被忽略掉。
followedByAny():非確定的松散滿足條件
示例:模式為begin("first").where(_.name='a').followedByAny("second").where(.name='b')當(dāng)且僅當(dāng)數(shù)據(jù)為a,c,b,b時(shí),對(duì)于followedBy模式而言命中的為{a,b},對(duì)于followedByAny而言會(huì)有兩次命中{a,b},{a,b}。
2) 量詞 API
還記得我們?cè)谏厦嬷v解模式的概念時(shí)說(shuō)過(guò)的一句話嘛:一般情況下,模式都是單例模式,可以使用量詞(Quantifiers)將其轉(zhuǎn)換為循環(huán)模式。這里的量詞就是指的量詞API。
以下這幾個(gè)量詞API,可以將模式指定為循環(huán)模式:
pattern.oneOrMore():一個(gè)給定的事件有一次或多次出現(xiàn),例如上面提到的b+。
pattern.times(#ofTimes):一個(gè)給定類型的事件出現(xiàn)了指定次數(shù),例如4次。
pattern.times(#fromTimes, #toTimes):一個(gè)給定類型的事件出現(xiàn)的次數(shù)在指定次數(shù)范圍內(nèi),例如2~4次。
可以使用pattern.greedy()方法將模式變成循環(huán)模式,但是不能讓一組模式都變成循環(huán)模式。greedy:就是盡可能的重復(fù)。
使用pattern.optional()方法將循環(huán)模式變成可選的,即可以是循環(huán)模式也可以是單個(gè)模式。
3) 匹配后的跳過(guò)策略
所謂的匹配跳過(guò)策略,是對(duì)多個(gè)成功匹配的模式進(jìn)行篩選。也就是說(shuō)如果多個(gè)匹配成功,可能我不需要這么多,按照匹配策略,過(guò)濾下就可以。
Flink中有五種跳過(guò)策略:
NO_SKIP: 不過(guò)濾,所有可能的匹配都會(huì)被發(fā)出。
SKIP_TO_NEXT: 丟棄與開(kāi)始匹配到的事件相同的事件,發(fā)出開(kāi)始匹配到的事件,即直接跳到下一個(gè)模式匹配到的事件,以此類推。
SKIP_PAST_LAST_EVENT: 丟棄匹配開(kāi)始后但結(jié)束之前匹配到的事件。
SKIP_TO_FIRST[PatternName]: 丟棄匹配開(kāi)始后但在PatternName模式匹配到的第一個(gè)事件之前匹配到的事件。
SKIP_TO_LAST[PatternName]: 丟棄匹配開(kāi)始后但在PatternName模式匹配到的最后一個(gè)事件之前匹配到的事件。
怎么理解上述策略,我們以NO_SKIP和SKIP_PAST_LAST_EVENT為例講解下:
在模式為:begin("start").where(_.name='a').oneOrMore().followedBy("second").where(_.name='b')中,我們輸入數(shù)據(jù):a,a,a,a,b ,如果是NO_SKIP策略,即不過(guò)濾策略,模式匹配到的是:{a,b},{a,a,b},{a,a,a,b},{a,a,a,a,b};如果是SKIP_PAST_LAST_EVENT策略,即丟棄匹配開(kāi)始后但結(jié)束之前匹配到的事件,模式匹配到的是:{a,a,a,a,b}。
5. Flink CEP 的使用場(chǎng)景
除上述案例場(chǎng)景外,Flink CEP 還廣泛用于網(wǎng)絡(luò)欺詐,故障檢測(cè),風(fēng)險(xiǎn)規(guī)避,智能營(yíng)銷等領(lǐng)域。
1) 實(shí)時(shí)反作弊和風(fēng)控
對(duì)于電商來(lái)說(shuō),羊毛黨是必不可少的,國(guó)內(nèi)拼多多曾爆出 100 元的無(wú)門(mén)檻券隨便領(lǐng),當(dāng)晚被人褥幾百億,對(duì)于這種情況肯定是沒(méi)有做好及時(shí)的風(fēng)控。另外還有就是商家上架商品時(shí)通過(guò)頻繁修改商品的名稱和濫用標(biāo)題來(lái)提高搜索關(guān)鍵字的排名、批量注冊(cè)一批機(jī)器賬號(hào)快速刷單來(lái)提高商品的銷售量等作弊行為,各種各樣的作弊手法也是需要不斷的去制定規(guī)則去匹配這種行為。
2) 實(shí)時(shí)營(yíng)銷
分析用戶在手機(jī) APP 的實(shí)時(shí)行為,統(tǒng)計(jì)用戶的活動(dòng)周期,通過(guò)為用戶畫(huà)像來(lái)給用戶進(jìn)行推薦。比如用戶在登錄 APP 后 1 分鐘內(nèi)只瀏覽了商品沒(méi)有下單;用戶在瀏覽一個(gè)商品后,3 分鐘內(nèi)又去查看其他同類的商品,進(jìn)行比價(jià)行為;用戶商品下單后 1 分鐘內(nèi)是否支付了該訂單。如果這些數(shù)據(jù)都可以很好的利用起來(lái),那么就可以給用戶推薦瀏覽過(guò)的類似商品,這樣可以大大提高購(gòu)買(mǎi)率。
3) 實(shí)時(shí)網(wǎng)絡(luò)攻擊檢測(cè)
當(dāng)下互聯(lián)網(wǎng)安全形勢(shì)仍然嚴(yán)峻,網(wǎng)絡(luò)攻擊屢見(jiàn)不鮮且花樣眾多,這里我們以 DDOS(分布式拒絕服務(wù)攻擊)產(chǎn)生的流入流量來(lái)作為遭受攻擊的判斷依據(jù)。對(duì)網(wǎng)絡(luò)遭受的潛在攻擊進(jìn)行實(shí)時(shí)檢測(cè)并給出預(yù)警,云服務(wù)廠商的多個(gè)數(shù)據(jù)中心會(huì)定時(shí)向監(jiān)控中心上報(bào)其瞬時(shí)流量,如果流量在預(yù)設(shè)的正常范圍內(nèi)則認(rèn)為是正,F(xiàn)象,不做任何操作;如果某數(shù)據(jù)中心在 10 秒內(nèi)連續(xù) 5 次上報(bào)的流量超過(guò)正常范圍的閾值,則觸發(fā)一條警告的事件;如果某數(shù)據(jù)中心 30 秒內(nèi)連續(xù)出現(xiàn) 30 次上報(bào)的流量超過(guò)正常范圍的閾值,則觸發(fā)嚴(yán)重的告警。
6. Flink CEP 的原理簡(jiǎn)單介紹
Apache Flink在實(shí)現(xiàn)CEP時(shí)借鑒了Efficient Pattern Matching over Event Streams論文中NFA的模型,在這篇論文中,還提到了一些優(yōu)化,我們?cè)谶@里先跳過(guò),只說(shuō)下NFA的概念。
在這篇論文中,提到了NFA,也就是Non-determined Finite Automaton,叫做不確定的有限狀態(tài)機(jī),指的是狀態(tài)有限,但是每個(gè)狀態(tài)可能被轉(zhuǎn)換成多個(gè)狀態(tài)(不確定)。
非確定有限自動(dòng)狀態(tài)機(jī)
先介紹兩個(gè)概念:
狀態(tài):狀態(tài)分為三類,起始狀態(tài)、中間狀態(tài)和最終狀態(tài)。
轉(zhuǎn)換:take/ignore/proceed都是轉(zhuǎn)換的名稱。
在NFA匹配規(guī)則里,本質(zhì)上是一個(gè)狀態(tài)轉(zhuǎn)換的過(guò)程。三種轉(zhuǎn)換的含義如下所示:
Take: 主要是條件的判斷,當(dāng)過(guò)來(lái)一條數(shù)據(jù)進(jìn)行判斷,一旦滿足條件,獲取當(dāng)前元素,放入到結(jié)果集中,然后將當(dāng)前狀態(tài)轉(zhuǎn)移到下一個(gè)的狀態(tài)。
Proceed:當(dāng)前的狀態(tài)可以不依賴任何的事件轉(zhuǎn)移到下一個(gè)狀態(tài),比如說(shuō)透?jìng)鞯囊馑肌?/p>
Ignore:當(dāng)一條數(shù)據(jù)到來(lái)的時(shí)候,可以忽略這個(gè)消息事件,當(dāng)前的狀態(tài)保持不變,相當(dāng)于自己到自己的一個(gè)狀態(tài)。
NFA的特點(diǎn):在NFA中,給定當(dāng)前狀態(tài),可能有多個(gè)下一個(gè)狀態(tài)?梢噪S機(jī)選擇下一個(gè)狀態(tài),也可以并行(同時(shí))選擇下一個(gè)狀態(tài)。輸入符號(hào)可以為空。
7. 規(guī)則引擎
規(guī)則引擎:將業(yè)務(wù)決策從應(yīng)用程序代碼中分離出來(lái),并使用預(yù)定義的語(yǔ)義模塊編寫(xiě)業(yè)務(wù)決策。接受數(shù)據(jù)輸入,解釋業(yè)務(wù)規(guī)則,并根據(jù)業(yè)務(wù)規(guī)則做出業(yè)務(wù)決策。
使用規(guī)則引擎可以通過(guò)降低實(shí)現(xiàn)復(fù)雜業(yè)務(wù)邏輯的組件的復(fù)雜性,降低應(yīng)用程序的維護(hù)和可擴(kuò)展性成本。
1) Drools
Drools 是一款使用 Java 編寫(xiě)的開(kāi)源規(guī)則引擎,通常用來(lái)解決業(yè)務(wù)代碼與業(yè)務(wù)規(guī)則的分離,它內(nèi)置的 Drools Fusion 模塊也提供 CEP 的功能。
優(yōu)勢(shì):
功能較為完善,具有如系統(tǒng)監(jiān)控、操作平臺(tái)等功能。規(guī)則支持動(dòng)態(tài)更新。
劣勢(shì):
以內(nèi)存實(shí)現(xiàn)時(shí)間窗功能,無(wú)法支持較長(zhǎng)跨度的時(shí)間窗。無(wú)法有效支持定時(shí)觸達(dá)(如用戶在瀏覽發(fā)生一段時(shí)間后觸達(dá)條件判斷)。2) Aviator
Aviator 是一個(gè)高性能、輕量級(jí)的 Java 語(yǔ)言實(shí)現(xiàn)的表達(dá)式求值引擎,主要用于各種表達(dá)式的動(dòng)態(tài)求值。
優(yōu)勢(shì):
支持大部分運(yùn)算操作符。支持函數(shù)調(diào)用和自定義函數(shù)。支持正則表達(dá)式匹配。支持傳入變量并且性能優(yōu)秀。
劣勢(shì):
沒(méi)有 if else、do while 等語(yǔ)句,沒(méi)有賦值語(yǔ)句,沒(méi)有位運(yùn)算符。3) EasyRules
EasyRules 集成了 MVEL 和 SpEL 表達(dá)式的一款輕量級(jí)規(guī)則引擎。
優(yōu)勢(shì):
輕量級(jí)框架,學(xué)習(xí)成本低。基于 POJO。為定義業(yè)務(wù)引擎提供有用的抽象和簡(jiǎn)便的應(yīng)用。支持從簡(jiǎn)單的規(guī)則組建成復(fù)雜規(guī)則。4) Esper
Esper 設(shè)計(jì)目標(biāo)為 CEP 的輕量級(jí)解決方案,可以方便的嵌入服務(wù)中,提供 CEP 功能。
優(yōu)勢(shì):
輕量級(jí)可嵌入開(kāi)發(fā),常用的 CEP 功能簡(jiǎn)單好用。EPL 語(yǔ)法與 SQL 類似,學(xué)習(xí)成本較低。
劣勢(shì):
單機(jī)全內(nèi)存方案,需要整合其他分布式和存儲(chǔ)。以內(nèi)存實(shí)現(xiàn)時(shí)間窗功能,無(wú)法支持較長(zhǎng)跨度的時(shí)間窗。無(wú)法有效支持定時(shí)觸達(dá)(如用戶在瀏覽發(fā)生一段時(shí)間后觸達(dá)條件判斷)。5) Flink CEP
Flink 是一個(gè)流式系統(tǒng),具有高吞吐低延遲的特點(diǎn),Flink CEP 是一套極具通用性、易于使用的實(shí)時(shí)流式事件處理方案。
優(yōu)勢(shì):
繼承了 Flink 高吞吐的特點(diǎn)。事件支持存儲(chǔ)到外部,可以支持較長(zhǎng)跨度的時(shí)間窗?梢灾С侄〞r(shí)觸達(dá)(用 followedBy + PartternTimeoutFunction 實(shí)現(xiàn))。十、Flink CDC1. CDC是什么
CDC 是 Change Data Capture(變更數(shù)據(jù)獲取)的簡(jiǎn)稱。核心思想是,監(jiān)測(cè)并捕獲數(shù)據(jù)庫(kù)的變動(dòng)(包括數(shù)據(jù)或數(shù)據(jù)表的插入、更新以及刪除等),將這些變更按發(fā)生的順序完整記錄下來(lái),寫(xiě)入到消息中間件中以供其他服務(wù)進(jìn)行訂閱及消費(fèi)。
在廣義的概念上,只要能捕獲數(shù)據(jù)變更的技術(shù),我們都可以稱為 CDC 。通常我們說(shuō)的 CDC 技術(shù)主要面向數(shù)據(jù)庫(kù)的變更,是一種用于捕獲數(shù)據(jù)庫(kù)中數(shù)據(jù)變更的技術(shù)。
CDC 技術(shù)應(yīng)用場(chǎng)景非常廣泛:
數(shù)據(jù)同步,用于備份,容災(zāi);
數(shù)據(jù)分發(fā),一個(gè)數(shù)據(jù)源分發(fā)給多個(gè)下游;
數(shù)據(jù)采集(E),面向數(shù)據(jù)倉(cāng)庫(kù)/數(shù)據(jù)湖的 ETL 數(shù)據(jù)集成。
2. CDC 的種類
CDC 主要分為基于查詢和基于 Binlog 兩種方式,我們主要了解一下這兩種之間的區(qū)別:
基于查詢的 CDC基于 Binlog 的 CDC開(kāi)源產(chǎn)品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium執(zhí)行模式BatchStreaming是否可以捕獲所有數(shù)據(jù)變化否是延遲性高延遲低延遲是否增加數(shù)據(jù)庫(kù)壓力是否3. 傳統(tǒng)CDC與Flink CDC對(duì)比1) 傳統(tǒng) CDC ETL 分析
2) 基于 Flink CDC 的 ETL 分析
2) 基于 Flink CDC 的聚合分析
2) 基于 Flink CDC 的數(shù)據(jù)打?qū)?/p>
4. Flink-CDC 案例
Flink 社區(qū)開(kāi)發(fā)了 flink-cdc-connectors 組件,這是一個(gè)可以直接從 MySQL、PostgreSQL等數(shù)據(jù)庫(kù)直接讀取全量數(shù)據(jù)和增量變更數(shù)據(jù)的 source 組件。
開(kāi)源地址:https://github.com/ververica/flink-cdc-connectors。
示例代碼:
import com.a(chǎn)libaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.a(chǎn)libaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.a(chǎn)libaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.a(chǎn)pache.flink.a(chǎn)pi.common.restartstrategy.RestartStrategies;
import org.a(chǎn)pache.flink.runtime.state.filesystem.FsStateBackend;
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.CheckpointingMode;
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.datastream.DataStreamSource;
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.environment.CheckpointConfig;
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class FlinkCDC {
public static void main(String[] args) throws Exception {
//1.創(chuàng)建執(zhí)行環(huán)境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.Flink-CDC 將讀取 binlog 的位置信息以狀態(tài)的方式保存在 CK,如果想要做到斷點(diǎn)
續(xù)傳,需要從 Checkpoint 或者 Savepoint 啟動(dòng)程序
//2.1 開(kāi)啟 Checkpoint,每隔 5 秒鐘做一次 CK
env.enableCheckpointing(5000L);
//2.2 指定 CK 的一致性語(yǔ)義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//2.3 設(shè)置任務(wù)關(guān)閉的時(shí)候保留最后一次 CK 數(shù)據(jù)
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
ointCleanup.RETAIN_ON_CANCELLATION);
//2.4 指定從 CK 自動(dòng)重啟策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
//2.5 設(shè)置狀態(tài)后端
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
//2.6 設(shè)置訪問(wèn) HDFS 的用戶名
System.setProperty("HADOOP_USER_NAME", "atguigu");
//3.創(chuàng)建 Flink-MySQL-CDC 的 Source
//initial (default): Performs an initial snapshot on the monitored database tables upon
first startup, and continue to read the latest binlog.
//latest-offset: Never to perform snapshot on the monitored database tables upon first
startup, just read from the end of the binlog which means only have the changes since the
connector was started.
//timestamp: Never to perform snapshot on the monitored database tables upon first
startup, and directly read binlog from the specified timestamp. The consumer will traverse the
binlog from the beginning and ignore change events whose timestamp is smaller than the
specified timestamp.
//specific-offset: Never to perform snapshot on the monitored database tables upon
first startup, and directly read binlog from the specified offset.
DebeziumSourceFunction
雖然實(shí)時(shí)計(jì)算在最近幾年才火起來(lái),但是在早期也有部分公司有實(shí)時(shí)計(jì)算的需求,但是數(shù)據(jù)量比較少,所以在實(shí)時(shí)方面形成不了完整的體系,基本所有的開(kāi)發(fā)都是具體問(wèn)題具體分析,來(lái)一個(gè)需求做一個(gè),基本不考慮它們之間的關(guān)系,開(kāi)發(fā)形式如下:
早期實(shí)時(shí)計(jì)算
如上圖所示,拿到數(shù)據(jù)源后,會(huì)經(jīng)過(guò)數(shù)據(jù)清洗,擴(kuò)維,通過(guò)Flink進(jìn)行業(yè)務(wù)邏輯處理,最后直接進(jìn)行業(yè)務(wù)輸出。把這個(gè)環(huán)節(jié)拆開(kāi)來(lái)看,數(shù)據(jù)源端會(huì)重復(fù)引用相同的數(shù)據(jù)源,后面進(jìn)行清洗、過(guò)濾、擴(kuò)維等操作,都要重復(fù)做一遍,唯一不同的是業(yè)務(wù)的代碼邏輯是不一樣的。
隨著產(chǎn)品和業(yè)務(wù)人員對(duì)實(shí)時(shí)數(shù)據(jù)需求的不斷增多,這種開(kāi)發(fā)模式出現(xiàn)的問(wèn)題越來(lái)越多:
數(shù)據(jù)指標(biāo)越來(lái)越多,“煙囪式”的開(kāi)發(fā)導(dǎo)致代碼耦合問(wèn)題嚴(yán)重。
需求越來(lái)越多,有的需要明細(xì)數(shù)據(jù),有的需要 OLAP 分析。單一的開(kāi)發(fā)模式難以應(yīng)付多種需求。
每個(gè)需求都要申請(qǐng)資源,導(dǎo)致資源成本急速膨脹,資源不能集約有效利用。
缺少完善的監(jiān)控系統(tǒng),無(wú)法在對(duì)業(yè)務(wù)產(chǎn)生影響之前發(fā)現(xiàn)并修復(fù)問(wèn)題。
大家看實(shí)時(shí)數(shù)倉(cāng)的發(fā)展和出現(xiàn)的問(wèn)題,和離線數(shù)倉(cāng)非常類似,后期數(shù)據(jù)量大了之后產(chǎn)生了各種問(wèn)題,離線數(shù)倉(cāng)當(dāng)時(shí)是怎么解決的?離線數(shù)倉(cāng)通過(guò)分層架構(gòu)使數(shù)據(jù)解耦,多個(gè)業(yè)務(wù)可以共用數(shù)據(jù),實(shí)時(shí)數(shù)倉(cāng)是否也可以用分層架構(gòu)呢?當(dāng)然是可以的,但是細(xì)節(jié)上和離線的分層還是有一些不同,稍后會(huì)講到。
2. 實(shí)時(shí)數(shù)倉(cāng)建設(shè)
從方法論來(lái)講,實(shí)時(shí)和離線是非常相似的,離線數(shù)倉(cāng)早期的時(shí)候也是具體問(wèn)題具體分析,當(dāng)數(shù)據(jù)規(guī)模漲到一定量的時(shí)候才會(huì)考慮如何治理。分層是一種非常有效的數(shù)據(jù)治理方式,所以在實(shí)時(shí)數(shù)倉(cāng)如何進(jìn)行管理的問(wèn)題上,首先考慮的也是分層的處理邏輯。
實(shí)時(shí)數(shù)倉(cāng)的架構(gòu)如下圖:
實(shí)時(shí)數(shù)倉(cāng)架構(gòu)
從上圖中我們具體分析下每層的作用:
數(shù)據(jù)源:在數(shù)據(jù)源的層面,離線和實(shí)時(shí)在數(shù)據(jù)源是一致的,主要分為日志類和業(yè)務(wù)類,日志類又包括用戶日志,埋點(diǎn)日志以及服務(wù)器日志等。
實(shí)時(shí)明細(xì)層:在明細(xì)層,為了解決重復(fù)建設(shè)的問(wèn)題,要進(jìn)行統(tǒng)一構(gòu)建,利用離線數(shù)倉(cāng)的模式,建設(shè)統(tǒng)一的基礎(chǔ)明細(xì)數(shù)據(jù)層,按照主題進(jìn)行管理,明細(xì)層的目的是給下游提供直接可用的數(shù)據(jù),因此要對(duì)基礎(chǔ)層進(jìn)行統(tǒng)一的加工,比如清洗、過(guò)濾、擴(kuò)維等。
匯總層:匯總層通過(guò)Flink的簡(jiǎn)潔算子直接可以算出結(jié)果,并且形成匯總指標(biāo)池,所有的指標(biāo)都統(tǒng)一在匯總層加工,所有人按照統(tǒng)一的規(guī)范管理建設(shè),形成可復(fù)用的匯總結(jié)果。
我們可以看出,實(shí)時(shí)數(shù)倉(cāng)和離線數(shù)倉(cāng)的分層非常類似,比如 數(shù)據(jù)源層,明細(xì)層,匯總層,乃至應(yīng)用層,他們命名的模式可能都是一樣的。但仔細(xì)比較不難發(fā)現(xiàn),兩者有很多區(qū)別:
與離線數(shù)倉(cāng)相比,實(shí)時(shí)數(shù)倉(cāng)的層次更少一些:
從目前建設(shè)離線數(shù)倉(cāng)的經(jīng)驗(yàn)來(lái)看,數(shù)倉(cāng)的數(shù)據(jù)明細(xì)層內(nèi)容會(huì)非常豐富,處理明細(xì)數(shù)據(jù)外一般還會(huì)包含輕度匯總層的概念,另外離線數(shù)倉(cāng)中應(yīng)用層數(shù)據(jù)在數(shù)倉(cāng)內(nèi)部,但實(shí)時(shí)數(shù)倉(cāng)中,app 應(yīng)用層數(shù)據(jù)已經(jīng)落入應(yīng)用系統(tǒng)的存儲(chǔ)介質(zhì)中,可以把該層與數(shù)倉(cāng)的表分離。
應(yīng)用層少建設(shè)的好處:實(shí)時(shí)處理數(shù)據(jù)的時(shí)候,每建一個(gè)層次,數(shù)據(jù)必然會(huì)產(chǎn)生一定的延遲。
匯總層少建的好處:在匯總統(tǒng)計(jì)的時(shí)候,往往為了容忍一部分?jǐn)?shù)據(jù)的延遲,可能會(huì)人為的制造一些延遲來(lái)保證數(shù)據(jù)的準(zhǔn)確。舉例,在統(tǒng)計(jì)跨天相關(guān)的訂單事件中的數(shù)據(jù)時(shí),可能會(huì)等到 00:00:05 或者 00:00:10 再統(tǒng)計(jì),確保 00:00 前的數(shù)據(jù)已經(jīng)全部接受到位了,再進(jìn)行統(tǒng)計(jì)。所以,匯總層的層次太多的話,就會(huì)更大的加重人為造成的數(shù)據(jù)延遲。
與離線數(shù)倉(cāng)相比,實(shí)時(shí)數(shù)倉(cāng)的數(shù)據(jù)源存儲(chǔ)不同:
在建設(shè)離線數(shù)倉(cāng)的時(shí)候,基本整個(gè)離線數(shù)倉(cāng)都是建立在 Hive 表之上。但是,在建設(shè)實(shí)時(shí)數(shù)倉(cāng)的時(shí)候,同一份表,會(huì)使用不同的方式進(jìn)行存儲(chǔ)。比如常見(jiàn)的情況下,明細(xì)數(shù)據(jù)或者匯總數(shù)據(jù)都會(huì)存在 Kafka 里面,但是像城市、渠道等維度信息需要借助 Hbase,MySQL 或者其他 KV 存儲(chǔ)等數(shù)據(jù)庫(kù)來(lái)進(jìn)行存儲(chǔ)。3. Lambda架構(gòu)的實(shí)時(shí)數(shù)倉(cāng)
Lambda和Kappa架構(gòu)的概念已在前文中解釋,不了解的小伙伴可點(diǎn)擊鏈接:一文讀懂大數(shù)據(jù)實(shí)時(shí)計(jì)算
下圖是基于 Flink 和 Kafka 的 Lambda 架構(gòu)的具體實(shí)踐,上層是實(shí)時(shí)計(jì)算,下層是離線計(jì)算,橫向是按計(jì)算引擎來(lái)分,縱向是按實(shí)時(shí)數(shù)倉(cāng)來(lái)區(qū)分:
Lambda架構(gòu)的實(shí)時(shí)數(shù)倉(cāng)
Lambda架構(gòu)是比較經(jīng)典的架構(gòu),以前實(shí)時(shí)的場(chǎng)景不是很多,以離線為主,當(dāng)附加了實(shí)時(shí)場(chǎng)景后,由于離線和實(shí)時(shí)的時(shí)效性不同,導(dǎo)致技術(shù)生態(tài)是不一樣的。Lambda架構(gòu)相當(dāng)于附加了一條實(shí)時(shí)生產(chǎn)鏈路,在應(yīng)用層面進(jìn)行一個(gè)整合,雙路生產(chǎn),各自獨(dú)立。這在業(yè)務(wù)應(yīng)用中也是順理成章采用的一種方式。
雙路生產(chǎn)會(huì)存在一些問(wèn)題,比如加工邏輯double,開(kāi)發(fā)運(yùn)維也會(huì)double,資源同樣會(huì)變成兩個(gè)資源鏈路。因?yàn)榇嬖谝陨蠁?wèn)題,所以又演進(jìn)了一個(gè)Kappa架構(gòu)。
4. Kappa架構(gòu)的實(shí)時(shí)數(shù)倉(cāng)
Kappa架構(gòu)相當(dāng)于去掉了離線計(jì)算部分的Lambda架構(gòu),具體如下圖所示:
Kappa架構(gòu)的實(shí)時(shí)數(shù)倉(cāng)
Kappa架構(gòu)從架構(gòu)設(shè)計(jì)來(lái)講比較簡(jiǎn)單,生產(chǎn)統(tǒng)一,一套邏輯同時(shí)生產(chǎn)離線和實(shí)時(shí)。但是在實(shí)際應(yīng)用場(chǎng)景有比較大的局限性,因?yàn)閷?shí)時(shí)數(shù)據(jù)的同一份表,會(huì)使用不同的方式進(jìn)行存儲(chǔ),這就導(dǎo)致關(guān)聯(lián)時(shí)需要跨數(shù)據(jù)源,操作數(shù)據(jù)有很大局限性,所以在業(yè)內(nèi)直接用Kappa架構(gòu)生產(chǎn)落地的案例不多見(jiàn),且場(chǎng)景比較單一。
關(guān)于 Kappa 架構(gòu),熟悉實(shí)時(shí)數(shù)倉(cāng)生產(chǎn)的同學(xué),可能會(huì)有一個(gè)疑問(wèn)。因?yàn)槲覀兘?jīng)常會(huì)面臨業(yè)務(wù)變更,所以很多業(yè)務(wù)邏輯是需要去迭代的。之前產(chǎn)出的一些數(shù)據(jù),如果口徑變更了,就需要重算,甚至重刷歷史數(shù)據(jù)。對(duì)于實(shí)時(shí)數(shù)倉(cāng)來(lái)說(shuō),怎么去解決數(shù)據(jù)重算問(wèn)題?
Kappa 架構(gòu)在這一塊的思路是:首先要準(zhǔn)備好一個(gè)能夠存儲(chǔ)歷史數(shù)據(jù)的消息隊(duì)列,比如 Kafka,并且這個(gè)消息隊(duì)列是可以支持你從某個(gè)歷史的節(jié)點(diǎn)重新開(kāi)始消費(fèi)的。接著需要新起一個(gè)任務(wù),從原來(lái)比較早的一個(gè)時(shí)間節(jié)點(diǎn)去消費(fèi) Kafka 上的數(shù)據(jù),然后當(dāng)這個(gè)新的任務(wù)運(yùn)行的進(jìn)度已經(jīng)能夠和現(xiàn)在的正在跑的任務(wù)齊平的時(shí)候,你就可以把現(xiàn)在任務(wù)的下游切換到新的任務(wù)上面,舊的任務(wù)就可以停掉,并且原來(lái)產(chǎn)出的結(jié)果表也可以被刪掉。
5. 流批結(jié)合的實(shí)時(shí)數(shù)倉(cāng)
隨著實(shí)時(shí) OLAP 技術(shù)的發(fā)展,目前開(kāi)源的OLAP引擎在性能,易用等方面有了很大的提升,如Doris、Presto等,加上數(shù)據(jù)湖技術(shù)的迅速發(fā)展,使得流批結(jié)合的方式變得簡(jiǎn)單。
如下圖是流批結(jié)合的實(shí)時(shí)數(shù)倉(cāng):
流批結(jié)合的實(shí)時(shí)數(shù)倉(cāng)
數(shù)據(jù)從日志統(tǒng)一采集到消息隊(duì)列,再到實(shí)時(shí)數(shù)倉(cāng),作為基礎(chǔ)數(shù)據(jù)流的建設(shè)是統(tǒng)一的。之后對(duì)于日志類實(shí)時(shí)特征,實(shí)時(shí)大屏類應(yīng)用走實(shí)時(shí)流計(jì)算。對(duì)于Binlog類業(yè)務(wù)分析走實(shí)時(shí)OLAP批處理。
我們看到流批結(jié)合的方式與上面幾種架構(gòu)的存儲(chǔ)方式發(fā)生了變化,由Kafka換成了Iceberg,Iceberg是介于上層計(jì)算引擎和底層存儲(chǔ)格式之間的一個(gè)中間層,我們可以把它定義成一種“數(shù)據(jù)組織格式”,底層存儲(chǔ)還是HDFS,那么為什么加了中間層,就對(duì)流批結(jié)合處理的比較好了呢?Iceberg的ACID能力可以簡(jiǎn)化整個(gè)流水線的設(shè)計(jì),降低整個(gè)流水線的延遲,并且所具有的修改、刪除能力能夠有效地降低開(kāi)銷,提升效率。Iceberg可以有效支持批處理的高吞吐數(shù)據(jù)掃描和流計(jì)算按分區(qū)粒度并發(fā)實(shí)時(shí)處理。
十二、Flink 面試題1. Flink 的容錯(cuò)機(jī)制(checkpoint)
Checkpoint機(jī)制是Flink可靠性的基石,可以保證Flink集群在某個(gè)算子因?yàn)槟承┰?如 異常退出)出現(xiàn)故障時(shí),能夠?qū)⒄麄(gè)應(yīng)用流圖的狀態(tài)恢復(fù)到故障之前的某一狀態(tài),保證應(yīng)用流圖狀態(tài)的一致性。Flink的Checkpoint機(jī)制原理來(lái)自“Chandy-Lamport algorithm”算法。
每個(gè)需要Checkpoint的應(yīng)用在啟動(dòng)時(shí),Flink的JobManager為其創(chuàng)建一個(gè) CheckpointCoordinator(檢查點(diǎn)協(xié)調(diào)器),CheckpointCoordinator全權(quán)負(fù)責(zé)本應(yīng)用的快照制作。
CheckpointCoordinator(檢查點(diǎn)協(xié)調(diào)器),CheckpointCoordinator全權(quán)負(fù)責(zé)本應(yīng)用的快照制作。
CheckpointCoordinator(檢查點(diǎn)協(xié)調(diào)器) 周期性的向該流應(yīng)用的所有source算子發(fā)送 barrier(屏障)。
當(dāng)某個(gè)source算子收到一個(gè)barrier時(shí),便暫停數(shù)據(jù)處理過(guò)程,然后將自己的當(dāng)前狀態(tài)制作成快照,并保存到指定的持久化存儲(chǔ)中,最后向CheckpointCoordinator報(bào)告自己快照制作情況,同時(shí)向自身所有下游算子廣播該barrier,恢復(fù)數(shù)據(jù)處理
下游算子收到barrier之后,會(huì)暫停自己的數(shù)據(jù)處理過(guò)程,然后將自身的相關(guān)狀態(tài)制作成快照,并保存到指定的持久化存儲(chǔ)中,最后向CheckpointCoordinator報(bào)告自身快照情況,同時(shí)向自身所有下游算子廣播該barrier,恢復(fù)數(shù)據(jù)處理。
每個(gè)算子按照步驟3不斷制作快照并向下游廣播,直到最后barrier傳遞到sink算子,快照制作完成。
當(dāng)CheckpointCoordinator收到所有算子的報(bào)告之后,認(rèn)為該周期的快照制作成功; 否則,如果在規(guī)定的時(shí)間內(nèi)沒(méi)有收到所有算子的報(bào)告,則認(rèn)為本周期快照制作失敗。
文章推薦:
Flink可靠性的基石-checkpoint機(jī)制詳細(xì)解析
2. Flink Checkpoint與 Spark 的相比,Flink 有什么區(qū)別或優(yōu)勢(shì)嗎
Spark Streaming 的 Checkpoint 僅僅是針對(duì) Driver 的故障恢復(fù)做了數(shù)據(jù)和元數(shù)據(jù)的 Checkpoint。而 Flink 的 Checkpoint 機(jī)制要復(fù)雜了很多,它采用的是輕量級(jí)的分布式快照,實(shí)現(xiàn)了每個(gè)算子的快照,及流動(dòng)中的數(shù)據(jù)的快照。
3. Flink 中的 Time 有哪幾種
Flink中的時(shí)間有三種類型,如下圖所示:
Event Time:是事件創(chuàng)建的時(shí)間。它通常由事件中的時(shí)間戳描述,例如采集的日志數(shù)據(jù)中,每一條日志都會(huì)記錄自己的生成時(shí)間,Flink通過(guò)時(shí)間戳分配器訪問(wèn)事件時(shí)間戳。
Ingestion Time:是數(shù)據(jù)進(jìn)入Flink的時(shí)間。
Processing Time:是每一個(gè)執(zhí)行基于時(shí)間操作的算子的本地系統(tǒng)時(shí)間,與機(jī)器相關(guān),默認(rèn)的時(shí)間屬性就是Processing Time。
例如,一條日志進(jìn)入Flink的時(shí)間為2021-01-22 10:00:00.123,到達(dá)Window的系統(tǒng)時(shí)間為2021-01-22 10:00:01.234,日志的內(nèi)容如下:
2021-01-06 18:37:15.624 INFO Fail over to rm2
對(duì)于業(yè)務(wù)來(lái)說(shuō),要統(tǒng)計(jì)1min內(nèi)的故障日志個(gè)數(shù),哪個(gè)時(shí)間是最有意義的?—— eventTime,因?yàn)槲覀円鶕?jù)日志的生成時(shí)間進(jìn)行統(tǒng)計(jì)。
4. 對(duì)于遲到數(shù)據(jù)是怎么處理的
Flink中 WaterMark 和 Window 機(jī)制解決了流式數(shù)據(jù)的亂序問(wèn)題,對(duì)于因?yàn)檠舆t而順序有誤的數(shù)據(jù),可以根據(jù)eventTime進(jìn)行業(yè)務(wù)處理,對(duì)于延遲的數(shù)據(jù)Flink也有自己的解決辦法,主要的辦法是給定一個(gè)允許延遲的時(shí)間,在該時(shí)間范圍內(nèi)仍可以接受處理延遲數(shù)據(jù):
設(shè)置允許延遲的時(shí)間是通過(guò)allowedLateness(lateness: Time)設(shè)置
保存延遲數(shù)據(jù)則是通過(guò)sideOutputLateData(outputTag: OutputTag[T])保存
獲取延遲數(shù)據(jù)是通過(guò)DataStream.getSideOutput(tag: OutputTag[X])獲取
文章推薦:
Flink 中極其重要的 Time 與 Window 詳細(xì)解析
5. Flink 的運(yùn)行必須依賴 Hadoop 組件嗎
Flink可以完全獨(dú)立于Hadoop,在不依賴Hadoop組件下運(yùn)行。但是做為大數(shù)據(jù)的基礎(chǔ)設(shè)施,Hadoop體系是任何大數(shù)據(jù)框架都繞不過(guò)去的。Flink可以集成眾多Hadooop 組件,例如Yarn、Hbase、HDFS等等。例如,Flink可以和Yarn集成做資源調(diào)度,也可以讀寫(xiě)HDFS,或者利用HDFS做檢查點(diǎn)。
6. Flink集群有哪些角色?各自有什么作用
有以下三個(gè)角色:
JobManager處理器:
也稱之為Master,用于協(xié)調(diào)分布式執(zhí)行,它們用來(lái)調(diào)度task,協(xié)調(diào)檢查點(diǎn),協(xié)調(diào)失敗時(shí)恢復(fù)等。Flink運(yùn)行時(shí)至少存在一個(gè)master處理器,如果配置高可用模式則會(huì)存在多個(gè)master處理器,它們其中有一個(gè)是leader,而其他的都是standby。
TaskManager處理器:
也稱之為Worker,用于執(zhí)行一個(gè)dataflow的task(或者特殊的subtask)、數(shù)據(jù)緩沖和data stream的交換,Flink運(yùn)行時(shí)至少會(huì)存在一個(gè)worker處理器。
Clint客戶端:
Client是Flink程序提交的客戶端,當(dāng)用戶提交一個(gè)Flink程序時(shí),會(huì)首先創(chuàng)建一個(gè)Client,該Client首先會(huì)對(duì)用戶提交的Flink程序進(jìn)行預(yù)處理,并提交到Flink集群中處理,所以Client需要從用戶提交的Flink程序配置中獲取JobManager的地址,并建立到JobManager的連接,將Flink Job提交給JobManager
7. Flink 資源管理中 Task Slot 的概念
在Flink中每個(gè)TaskManager是一個(gè)JVM的進(jìn)程, 可以在不同的線程中執(zhí)行一個(gè)或多個(gè)子任務(wù)。為了控制一個(gè)worker能接收多少個(gè)task。worker通過(guò)task slot(任務(wù)槽)來(lái)進(jìn)行控制(一個(gè)worker至少有一個(gè)task slot)。
8. Flink的重啟策略了解嗎
Flink支持不同的重啟策略,這些重啟策略控制著job失敗后如何重啟:
固定延遲重啟策略
固定延遲重啟策略會(huì)嘗試一個(gè)給定的次數(shù)來(lái)重啟Job,如果超過(guò)了最大的重啟次數(shù),Job最終將失敗。在連續(xù)的兩次重啟嘗試之間,重啟策略會(huì)等待一個(gè)固定的時(shí)間。
失敗率重啟策略
失敗率重啟策略在Job失敗后會(huì)重啟,但是超過(guò)失敗率后,Job會(huì)最終被認(rèn)定失敗。在兩個(gè)連續(xù)的重啟嘗試之間,重啟策略會(huì)等待一個(gè)固定的時(shí)間。
無(wú)重啟策略
Job直接失敗,不會(huì)嘗試進(jìn)行重啟。
9. Flink 是如何保證 Exactly-once 語(yǔ)義的
Flink通過(guò)實(shí)現(xiàn)兩階段提交和狀態(tài)保存來(lái)實(shí)現(xiàn)端到端的一致性語(yǔ)義。分為以下幾個(gè)步驟:
開(kāi)始事務(wù)(beginTransaction)創(chuàng)建一個(gè)臨時(shí)文件夾,來(lái)寫(xiě)把數(shù)據(jù)寫(xiě)入到這個(gè)文件夾里面
預(yù)提交(preCommit)將內(nèi)存中緩存的數(shù)據(jù)寫(xiě)入文件并關(guān)閉
正式提交(commit)將之前寫(xiě)完的臨時(shí)文件放入目標(biāo)目錄下。這代表著最終的數(shù)據(jù)會(huì)有一些延遲
丟棄(abort)丟棄臨時(shí)文件
若失敗發(fā)生在預(yù)提交成功后,正式提交前?梢愿鶕(jù)狀態(tài)來(lái)提交預(yù)提交的數(shù)據(jù),也可刪除預(yù)提交的數(shù)據(jù)。
文章推薦:
八張圖搞懂 Flink 端到端精準(zhǔn)一次處理語(yǔ)義 Exactly-once
10. 如果下級(jí)存儲(chǔ)不支持事務(wù),Flink 怎么保證 exactly-once
端到端的 exactly-once 對(duì) sink 要求比較高,具體實(shí)現(xiàn)主要有冪等寫(xiě)入和事務(wù)性寫(xiě)入兩種方式。
冪等寫(xiě)入的場(chǎng)景依賴于業(yè)務(wù)邏輯,更常見(jiàn)的是用事務(wù)性寫(xiě)入。而事務(wù)性寫(xiě)入又有預(yù)寫(xiě)日志(WAL)和兩階段提交(2PC)兩種方式。
如果外部系統(tǒng)不支持事務(wù),那么可以用預(yù)寫(xiě)日志的方式,把結(jié)果數(shù)據(jù)先當(dāng)成狀態(tài)保存,然后在收到 checkpoint 完成的通知時(shí),一次性寫(xiě)入 sink 系統(tǒng)。
11. Flink是如何處理反壓的
Flink 內(nèi)部是基于 producer-consumer 模型來(lái)進(jìn)行消息傳遞的,Flink的反壓設(shè)計(jì)也是基于這個(gè)模型。Flink 使用了高效有界的分布式阻塞隊(duì)列,就像 Java 通用的阻塞隊(duì)列(BlockingQueue)一樣。下游消費(fèi)者消費(fèi)變慢,上游就會(huì)受到阻塞。
12. Flink中的狀態(tài)存儲(chǔ)
Flink在做計(jì)算的過(guò)程中經(jīng)常需要存儲(chǔ)中間狀態(tài),來(lái)避免數(shù)據(jù)丟失和狀態(tài)恢復(fù)。選擇的狀態(tài)存儲(chǔ)策略不同,會(huì)影響狀態(tài)持久化如何和 checkpoint 交互。Flink提供了三種狀態(tài)存儲(chǔ)方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。
13. Flink是如何支持流批一體的
這道題問(wèn)的比較開(kāi)闊,如果知道Flink底層原理,可以詳細(xì)說(shuō)說(shuō),如果不是很了解,就直接簡(jiǎn)單一句話:Flink的開(kāi)發(fā)者認(rèn)為批處理是流處理的一種特殊情況。批處理是有限的流處理。Flink 使用一個(gè)引擎支持了 DataSet API 和 DataStream API。
14. Flink的內(nèi)存管理是如何做的
Flink 并不是將大量對(duì)象存在堆上,而是將對(duì)象都序列化到一個(gè)預(yù)分配的內(nèi)存塊上。此外,Flink大量的使用了堆外內(nèi)存。如果需要處理的數(shù)據(jù)超出了內(nèi)存限制,則會(huì)將部分?jǐn)?shù)據(jù)存儲(chǔ)到硬盤(pán)上。Flink 為了直接操作二進(jìn)制數(shù)據(jù)實(shí)現(xiàn)了自己的序列化框架。
15. Flink CEP 編程中當(dāng)狀態(tài)沒(méi)有到達(dá)的時(shí)候會(huì)將數(shù)據(jù)保存在哪里
在流式處理中,CEP 當(dāng)然是要支持 EventTime 的,那么相對(duì)應(yīng)的也要支持?jǐn)?shù)據(jù)的遲到現(xiàn)象,也就是watermark的處理邏輯。CEP對(duì)未匹配成功的事件序列的處理,和遲到數(shù)據(jù)是類似的。在 Flink CEP的處理邏輯中,狀態(tài)沒(méi)有滿足的和遲到的數(shù)據(jù),都會(huì)存儲(chǔ)在一個(gè)Map數(shù)據(jù)結(jié)構(gòu)中,也就是說(shuō),如果我們限定判斷事件序列的時(shí)長(zhǎng)為5分鐘,那么內(nèi)存中就會(huì)存儲(chǔ)5分鐘的數(shù)據(jù),這在我看來(lái),也是對(duì)內(nèi)存的極大損傷之一。
最后
第一時(shí)間獲取最新大數(shù)據(jù)技術(shù),盡在本公眾號(hào):五分鐘學(xué)大數(shù)據(jù)
發(fā)表評(píng)論
請(qǐng)輸入評(píng)論內(nèi)容...
請(qǐng)輸入評(píng)論/評(píng)論長(zhǎng)度6~500個(gè)字
最新活動(dòng)更多
-
10月31日立即下載>> 【限時(shí)免費(fèi)下載】TE暖通空調(diào)系統(tǒng)高效可靠的組件解決方案
-
即日-11.13立即報(bào)名>>> 【在線會(huì)議】多物理場(chǎng)仿真助跑新能源汽車(chē)
-
11月28日立即報(bào)名>>> 2024工程師系列—工業(yè)電子技術(shù)在線會(huì)議
-
12月19日立即報(bào)名>> 【線下會(huì)議】OFweek 2024(第九屆)物聯(lián)網(wǎng)產(chǎn)業(yè)大會(huì)
-
即日-12.26火熱報(bào)名中>> OFweek2024中國(guó)智造CIO在線峰會(huì)
-
即日-2025.8.1立即下載>> 《2024智能制造產(chǎn)業(yè)高端化、智能化、綠色化發(fā)展藍(lán)皮書(shū)》
推薦專題
- 1 【一周車(chē)話】沒(méi)有方向盤(pán)和踏板的車(chē),你敢坐嗎?
- 2 特斯拉發(fā)布無(wú)人駕駛車(chē),還未迎來(lái)“Chatgpt時(shí)刻”
- 3 特斯拉股價(jià)大跌15%:Robotaxi離落地還差一個(gè)蘿卜快跑
- 4 馬斯克給的“驚喜”夠嗎?
- 5 打完“價(jià)格戰(zhàn)”,大模型還要比什么?
- 6 馬斯克致敬“國(guó)產(chǎn)蘿卜”?
- 7 神經(jīng)網(wǎng)絡(luò),誰(shuí)是盈利最強(qiáng)企業(yè)?
- 8 比蘋(píng)果偉大100倍!真正改寫(xiě)人類歷史的智能產(chǎn)品降臨
- 9 諾獎(jiǎng)進(jìn)入“AI時(shí)代”,人類何去何從?
- 10 Open AI融資后成萬(wàn)億獨(dú)角獸,AI人才之爭(zhēng)開(kāi)啟
- 高級(jí)軟件工程師 廣東省/深圳市
- 自動(dòng)化高級(jí)工程師 廣東省/深圳市
- 光器件研發(fā)工程師 福建省/福州市
- 銷售總監(jiān)(光器件) 北京市/海淀區(qū)
- 激光器高級(jí)銷售經(jīng)理 上海市/虹口區(qū)
- 光器件物理工程師 北京市/海淀區(qū)
- 激光研發(fā)工程師 北京市/昌平區(qū)
- 技術(shù)專家 廣東省/江門(mén)市
- 封裝工程師 北京市/海淀區(qū)
- 結(jié)構(gòu)工程師 廣東省/深圳市