詳解Flink CEP的概念及功能
result.print("惡意用戶>>>")
env.execute("BarrageBehavior01")
}
}
實(shí)例二:監(jiān)測刷屏用戶
規(guī)則:用戶如果在10s內(nèi),同時(shí)連續(xù)輸入同樣一句話超過5次,就認(rèn)為是惡意刷屏。
使用 Flink CEP檢測刷屏用戶
object BarrageBehavior02 {
case class Message(userId: String, ip: String, msg: String)
def main(args: Array[String]): Unit = {
//初始化運(yùn)行環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//設(shè)置并行度
env.setParallelism(1)
// 模擬數(shù)據(jù)源
val loginEventStream: DataStream[Message] = env.fromCollection(
List(
Message("1", "192.168.0.1", "beijing"),
Message("1", "192.168.0.2", "beijing"),
Message("1", "192.168.0.3", "beijing"),
Message("1", "192.168.0.4", "beijing"),
Message("2", "192.168.10.10", "shanghai"),
Message("3", "192.168.10.10", "beijing"),
Message("3", "192.168.10.11", "beijing"),
Message("4", "192.168.10.10", "beijing"),
Message("5", "192.168.10.11", "shanghai"),
Message("4", "192.168.10.12", "beijing"),
Message("5", "192.168.10.13", "shanghai"),
Message("5", "192.168.10.14", "shanghai"),
Message("5", "192.168.10.15", "beijing"),
Message("6", "192.168.10.16", "beijing"),
Message("6", "192.168.10.17", "beijing"),
Message("6", "192.168.10.18", "beijing"),
Message("5", "192.168.10.18", "shanghai"),
Message("6", "192.168.10.19", "beijing"),
Message("6", "192.168.10.19", "beijing"),
Message("5", "192.168.10.18", "shanghai")
)
)
//定義模式
val loginbeijingPattern = Pattern.begin[Message]("start")
.where(_.msg != null) //一條登錄失敗
.times(5).optional //將滿足五次的數(shù)據(jù)配對打印
.within(Time.seconds(10))
//進(jìn)行分組匹配
val loginbeijingDataPattern = CEP.pattern(loginEventStream.keyBy(_.userId), loginbeijingPattern)
//查找符合規(guī)則的數(shù)據(jù)
val loginbeijingResult: DataStream[Option[Iterable[Message]]] = loginbeijingDataPattern.select(patternSelectFun = (pattern: collection.Map[String, Iterable[Message]]) => {
var loginEventList: Option[Iterable[Message]] = null
loginEventList = pattern.get("start") match {
case Some(value) => {
if (value.toList.map(x => (x.userId, x.msg)).distinct.size == 1) {
Some(value)
} else {
None
}
}
}
loginEventList
})
//打印測試
loginbeijingResult.filter(x=>x。絅one).map(x=>{
x match {
case Some(value)=> value
}
}).print()
env.execute("BarrageBehavior02)
}
}
Flink CEP API
除了案例中介紹的幾個(gè)API外,我們在介紹下其他的常用API:
1. 條件 API
為了讓傳入事件被模式所接受,給模式指定傳入事件必須滿足的條件,這些條件由事件本身的屬性或者前面匹配過的事件的屬性統(tǒng)計(jì)量等來設(shè)定。比如,事件的某個(gè)值大于5,或者大于先前接受事件的某個(gè)值的平均值。
可以使用pattern.where()、pattern.or()、pattern.until()方法來指定條件。條件既可以是迭代條件IterativeConditions,也可以是簡單條件SimpleConditions。
FlinkCEP支持事件之間的三種臨近條件:
next():嚴(yán)格的滿足條件
示例:模式為begin("first").where(_.name='a').next("second").where(.name='b')當(dāng)且僅當(dāng)數(shù)據(jù)為a,b時(shí),模式才會被命中。如果數(shù)據(jù)為a,c,b,由于a的后面跟了c,所以a會被直接丟棄,模式不會命中。
followedBy():松散的滿足條件
示例:模式為begin("first").where(_.name='a').followedBy("second").where(.name='b')當(dāng)且僅當(dāng)數(shù)據(jù)為a,b或者為a,c,b,模式均被命中,中間的c會被忽略掉。
followedByAny():非確定的松散滿足條件
示例:模式為begin("first").where(_.name='a').followedByAny("second").where(.name='b')當(dāng)且僅當(dāng)數(shù)據(jù)為a,c,b,b時(shí),對于followedBy模式而言命中的為{a,b},對于followedByAny而言會有兩次命中{a,b},{a,b}。
2. 量詞 API
還記得我們在上面講解模式概念時(shí)說過的一句話:一般情況下,模式都是單例模式,可以使用量詞(Quantifiers)將其轉(zhuǎn)換為循環(huán)模式。這里的量詞就是指的量詞API。
以下這幾個(gè)量詞API,可以將模式指定為循環(huán)模式:
pattern.oneOrMore():一個(gè)給定的事件有一次或多次出現(xiàn),例如上面提到的b+。
pattern.times(#ofTimes):一個(gè)給定類型的事件出現(xiàn)了指定次數(shù),例如4次。
pattern.times(#fromTimes, #toTimes):一個(gè)給定類型的事件出現(xiàn)的次數(shù)在指定次數(shù)范圍內(nèi),例如2~4次。
可以使用pattern.greedy()方法將模式變成循環(huán)模式,但是不能讓一組模式都變成循環(huán)模式。greedy:就是盡可能的重復(fù)。
使用pattern.optional()方法將循環(huán)模式變成可選的,即可以是循環(huán)模式也可以是單個(gè)模式。
3. 匹配后的跳過策略
所謂的匹配跳過策略,是對多個(gè)成功匹配的模式進(jìn)行篩選。也就是說如果多個(gè)匹配成功,可能我不需要這么多,按照匹配策略,過濾下就可以。
Flink中有五種跳過策略:
NO_SKIP: 不過濾,所有可能的匹配都會被發(fā)出。
SKIP_TO_NEXT: 丟棄與開始匹配到的事件相同的事件,發(fā)出開始匹配到的事件,即直接跳到下一個(gè)模式匹配到的事件,以此類推。
SKIP_PAST_LAST_EVENT: 丟棄匹配開始后但結(jié)束之前匹配到的事件。
SKIP_TO_FIRST[PatternName]: 丟棄匹配開始后但在PatternName模式匹配到的第一個(gè)事件之前匹配到的事件。
SKIP_TO_LAST[PatternName]: 丟棄匹配開始后但在PatternName模式匹配到的最后一個(gè)事件之前匹配到的事件。
怎么理解上述策略,我們以NO_SKIP和SKIP_PAST_LAST_EVENT為例講解下:
在模式為:begin("start").where(_.name='a').oneOrMore().followedBy("second").where(_.name='b')中,我們輸入數(shù)據(jù):a,a,a,a,b ,如果是NO_SKIP策略,即不過濾策略,模式匹配到的是:{a,b},{a,a,b},{a,a,a,b},{a,a,a,a,b};如果是SKIP_PAST_LAST_EVENT策略,即丟棄匹配開始后但結(jié)束之前匹配到的事件,模式匹配到的是:{a,a,a,a,b}。
Flink CEP 的使用場景
除上述案例場景外,F(xiàn)link CEP 還廣泛用于網(wǎng)絡(luò)欺詐,故障檢測,風(fēng)險(xiǎn)規(guī)避,智能營銷等領(lǐng)域。
1. 實(shí)時(shí)反作弊和風(fēng)控
對于電商來說,羊毛黨是必不可少的,國內(nèi)拼多多曾爆出 100 元的無門檻券隨便領(lǐng),當(dāng)晚被人褥幾百億,對于這種情況肯定是沒有做好及時(shí)的風(fēng)控。另外還有就是商家上架商品時(shí)通過頻繁修改商品的名稱和濫用標(biāo)題來提高搜索關(guān)鍵字的排名、批量注冊一批機(jī)器賬號快速刷單來提高商品的銷售量等作弊行為,各種各樣的作弊手法也是需要不斷的去制定規(guī)則去匹配這種行為。
2. 實(shí)時(shí)營銷
分析用戶在手機(jī) APP 的實(shí)時(shí)行為,統(tǒng)計(jì)用戶的活動周期,通過為用戶畫像來給用戶進(jìn)行推薦。比如用戶在登錄 APP 后 1 分鐘內(nèi)只瀏覽了商品沒有下單;用戶在瀏覽一個(gè)商品后,3 分鐘內(nèi)又去查看其他同類的商品,進(jìn)行比價(jià)行為;用戶商品下單后 1 分鐘內(nèi)是否支付了該訂單。如果這些數(shù)據(jù)都可以很好的利用起來,那么就可以給用戶推薦瀏覽過的類似商品,這樣可以大大提高購買率。
3. 實(shí)時(shí)網(wǎng)絡(luò)攻擊檢測
當(dāng)下互聯(lián)網(wǎng)安全形勢仍然嚴(yán)峻,網(wǎng)絡(luò)攻擊屢見不鮮且花樣眾多,這里我們以 DDOS(分布式拒絕服務(wù)攻擊)產(chǎn)生的流入流量來作為遭受攻擊的判斷依據(jù)。對網(wǎng)絡(luò)遭受的潛在攻擊進(jìn)行實(shí)時(shí)檢測并給出預(yù)警,云服務(wù)廠商的多個(gè)數(shù)據(jù)中心會定時(shí)向監(jiān)控中心上報(bào)其瞬時(shí)流量,如果流量在預(yù)設(shè)的正常范圍內(nèi)則認(rèn)為是正,F(xiàn)象,不做任何操作;如果某數(shù)據(jù)中心在 10 秒內(nèi)連續(xù) 5 次上報(bào)的流量超過正常范圍的閾值,則觸發(fā)一條警告的事件;如果某數(shù)據(jù)中心 30 秒內(nèi)連續(xù)出現(xiàn) 30 次上報(bào)的流量超過正常范圍的閾值,則觸發(fā)嚴(yán)重的告警。
Flink CEP 的原理簡單介紹
Apache Flink在實(shí)現(xiàn)CEP時(shí)借鑒了Efficient Pattern Matching over Event Streams論文中NFA的模型,在這篇論文中,還提到了一些優(yōu)化,我們在這里先跳過,只說下NFA的概念。
在這篇論文中,提到了NFA,也就是Non-determined Finite Automaton,叫做不確定的有限狀態(tài)機(jī),指的是狀態(tài)有限,但是每個(gè)狀態(tài)可能被轉(zhuǎn)換成多個(gè)狀態(tài)(不確定)。
非確定有限自動狀態(tài)機(jī):
先介紹兩個(gè)概念:
狀態(tài):狀態(tài)分為三類,起始狀態(tài)、中間狀態(tài)和最終狀態(tài)。
轉(zhuǎn)換:take/ignore/proceed都是轉(zhuǎn)換的名稱。
在NFA匹配規(guī)則里,本質(zhì)上是一個(gè)狀態(tài)轉(zhuǎn)換的過程。三種轉(zhuǎn)換的含義如下所示:
Take: 主要是條件的判斷,當(dāng)過來一條數(shù)據(jù)進(jìn)行判斷,一旦滿足條件,獲取當(dāng)前元素,放入到結(jié)果集中,然后將當(dāng)前狀態(tài)轉(zhuǎn)移到下一個(gè)的狀態(tài)。
Proceed:當(dāng)前的狀態(tài)可以不依賴任何的事件轉(zhuǎn)移到下一個(gè)狀態(tài),比如說透傳的意思。
Ignore:當(dāng)一條數(shù)據(jù)到來的時(shí)候,可以忽略這個(gè)消息事件,當(dāng)前的狀態(tài)保持不變,相當(dāng)于自己到自己的一個(gè)狀態(tài)。
NFA的特點(diǎn):在NFA中,給定當(dāng)前狀態(tài),可能有多個(gè)下一個(gè)狀態(tài)。可以隨機(jī)選擇下一個(gè)狀態(tài),也可以并行(同時(shí))選擇下一個(gè)狀態(tài)。輸入符號可以為空。
規(guī)則引擎
規(guī)則引擎:將業(yè)務(wù)決策從應(yīng)用程序代碼中分離出來,并使用預(yù)定義的語義模塊編寫業(yè)務(wù)決策。接受數(shù)據(jù)輸入,解釋業(yè)務(wù)規(guī)則,并根據(jù)業(yè)務(wù)規(guī)則做出業(yè)務(wù)決策。
使用規(guī)則引擎可以通過降低實(shí)現(xiàn)復(fù)雜業(yè)務(wù)邏輯的組件的復(fù)雜性,降低應(yīng)用程序的維護(hù)和可擴(kuò)展性成本。
1. Drools
Drools 是一款使用 Java 編寫的開源規(guī)則引擎,通常用來解決業(yè)務(wù)代碼與業(yè)務(wù)規(guī)則的分離,它內(nèi)置的 Drools Fusion 模塊也提供 CEP 的功能。
優(yōu)勢:
功能較為完善,具有如系統(tǒng)監(jiān)控、操作平臺等功能。規(guī)則支持動態(tài)更新。
劣勢:
以內(nèi)存實(shí)現(xiàn)時(shí)間窗功能,無法支持較長跨度的時(shí)間窗。無法有效支持定時(shí)觸達(dá)(如用戶在瀏覽發(fā)生一段時(shí)間后觸達(dá)條件判斷)。2. Aviator
Aviator 是一個(gè)高性能、輕量級的 Java 語言實(shí)現(xiàn)的表達(dá)式求值引擎,主要用于各種表達(dá)式的動態(tài)求值。
優(yōu)勢:
支持大部分運(yùn)算操作符。支持函數(shù)調(diào)用和自定義函數(shù)。支持正則表達(dá)式匹配。支持傳入變量并且性能優(yōu)秀。
劣勢:
沒有 if else、do while 等語句,沒有賦值語句,沒有位運(yùn)算符。3. EasyRules
EasyRules 集成了 MVEL 和 SpEL 表達(dá)式的一款輕量級規(guī)則引擎。
優(yōu)勢:
輕量級框架,學(xué)習(xí)成本低; POJO。為定義業(yè)務(wù)引擎提供有用的抽象和簡便的應(yīng)用。支持從簡單的規(guī)則組建成復(fù)雜規(guī)則。4. Esper
Esper 設(shè)計(jì)目標(biāo)為 CEP 的輕量級解決方案,可以方便的嵌入服務(wù)中,提供 CEP 功能。
優(yōu)勢:
輕量級可嵌入開發(fā),常用的 CEP 功能簡單好用。EPL 語法與 SQL 類似,學(xué)習(xí)成本較低。
劣勢:
單機(jī)全內(nèi)存方案,需要整合其他分布式和存儲。以內(nèi)存實(shí)現(xiàn)時(shí)間窗功能,無法支持較長跨度的時(shí)間窗。無法有效支持定時(shí)觸達(dá)(如用戶在瀏覽發(fā)生一段時(shí)間后觸達(dá)條件判斷)。5. Flink CEP
Flink 是一個(gè)流式系統(tǒng),具有高吞吐低延遲的特點(diǎn),F(xiàn)link CEP 是一套極具通用性、易于使用的實(shí)時(shí)流式事件處理方案。
優(yōu)勢:
繼承了 Flink 高吞吐的特點(diǎn)。事件支持存儲到外部,可以支持較長跨度的時(shí)間窗?梢灾С侄〞r(shí)觸達(dá)(用 followedBy + PartternTimeoutFunction 實(shí)現(xiàn))。
請輸入評論內(nèi)容...
請輸入評論/評論長度6~500個(gè)字
最新活動更多
-
10月31日立即下載>> 【限時(shí)免費(fèi)下載】TE暖通空調(diào)系統(tǒng)高效可靠的組件解決方案
-
即日-11.13立即報(bào)名>>> 【在線會議】多物理場仿真助跑新能源汽車
-
11月28日立即報(bào)名>>> 2024工程師系列—工業(yè)電子技術(shù)在線會議
-
12月19日立即報(bào)名>> 【線下會議】OFweek 2024(第九屆)物聯(lián)網(wǎng)產(chǎn)業(yè)大會
-
即日-12.26火熱報(bào)名中>> OFweek2024中國智造CIO在線峰會
-
即日-2025.8.1立即下載>> 《2024智能制造產(chǎn)業(yè)高端化、智能化、綠色化發(fā)展藍(lán)皮書》
推薦專題
- 1 【一周車話】沒有方向盤和踏板的車,你敢坐嗎?
- 2 特斯拉發(fā)布無人駕駛車,還未迎來“Chatgpt時(shí)刻”
- 3 特斯拉股價(jià)大跌15%:Robotaxi離落地還差一個(gè)蘿卜快跑
- 4 馬斯克給的“驚喜”夠嗎?
- 5 打完“價(jià)格戰(zhàn)”,大模型還要比什么?
- 6 馬斯克致敬“國產(chǎn)蘿卜”?
- 7 神經(jīng)網(wǎng)絡(luò),誰是盈利最強(qiáng)企業(yè)?
- 8 比蘋果偉大100倍!真正改寫人類歷史的智能產(chǎn)品降臨
- 9 諾獎(jiǎng)進(jìn)入“AI時(shí)代”,人類何去何從?
- 10 Open AI融資后成萬億獨(dú)角獸,AI人才之爭開啟
- 高級軟件工程師 廣東省/深圳市
- 自動化高級工程師 廣東省/深圳市
- 光器件研發(fā)工程師 福建省/福州市
- 銷售總監(jiān)(光器件) 北京市/海淀區(qū)
- 激光器高級銷售經(jīng)理 上海市/虹口區(qū)
- 光器件物理工程師 北京市/海淀區(qū)
- 激光研發(fā)工程師 北京市/昌平區(qū)
- 技術(shù)專家 廣東省/江門市
- 封裝工程師 北京市/海淀區(qū)
- 結(jié)構(gòu)工程師 廣東省/深圳市