1、添加依賴
在項目的pom.xml文件中添加Flink CDC和Doris的依賴:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-doris_2.11</artifactId>
    <version>1.13.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.1.0</version>
</dependency>
2、創建Flink CDC Source
創建一個Flink CDC Source,用于從MySQL數據庫中讀取數據變更事件:
import org.apache.flink.str(′_`)eaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sourc??e.SourceFunction;import com.ververica.(?_?;)cdc.connectors.mysql.MySqlSource;import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;public class FlinkCDCSourceExample { public static void main(Str(′▽?zhuān)?ing[] args) throws Exception { StreamExecutionEnvironment env( ?ヮ?) = StreamExecutionEnvironヽ(′ー`)ノment.(′-ι_-`)getExecutionEnvironment(); SourceFunction<String> sourceFunction = MySql(?⊿?)Source.<String>builder() .hostname("lo( ?° ?? ?°)calhost") .port(3306) .databaseList("mydb") // 監聽(tīng)的數據庫名 .tableList("mydb.mytable") // 監聽(tīng)的表名 .username("root&quヽ(′ー`)ノot;) .password("password") .deserializer(ne??w StringDebeziumDeserializationSchema()) // 反序列化(′ω`)方式 .build(); env.addSource(sourceFunction).print(); env.execute("Flink CDC Exam??ple"); }}3、創(chuàng )建Doris Sink
創建一個Doris Sink,用于將數據寫入Doris數據庫:
import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.d??oris.DorisSink;import org.apache.flink.stre??aming.connectors.doris.DorisStreamLoadOptions;import org.apache.flink.types.Row;public class Do?risSinkExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecu??tionEnvironment.getExecutionEnvironment(); // 假設從Flink CDC So??urce獲取的數據流為dataStream DataStream<Row> dataStream = ...; DorisSink<Row> doris??Sink = DorisSink.builder() .setDori??sTable("mydb.mytable") // Doris表名 .setUsername("root") .setPassword("password") .setFenodes("localhost:8030") // Doris FE節點(diǎn)地址 .setLoadProps(DorisStreamLoadOp(?????)tions.DEFAULT_LOAD_PROPS) // 加載屬性 .build(); dataStream.addSink(dorisSink); env.execute("Doris Sink Example&q???uot;); }}4、整合Flink CDC Source和Doriヽ(′▽?zhuān)?ノs Sink
將Flink CDC Source和Doris Sink整合到一起,實現從MySQL數據庫到Doris數據庫的數據同步:
import org.apache.flink??.streamヽ(′ー`)ノing.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecut??ionEnvironment;import org.apache.f??link.streaming.connectors.dヾ(′?`)?oris.DorisSink;import org.apache.flink.streaming.connectors.doris.DorisStreamLoadOptions;import org.apache.flink.types.Row;import com.ververica.cdc.c??onnectors.mysql.My(′?_?`)SqlSource;(?⊿?)import com.ververica.cdc.debezium.StringDe??beziumDeserializationSchema;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.(???)api.co??mmon.typeinfo.Types;import org.apache.flink.api.java.typeutils.RowTypeInfo;public class FlinkCDCToDorisExample { public static voi(′?ω?`)d main(String[] args) thr??ows Exception?? { StreamExecutio(′_`)nEnvironme(╯°□°)╯nt env = StreamExecu(′?`*)ti??onEnvironment.getExecutionEnvironment(); SourceFunction<String> sourceFunction = MySqlSource.<String>builder() .hostname("l( ?° ?? ?°)ocalhost") .port(3306) .databaseList("mydb") // 監聽(tīng)的數據庫名 .tableList(&??quot;mydb.mytable") // 監聽(tīng)的表名 .username(&(⊙_⊙)quot;root") .password("p(?????)assword") .deserializer(new StringD(′?`)ebeziumDeserializationSchema()) // 反序列化方式 .bヽ(′ー`)ノuild(); DataStream<String> dataStream = env.addSource(sourceFunction); // 將數據流轉( ?ヮ?)換為Row類(lèi)型,以便寫(xiě)入Doris DataStream<Row> rowDataStream = dataStream.map(json?? 
> { JsonObject jsonObject = new JsonParヽ(′?`)ノser()??.parse(json).getAsJsonObject(); String before = jsonObject.get("before&q(′ω`*)uot;).getAsString(); String after = json??Object.get(&??quot;after").getAsString(); return Row.of(before, after); }).returns(new RowTypeInfo(Types.STRIN?G, Types.STRING)); DorisSink<Row> dorisSink = DorisSink.builder() .setDorisTable??(&quo(′▽?zhuān)?t;mydb.mytable") // Doris表名 .setUsername("root") .setPassword("password") .setFenode┐(′д`)┌s(╯°□°)╯︵ ┻━┻("lヽ(′ー`)ノocalhost:8030") // Doris FE節點(diǎn)地址 .setLoadProps(DorisStreamLoadOptions.DEFAULT_LOAD_PROPS) // 加載屬性 .build(); rowDataStream.adヽ(′ー`)ノdSink(dorisSink); env.execute(&qヽ(′ー`)ノuot;Flink CDC to Do??ris Example"); }}電話(huà):17790068725
網 址:http://www.hunqingrc.com/
地 址:北京市順義區66號