一文詳解Flink知識(shí)體系
4) Flink 關(guān)聯(lián) Hive 分區(qū)表
Flink 1.12 支持了 Hive 最新的分區(qū)作為時(shí)態(tài)表的功能,可以通過 SQL 的方式直接關(guān)聯(lián) Hive 分區(qū)表的最新分區(qū),并且會(huì)自動(dòng)監(jiān)聽最新的 Hive 分區(qū),當(dāng)監(jiān)控到新的分區(qū)后,會(huì)自動(dòng)地做維表數(shù)據(jù)的全量替換。通過這種方式,用戶無需編寫 DataStream 程序即可完成 Kafka 流實(shí)時(shí)關(guān)聯(lián)最新的 Hive 分區(qū)實(shí)現(xiàn)數(shù)據(jù)打?qū)挕?/p>
具體用法:
在 Sql Client 中注冊(cè) HiveCatalog:
vim conf/sql-client-defaults.yaml
catalogs:
- name: hive_catalog
type: hive
hive-conf-dir: /disk0/soft/hive-conf/ #該目錄需要包hive-site.xml文件
創(chuàng)建 Kafka 表
CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 (
master Row
Flink 事實(shí)表與 Hive 最新分區(qū)數(shù)據(jù)關(guān)聯(lián)
dim_extend_shop_info 是 Hive 中已存在的表,所以我們用 table hint 動(dòng)態(tài)地開啟維表參數(shù)。
CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as
SELECT * FROM
(select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id,
ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn
from hive_catalog.flink_db.kfk_fact_bill_master_12 t1
JOIN hive_catalog.flink_db.dim_extend_shop_info
+ OPTIONS('streaming-source.enable'='true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '1 h',
'streaming-source.partition-order' = 'partition-name')
FOR SYSTEM_TIME AS OF t1.proctime AS t2 --時(shí)態(tài)表
ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id
where groupID in (202042)) t where t.rn = 1
參數(shù)解釋:
streaming-source.enable 開啟流式讀取 Hive 數(shù)據(jù)。
streaming-source.partition.include 有以下兩個(gè)值:
latest 屬性: 只讀取最新分區(qū)數(shù)據(jù)。all: 讀取全量分區(qū)數(shù)據(jù) ,默認(rèn)值為 all,表示讀所有分區(qū),latest 只能用在 temporal join 中,用于讀取最新分區(qū)作為維表,不能直接讀取最新分區(qū)數(shù)據(jù)。
streaming-source.monitor-interval 監(jiān)聽新分區(qū)生成的時(shí)間、不宜過短 、最短是1 個(gè)小時(shí),因?yàn)槟壳暗膶?shí)現(xiàn)是每個(gè) task 都會(huì)查詢 metastore,高頻的查可能會(huì)對(duì)metastore 產(chǎn)生過大的壓力。需要注意的是,1.12.1 放開了這個(gè)限制,但仍建議按照實(shí)際業(yè)務(wù)不要配個(gè)太短的 interval。
streaming-source.partition-order 分區(qū)策略,主要有以下 3 種,其中最為推薦的是 partition-name:
partition-name 使用默認(rèn)分區(qū)名稱順序加載最新分區(qū)create-time 使用分區(qū)文件創(chuàng)建時(shí)間順序partition-time 使用分區(qū)時(shí)間順序六、Flink 狀態(tài)管理
我們前面寫的 wordcount 的例子,沒有包含狀態(tài)管理。如果一個(gè)task在處理過程中掛掉了,那么它在內(nèi)存中的狀態(tài)都會(huì)丟失,所有的數(shù)據(jù)都需要重新計(jì)算。從容錯(cuò)和消息處理的語(yǔ)義上(at least once, exactly once),Flink引入了state和checkpoint。
因此可以說flink因?yàn)橐肓藄tate和checkpoint所以才支持的exactly once
首先區(qū)分一下兩個(gè)概念:
state:
state一般指一個(gè)具體的task/operator的狀態(tài):
state數(shù)據(jù)默認(rèn)保存在java的堆內(nèi)存中,TaskManage節(jié)點(diǎn)的內(nèi)存中。
operator表示一些算子在運(yùn)行的過程中會(huì)產(chǎn)生的一些中間結(jié)果。
checkpoint:
checkpoint可以理解為checkpoint是把state數(shù)據(jù)定時(shí)持久化存儲(chǔ)了,則表示了一個(gè)Flink Job在一個(gè)特定時(shí)刻的一份全局狀態(tài)快照,即包含了所有task/operator的狀態(tài)。
注意:task(subTask)是Flink中執(zhí)行的基本單位。operator指算子(transformation)
State可以被記錄,在失敗的情況下數(shù)據(jù)還可以恢復(fù)。
Flink中有兩種基本類型的State:
Keyed State
Operator State
Keyed State和Operator State,可以以兩種形式存在:
原始狀態(tài)(raw state)
托管狀態(tài)(managed state)
托管狀態(tài)是由Flink框架管理的狀態(tài)。
我們說operator算子保存了數(shù)據(jù)的中間結(jié)果,中間結(jié)果保存在什么類型中,如果我們這里是托管狀態(tài),則由flink框架自行管理
原始狀態(tài)由用戶自行管理狀態(tài)具體的數(shù)據(jù)結(jié)構(gòu),框架在做checkpoint的時(shí)候,使用byte[]來讀寫狀態(tài)內(nèi)容,對(duì)其內(nèi)部數(shù)據(jù)結(jié)構(gòu)一無所知。
通常在DataStream上的狀態(tài)推薦使用托管的狀態(tài),當(dāng)實(shí)現(xiàn)一個(gè)用戶自定義的operator時(shí),會(huì)使用到原始狀態(tài)。
1. State-Keyed State
基于KeyedStream上的狀態(tài)。這個(gè)狀態(tài)是跟特定的key綁定的,對(duì)KeyedStream流上的每一個(gè)key,都對(duì)應(yīng)一個(gè)state,比如:stream.keyBy(…)。KeyBy之后的Operator State,可以理解為分區(qū)過的Operator State。
保存state的數(shù)據(jù)結(jié)構(gòu):
ValueState:即類型為T的單值狀態(tài)。這個(gè)狀態(tài)與對(duì)應(yīng)的key綁定,是最簡(jiǎn)單的狀態(tài)了。它可以通過update方法更新狀態(tài)值,通過value()方法獲取狀態(tài)值。
ListState:即key上的狀態(tài)值為一個(gè)列表?梢酝ㄟ^add方法往列表中附加值;也可以通過get()方法返回一個(gè)Iterable來遍歷狀態(tài)值。
ReducingState:這種狀態(tài)通過用戶傳入的reduceFunction,每次調(diào)用add方法添加值的時(shí)候,會(huì)調(diào)用reduceFunction,最后合并到一個(gè)單一的狀態(tài)值。
MapState
需要注意的是,以上所述的State對(duì)象,僅僅用于與狀態(tài)進(jìn)行交互(更新、刪除、清空等),而真正的狀態(tài)值,有可能是存在內(nèi)存、磁盤、或者其他分布式存儲(chǔ)系統(tǒng)中。相當(dāng)于我們只是持有了這個(gè)狀態(tài)的句柄。
1. ValueState
使用ValueState保存中間結(jié)果對(duì)下面數(shù)據(jù)進(jìn)行分組求和。
開發(fā)步驟:
1. 獲取流處理執(zhí)行環(huán)境
2. 加載數(shù)據(jù)源
3. 數(shù)據(jù)分組
4. 數(shù)據(jù)轉(zhuǎn)換,定義ValueState,保存中間結(jié)果
5. 數(shù)據(jù)打印
6. 觸發(fā)執(zhí)行
ValueState:測(cè)試數(shù)據(jù)源:
List(
(1L, 4L),
(2L, 3L),
(3L, 1L),
(1L, 2L),
(3L, 2L),
(1L, 2L),
(2L, 2L),
(2L, 9L)
)
示例代碼:
import org.a(chǎn)pache.flink.a(chǎn)pi.common.functions.RichFlatMapFunction
import org.a(chǎn)pache.flink.a(chǎn)pi.common.state.{ValueState, ValueStateDescriptor}
import org.a(chǎn)pache.flink.a(chǎn)pi.common.typeinfo.{TypeHint, TypeInformation}
import org.a(chǎn)pache.flink.configuration.Configuration
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala._
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala.{DataStream, StreamExecutionEnvironment}
import org.a(chǎn)pache.flink.util.Collector
object TestKeyedState {
class CountWithKeyedState extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
*
* ValueState狀態(tài)句柄. 第一個(gè)值為count,第二個(gè)值為sum。
private var sum: ValueState[(Long, Long)] = _
override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
// 獲取當(dāng)前狀態(tài)值
val tmpCurrentSum: (Long, Long) = sum.value
// 狀態(tài)默認(rèn)值
val currentSum = if (tmpCurrentSum != null) {
tmpCurrentSum
} else {
(0L, 0L)
}
// 更新
val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
// 更新狀態(tài)值
sum.update(newSum)
// 如果count >=3 清空狀態(tài)值,重新計(jì)算
if (newSum._1 >= 3) {
out.collect((input._1, newSum._2 / newSum._1))
sum.clear()
}
}
override def open(parameters: Configuration): Unit = {
sum = getRuntimeContext.getState(
new ValueStateDescriptor[(Long, Long)]("average", // 狀態(tài)名稱
TypeInformation.of(new TypeHint[(Long, Long)](){}) )// 狀態(tài)類型
)
}
}
def main(args: Array[String]): Unit = {
//初始化執(zhí)行環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//構(gòu)建數(shù)據(jù)源
val inputStream: DataStream[(Long, Long)] = env.fromCollection(
List(
(1L, 4L),
(2L, 3L),
(3L, 1L),
(1L, 2L),
(3L, 2L),
(1L, 2L),
(2L, 2L),
(2L, 9L))
)
//執(zhí)行數(shù)據(jù)處理
inputStream.keyBy(0)
.flatMap(new CountWithKeyedState)
.setParallelism(1)
.print
//運(yùn)行任務(wù)
env.execute
}
}
2. MapState
使用MapState保存中間結(jié)果對(duì)下面數(shù)據(jù)進(jìn)行分組求和:
1. 獲取流處理執(zhí)行環(huán)境
2. 加載數(shù)據(jù)源
3. 數(shù)據(jù)分組
4. 數(shù)據(jù)轉(zhuǎn)換,定義MapState,保存中間結(jié)果
5. 數(shù)據(jù)打印
6. 觸發(fā)執(zhí)行
MapState:測(cè)試數(shù)據(jù)源:
List(
("java", 1),
("python", 3),
("java", 2),
("scala", 2),
("python", 1),
("java", 1),
("scala", 2)
)
示例代碼:
object MapState {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
*
* 使用MapState保存中間結(jié)果對(duì)下面數(shù)據(jù)進(jìn)行分組求和
* 1.獲取流處理執(zhí)行環(huán)境
* 2.加載數(shù)據(jù)源
* 3.?dāng)?shù)據(jù)分組
* 4.?dāng)?shù)據(jù)轉(zhuǎn)換,定義MapState,保存中間結(jié)果
* 5.?dāng)?shù)據(jù)打印
* 6.觸發(fā)執(zhí)行
val source: DataStream[(String, Int)] = env.fromCollection(List(
("java", 1),
("python", 3),
("java", 2),
("scala", 2),
("python", 1),
("java", 1),
("scala", 2)))
source.keyBy(0)
.map(new RichMapFunction[(String, Int), (String, Int)] {
var mste: MapState[String, Int] = _
override def open(parameters: Configuration): Unit = {
val msState = new MapStateDescriptor[String, Int]("ms",
TypeInformation.of(new TypeHint[(String)] {}),
TypeInformation.of(new TypeHint[(Int)] {}))
mste = getRuntimeContext.getMapState(msState)
}
override def map(value: (String, Int)): (String, Int) = {
val i: Int = mste.get(value._1)
mste.put(value._1, value._2 + i)
(value._1, value._2 + i)
}
}).print()
env.execute()
}
}
2. State-Operator State
與Key無關(guān)的State,與Operator綁定的state,整個(gè)operator只對(duì)應(yīng)一個(gè)state。
保存state的數(shù)據(jù)結(jié)構(gòu):
ListState
舉例來說,Flink中的 Kafka Connector,就使用了operator state。它會(huì)在每個(gè)connector實(shí)例中,保存該實(shí)例中消費(fèi)topic的所有(partition, offset)映射。
步驟:
獲取執(zhí)行環(huán)境
設(shè)置檢查點(diǎn)機(jī)制:路徑,重啟策略
自定義數(shù)據(jù)源
需要繼承并行數(shù)據(jù)源和CheckpointedFunction設(shè)置listState,通過上下文對(duì)象context獲取數(shù)據(jù)處理,保留offset制作快照
數(shù)據(jù)打印
觸發(fā)執(zhí)行
示例代碼:
import java.util
import org.a(chǎn)pache.flink.a(chǎn)pi.common.restartstrategy.RestartStrategies
import org.a(chǎn)pache.flink.a(chǎn)pi.common.state.{ListState, ListStateDescriptor}
import org.a(chǎn)pache.flink.a(chǎn)pi.common.time.Time
import org.a(chǎn)pache.flink.a(chǎn)pi.common.typeinfo.{TypeHint, TypeInformation}
import org.a(chǎn)pache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.a(chǎn)pache.flink.runtime.state.filesystem.FsStateBackend
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.CheckpointingMode
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.checkpoint.CheckpointedFunction
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.environment.CheckpointConfig
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala.StreamExecutionEnvironment
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala._
object ListOperate {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(5000)
env.setStateBackend(new FsStateBackend("hdfs://node01:8020/tmp/check/8"))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.setCheckpointTimeout(60000)
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//重啟策略
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(5)))
//模擬kakfa偏移量
env.a(chǎn)ddSource(new MyRichParrelSourceFun)
.print()
env.execute()
}
}
class MyRichParrelSourceFun extends RichParallelSourceFunction[String]
with CheckpointedFunction {
var listState: ListState[Long] = _
var offset: Long = 0L
//任務(wù)運(yùn)行
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
val iterState: util.Iterator[Long] = listState.get().iterator()
while (iterState.hasNext) {
offset = iterState.next()
}
while (true) {
offset += 1
ctx.collect("offset:"+offset)
Thread.sleep(1000)
if(offset > 10){
1/0
}
}
}
//取消任務(wù)
override def cancel(): Unit = ???
//制作快照
override def snapshotState(context: FunctionSnapshotContext): Unit = {
listState.clear()
listState.a(chǎn)dd(offset)
}
//初始化狀態(tài)
override def initializeState(context: FunctionInitializationContext): Unit = {
listState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Long](
"listState", TypeInformation.of(new TypeHint[Long] {})
))
}
}
3. Broadcast State
Broadcast State 是 Flink 1.5 引入的新特性。在開發(fā)過程中,如果遇到需要下發(fā)/廣播配置、規(guī)則等低吞吐事件流到下游所有 task 時(shí),就可以使用 Broadcast State 特性。下游的 task 接收這些配置、規(guī)則并保存為 BroadcastState, 將這些配置應(yīng)用到另一個(gè)數(shù)據(jù)流的計(jì)算中 。
1) API介紹
通常,我們首先會(huì)創(chuàng)建一個(gè)Keyed或Non-Keyed的Data Stream,然后再創(chuàng)建一個(gè)Broadcasted Stream,最后通過Data Stream來連接(調(diào)用connect方法)到Broadcasted Stream上,這樣實(shí)現(xiàn)將Broadcast State廣播到Data Stream下游的每個(gè)Task中。
如果Data Stream是Keyed Stream,則連接到Broadcasted Stream后,添加處理ProcessFunction時(shí)需要使用KeyedBroadcastProcessFunction來實(shí)現(xiàn),下面是KeyedBroadcastProcessFunction的API,代碼如下所示:
public abstract class KeyedBroadcastProcessFunction
上面泛型中的各個(gè)參數(shù)的含義,說明如下:
KS:表示Flink程序從最上游的Source Operator開始構(gòu)建Stream,當(dāng)調(diào)用keyBy時(shí)所依賴的Key的類型;IN1:表示非Broadcast的Data Stream中的數(shù)據(jù)記錄的類型;IN2:表示Broadcast Stream中的數(shù)據(jù)記錄的類型;OUT:表示經(jīng)過KeyedBroadcastProcessFunction的processElement()和processBroadcastElement()方法處理后輸出結(jié)果數(shù)據(jù)記錄的類型。
如果Data Stream是Non-Keyed Stream,則連接到Broadcasted Stream后,添加處理ProcessFunction時(shí)需要使用BroadcastProcessFunction來實(shí)現(xiàn),下面是BroadcastProcessFunction的API,代碼如下所示:
public abstract class BroadcastProcessFunction
上面泛型中的各個(gè)參數(shù)的含義,與前面KeyedBroadcastProcessFunction的泛型類型中的后3個(gè)含義相同,只是沒有調(diào)用keyBy操作對(duì)原始Stream進(jìn)行分區(qū)操作,就不需要KS泛型參數(shù)。
注意事項(xiàng):
Broadcast State 是Map類型,即K-V類型。
Broadcast State 只有在廣播一側(cè)的方法中processBroadcastElement可以修改;在非廣播一側(cè)方法中processElement只讀。
Broadcast State在運(yùn)行時(shí)保存在內(nèi)存中。
2) 場(chǎng)景舉例
動(dòng)態(tài)更新計(jì)算規(guī)則: 如事件流需要根據(jù)最新的規(guī)則進(jìn)行計(jì)算,則可將規(guī)則作為廣播狀態(tài)廣播到下游Task中。
實(shí)時(shí)增加額外字段: 如事件流需要實(shí)時(shí)增加用戶的基礎(chǔ)信息,則可將用戶的基礎(chǔ)信息作為廣播狀態(tài)廣播到下游Task中。
七、Flink的容錯(cuò)1. Checkpoint介紹
checkpoint機(jī)制是Flink可靠性的基石,可以保證Flink集群在某個(gè)算子因?yàn)槟承┰?如 異常退出)出現(xiàn)故障時(shí),能夠?qū)⒄麄(gè)應(yīng)用流圖的狀態(tài)恢復(fù)到故障之前的某一狀態(tài),保 證應(yīng)用流圖狀態(tài)的一致性。Flink的checkpoint機(jī)制原理來自“Chandy-Lamport algorithm”算法。
每個(gè)需要checkpoint的應(yīng)用在啟動(dòng)時(shí),Flink的JobManager為其創(chuàng)建一個(gè) CheckpointCoordinator(檢查點(diǎn)協(xié)調(diào)器),CheckpointCoordinator全權(quán)負(fù)責(zé)本應(yīng)用的快照制作。
CheckpointCoordinator(檢查點(diǎn)協(xié)調(diào)器) 周期性的向該流應(yīng)用的所有source算子發(fā)送 barrier(屏障)。
當(dāng)某個(gè)source算子收到一個(gè)barrier時(shí),便暫停數(shù)據(jù)處理過程,然后將自己的當(dāng)前狀態(tài)制作成快照,并保存到指定的持久化存儲(chǔ)中,最后向CheckpointCoordinator報(bào)告自己快照制作情況,同時(shí)向自身所有下游算子廣播該barrier,恢復(fù)數(shù)據(jù)處理
下游算子收到barrier之后,會(huì)暫停自己的數(shù)據(jù)處理過程,然后將自身的相關(guān)狀態(tài)制作成快照,并保存到指定的持久化存儲(chǔ)中,最后向CheckpointCoordinator報(bào)告自身快照情況,同時(shí)向自身所有下游算子廣播該barrier,恢復(fù)數(shù)據(jù)處理。
每個(gè)算子按照步驟3不斷制作快照并向下游廣播,直到最后barrier傳遞到sink算子,快照制作完成。
當(dāng)CheckpointCoordinator收到所有算子的報(bào)告之后,認(rèn)為該周期的快照制作成功; 否則,如果在規(guī)定的時(shí)間內(nèi)沒有收到所有算子的報(bào)告,則認(rèn)為本周期快照制作失敗。
如果一個(gè)算子有兩個(gè)輸入源,則暫時(shí)阻塞先收到barrier的輸入源,等到第二個(gè)輸入源相 同編號(hào)的barrier到來時(shí),再制作自身快照并向下游廣播該barrier。具體如下圖所示:
假設(shè)算子C有A和B兩個(gè)輸入源
在第i個(gè)快照周期中,由于某些原因(如處理時(shí)延、網(wǎng)絡(luò)時(shí)延等)輸入源A發(fā)出的 barrier 先到來,這時(shí)算子C暫時(shí)將輸入源A的輸入通道阻塞,僅收輸入源B的數(shù)據(jù)。
當(dāng)輸入源B發(fā)出的barrier到來時(shí),算子C制作自身快照并向 CheckpointCoordinator 報(bào)告自身的快照制作情況,然后將兩個(gè)barrier合并為一個(gè),向下游所有的算子廣播。
當(dāng)由于某些原因出現(xiàn)故障時(shí),CheckpointCoordinator通知流圖上所有算子統(tǒng)一恢復(fù)到某個(gè)周期的checkpoint狀態(tài),然后恢復(fù)數(shù)據(jù)流處理。分布式checkpoint機(jī)制保證了數(shù)據(jù)僅被處理一次(Exactly Once)。
2. 持久化存儲(chǔ)1) MemStateBackend
該持久化存儲(chǔ)主要將快照數(shù)據(jù)保存到JobManager的內(nèi)存中,僅適合作為測(cè)試以及快照的數(shù)據(jù)量非常小時(shí)使用,并不推薦用作大規(guī)模商業(yè)部署。
MemoryStateBackend 的局限性:
默認(rèn)情況下,每個(gè)狀態(tài)的大小限制為 5 MB。可以在MemoryStateBackend的構(gòu)造函數(shù)中增加此值。
無論配置的最大狀態(tài)大小如何,狀態(tài)都不能大于akka幀的大小(請(qǐng)參閱配置)。
聚合狀態(tài)必須適合 JobManager 內(nèi)存。
建議MemoryStateBackend 用于:
本地開發(fā)和調(diào)試。
狀態(tài)很少的作業(yè),例如僅包含一次記錄功能的作業(yè)(Map,FlatMap,Filter,...),kafka的消費(fèi)者需要很少的狀態(tài)。
2) FsStateBackend
該持久化存儲(chǔ)主要將快照數(shù)據(jù)保存到文件系統(tǒng)中,目前支持的文件系統(tǒng)主要是 HDFS和本地文件。如果使用HDFS,則初始化FsStateBackend時(shí),需要傳入以 “hdfs://”開頭的路徑(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果使用本地文件,則需要傳入以“file://”開頭的路徑(即:new FsStateBackend("file:///Data"))。在分布式情況下,不推薦使用本地文件。如果某 個(gè)算子在節(jié)點(diǎn)A上失敗,在節(jié)點(diǎn)B上恢復(fù),使用本地文件時(shí),在B上無法讀取節(jié)點(diǎn) A上的數(shù)據(jù),導(dǎo)致狀態(tài)恢復(fù)失敗。
建議FsStateBackend:
具有大狀態(tài),長(zhǎng)窗口,大鍵 / 值狀態(tài)的作業(yè)。
所有高可用性設(shè)置。
3) RocksDBStateBackend
RocksDBStatBackend介于本地文件和HDFS之間,平時(shí)使用RocksDB的功能,將數(shù) 據(jù)持久化到本地文件中,當(dāng)制作快照時(shí),將本地?cái)?shù)據(jù)制作成快照,并持久化到 FsStateBackend中(FsStateBackend不必用戶特別指明,只需在初始化時(shí)傳入HDFS 或本地路徑即可,如new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或new RocksDBStateBackend("file:///Data"))。
如果用戶使用自定義窗口(window),不推薦用戶使用RocksDBStateBackend。在自定義窗口中,狀態(tài)以ListState的形式保存在StatBackend中,如果一個(gè)key值中有多個(gè)value值,則RocksDB讀取該種ListState非常緩慢,影響性能。用戶可以根據(jù)應(yīng)用的具體情況選擇FsStateBackend+HDFS或RocksStateBackend+HDFS。
4) 語(yǔ)法val env = StreamExecutionEnvironment.getExecutionEnvironment()
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// advanced options:
// 設(shè)置checkpoint的執(zhí)行模式,最多執(zhí)行一次或者至少執(zhí)行一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 設(shè)置checkpoint的超時(shí)時(shí)間
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 如果在只做快照過程中出現(xiàn)錯(cuò)誤,是否讓整體任務(wù)失敗:true是 false不是
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
//設(shè)置同一時(shí)間有多少 個(gè)checkpoint可以同時(shí)執(zhí)行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
5) 修改State Backend的兩種方式
第一種:單任務(wù)調(diào)整
修改當(dāng)前任務(wù)代碼
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依賴】
第二種:全局調(diào)整
修改flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
注意:state.backend的值可以是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
6) Checkpoint的高級(jí)選項(xiàng)
默認(rèn)checkpoint功能是disabled的,想要使用的時(shí)候需要先啟用checkpoint開啟之后,默認(rèn)的checkPointMode是Exactly-once
//配置一秒鐘開啟一個(gè)checkpoint
env.enableCheckpointing(1000)
//指定checkpoint的執(zhí)行模式
//兩種可選:
//CheckpointingMode.EXACTLY_ONCE:默認(rèn)值
//CheckpointingMode.AT_LEAST_ONCE
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
一般情況下選擇CheckpointingMode.EXACTLY_ONCE,除非場(chǎng)景要求極低的延遲(幾毫秒)
注意:如果需要保證EXACTLY_ONCE,source和sink要求必須同時(shí)保證EXACTLY_ONCE
//如果程序被cancle,保留以前做的checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
默認(rèn)情況下,檢查點(diǎn)不被保留,僅用于在故障中恢復(fù)作業(yè),可以啟用外部持久化檢查點(diǎn),同時(shí)指定保留策略:
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作業(yè)取消時(shí)保留檢查點(diǎn),注意,在這種情況下,您必須在取消后手動(dòng)清理檢查點(diǎn)狀態(tài)
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:當(dāng)作業(yè)在被cancel時(shí),刪除檢查點(diǎn),檢查點(diǎn)僅在作業(yè)失敗時(shí)可用
//設(shè)置checkpoint超時(shí)時(shí)間
env.getCheckpointConfig.setCheckpointTimeout(60000)
//Checkpointing的超時(shí)時(shí)間,超時(shí)時(shí)間內(nèi)沒有完成則被終止
//Checkpointing最小時(shí)間間隔,用于指定上一個(gè)checkpoint完成之后
//最小等多久可以觸發(fā)另一個(gè)checkpoint,當(dāng)指定這個(gè)參數(shù)時(shí),maxConcurrentCheckpoints的值為1
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//設(shè)置同一個(gè)時(shí)間是否可以有多個(gè)checkpoint執(zhí)行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
指定運(yùn)行中的checkpoint最多可以有多少個(gè)
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
用于指定在checkpoint發(fā)生異常的時(shí)候,是否應(yīng)該fail該task,默認(rèn)是true,如果設(shè)置為false,則task會(huì)拒絕checkpoint然后繼續(xù)運(yùn)行
2. Flink的重啟策略
Flink支持不同的重啟策略,這些重啟策略控制著job失敗后如何重啟。集群可以通過默認(rèn)的重啟策略來重啟,這個(gè)默認(rèn)的重啟策略通常在未指定重啟策略的情況下使用,而如果Job提交的時(shí)候指定了重啟策略,這個(gè)重啟策略就會(huì)覆蓋掉集群的默認(rèn)重啟策略。
1) 概覽
默認(rèn)的重啟策略是通過Flink的 flink-conf.yaml 來指定的,這個(gè)配置參數(shù) restart-strategy 定義了哪種策略會(huì)被采用。如果checkpoint未啟動(dòng),就會(huì)采用 no restart 策略,如果啟動(dòng)了checkpoint機(jī)制,但是未指定重啟策略的話,就會(huì)采用 fixed-delay 策略,重試 Integer.MAX_VALUE 次。請(qǐng)參考下面的可用重啟策略來了解哪些值是支持的。
每個(gè)重啟策略都有自己的參數(shù)來控制它的行為,這些值也可以在配置文件中設(shè)置,每個(gè)重啟策略的描述都包含著各自的配置值信息。
重啟策略重啟策略值Fixed delayfixed-delayFailure ratefailure-rateNo restartNone
除了定義一個(gè)默認(rèn)的重啟策略之外,你還可以為每一個(gè)Job指定它自己的重啟策略,這個(gè)重啟策略可以在 ExecutionEnvironment 中調(diào)用 setRestartStrategy() 方法來程序化地調(diào)用,注意這種方式同樣適用于 StreamExecutionEnvironment。
下面的例子展示了如何為Job設(shè)置一個(gè)固定延遲重啟策略,一旦有失敗,系統(tǒng)就會(huì)嘗試每10秒重啟一次,重啟3次。
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重啟次數(shù)
Time.of(10, TimeUnit.SECONDS) // 延遲時(shí)間間隔
))
2) 固定延遲重啟策略(Fixed Delay Restart Strategy)
固定延遲重啟策略會(huì)嘗試一個(gè)給定的次數(shù)來重啟Job,如果超過了最大的重啟次數(shù),Job最終將失敗。在連續(xù)的兩次重啟嘗試之間,重啟策略會(huì)等待一個(gè)固定的時(shí)間。
重啟策略可以配置flink-conf.yaml的下面配置參數(shù)來啟用,作為默認(rèn)的重啟策略:
restart-strategy: fixed-delay
配置參數(shù)描述默認(rèn)值restart-strategy.fixed-delay.a(chǎn)ttempts在Job最終宣告失敗之前,Flink嘗試執(zhí)行的次數(shù)1,如果啟用checkpoint的話是Integer.MAX_VALUErestart-strategy.fixed-delay.delay延遲重啟意味著一個(gè)執(zhí)行失敗之后,并不會(huì)立即重啟,而是要等待一段時(shí)間。akka.a(chǎn)sk.timeout,如果啟用checkpoint的話是1s
例子:
restart-strategy.fixed-delay.a(chǎn)ttempts: 3
restart-strategy.fixed-delay.delay: 10 s
固定延遲重啟也可以在程序中設(shè)置:
val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 重啟次數(shù)
Time.of(10, TimeUnit.SECONDS) // 重啟時(shí)間間隔
))
3) 失敗率重啟策略
失敗率重啟策略在Job失敗后會(huì)重啟,但是超過失敗率后,Job會(huì)最終被認(rèn)定失敗。在兩個(gè)連續(xù)的重啟嘗試之間,重啟策略會(huì)等待一個(gè)固定的時(shí)間。
發(fā)表評(píng)論
請(qǐng)輸入評(píng)論內(nèi)容...
請(qǐng)輸入評(píng)論/評(píng)論長(zhǎng)度6~500個(gè)字
最新活動(dòng)更多
-
10月31日立即下載>> 【限時(shí)免費(fèi)下載】TE暖通空調(diào)系統(tǒng)高效可靠的組件解決方案
-
即日-11.13立即報(bào)名>>> 【在線會(huì)議】多物理場(chǎng)仿真助跑新能源汽車
-
11月28日立即報(bào)名>>> 2024工程師系列—工業(yè)電子技術(shù)在線會(huì)議
-
12月19日立即報(bào)名>> 【線下會(huì)議】OFweek 2024(第九屆)物聯(lián)網(wǎng)產(chǎn)業(yè)大會(huì)
-
即日-12.26火熱報(bào)名中>> OFweek2024中國(guó)智造CIO在線峰會(huì)
-
即日-2025.8.1立即下載>> 《2024智能制造產(chǎn)業(yè)高端化、智能化、綠色化發(fā)展藍(lán)皮書》
推薦專題
- 1 【一周車話】沒有方向盤和踏板的車,你敢坐嗎?
- 2 特斯拉發(fā)布無人駕駛車,還未迎來“Chatgpt時(shí)刻”
- 3 特斯拉股價(jià)大跌15%:Robotaxi離落地還差一個(gè)蘿卜快跑
- 4 馬斯克給的“驚喜”夠嗎?
- 5 打完“價(jià)格戰(zhàn)”,大模型還要比什么?
- 6 馬斯克致敬“國(guó)產(chǎn)蘿卜”?
- 7 神經(jīng)網(wǎng)絡(luò),誰(shuí)是盈利最強(qiáng)企業(yè)?
- 8 比蘋果偉大100倍!真正改寫人類歷史的智能產(chǎn)品降臨
- 9 諾獎(jiǎng)進(jìn)入“AI時(shí)代”,人類何去何從?
- 10 Open AI融資后成萬(wàn)億獨(dú)角獸,AI人才之爭(zhēng)開啟
- 高級(jí)軟件工程師 廣東省/深圳市
- 自動(dòng)化高級(jí)工程師 廣東省/深圳市
- 光器件研發(fā)工程師 福建省/福州市
- 銷售總監(jiān)(光器件) 北京市/海淀區(qū)
- 激光器高級(jí)銷售經(jīng)理 上海市/虹口區(qū)
- 光器件物理工程師 北京市/海淀區(qū)
- 激光研發(fā)工程師 北京市/昌平區(qū)
- 技術(shù)專家 廣東省/江門市
- 封裝工程師 北京市/海淀區(qū)
- 結(jié)構(gòu)工程師 廣東省/深圳市