在Flink 1.7中,請里要將checkpoint持久化到阿里云OSS,何設需要進(jìn)行以(yi)下步驟:
(圖片來(lái)源網(wǎng)絡(luò ),置持侵刪)1、久化引入相關(guān)依賴(lài)
在項目的請里pom.xml文件中添加Flink OSS Connector的依賴(lài):
<dependency> <groupId>com.alヽ(′▽?zhuān)?ノibaba.ververica</groupId> <artifactId>flinkconnect??oross_2.11</artifactId> <version>1.0.0</version></??dependency>
2、配置OSS參數
在Flink的何設配置文件(如flinkconf.yaml)中添加以下OS(?????)S相關(guān)參數:
OS┐(′ー`)┌S Access Key IDfli(╯°□°)╯nk.oss.accesskeyid: <your_access_key_id>OSS Access Key Secretflink.oss.ac(′?ω?`)cesskeysecret: <yoヽ(′▽?zhuān)?ノur_access???_key_secret>OSS Endpointflink.oss.endpoint: <your_endpoint&g(′▽?zhuān)?)t;OSS Bucketflヽ(′▽?zhuān)?/ink.oss.bucket: <your_bucket>OSS Path Prefixflink.oss.pathprefix: <yo(?????)ur_patヽ(′?`)ノh_(′▽?zhuān)?prefix>OSS File System Typeflink.oss.filesysteヾ(^-^)ノmtype: <your_filesystem_type>
3、設置Checkpoint配置
在Flink程序中設置Checkpoint配置,置持將Checkpoi(′?`)nt存儲到OS(′ω`)S:
im(′▽?zhuān)?port org.apache.flink.contrib.str??eaming.state.RocksDBStateBackend;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.checkpoint.CheckpointConfig;import org.apache.flink.streaming.api.checkpoint.CheckpointingMode;import or(????)g.apache??.flink.streaming.api(′?_?`).checkpoint.CheckpointStorageLocation??;import org.apache.flink.streaming.api.checkpoint.Checkpo??intWriter;public class FlinkCDCJob { public stati(′▽?zhuān)?)c void main(String[] args) throws Exception { // 創(chuàng )建執行環(huán)境 StreamExecutionEnviron┐(′д`)┌ment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 設置Checkpoint配置 CheckpointConfig config = env.getCheckpointConfig(); config.setCheckpointingMode(CheckpointingMode.E??XACTLY_ONCE); config.setCheckpointInterval(60000); // 檢查點(diǎn)間隔為1分鐘 config.setMaxConcur(°□°)rentCheckp(′ω`)oints(1); // 最多同時(shí)進(jìn)行一個(gè)檢查點(diǎn) config.setMinPauseBetweenCheckpoints(30000); // 檢查點(diǎn)之間的久化最小暫停時(shí)間為30秒 con??fig.setCheckpoi(′_ゝ`)nt(′▽?zhuān)?Timeout(10000); // 檢查點(diǎn)超時(shí)時(shí)間為10秒 config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 設置Checkpoint存儲位置 CheckpointStorageLocation storageLocation = new CheckpointStorageLocation("hdfs://localhヾ(′?`)?ost:9000??/flink/checkpoints"); storageLocation.setDefaultFilesystemType("oss&qu??ot;); storageLocation.setFilesystemURI("oss://your_bucket/your_path_prefix"??);?? // 設置Ch(′ω`*)eckpoint寫(xiě)入器 env.setStateBackend(new RocksDBSt??ateBackend(storageLocation)); env.se??tCheckpointWriterFactory(new Checkp(′▽?zhuān)?oin???tWriterFactory<>(storageLoca??tion)); // 啟動(dòng)作業(yè) env.execute("Flink CDC Job"); }}通過(guò)以上步驟,可以將Flin(′ω`*)k 1.7的請里checkpoint持久化到阿里??云OSS。
何設