KeyValue_KafkaStreams樣例
KeyValue_KafkaStreams樣例是樣例一個(gè)展示如何使用Kafka Streams API處(chu)理鍵值對數據的示例。該樣例演示了如何創(chuàng )建一個(gè)簡(jiǎn)單的樣例Kafka Streams應用程序,用于讀取、樣例轉換和寫(xiě)入鍵值對數據到Kafka主題。樣例
KeyValue KafkaStreams 樣例
(圖片來(lái)源網(wǎng)絡(luò ),樣例侵刪)以下是樣例一個(gè)使用 Java 編寫(xiě)的簡(jiǎn)單 KeyVa(′?ω?`)lue KafkaStreams 示(shi)例,這個(gè)示例展示??了如何從 Kafka 主題中讀取數據,樣例對數據進(jìn)行處理,樣例(′?`)并將處理后的樣例數??據寫(xiě)入另一個(gè) Kafka 主題。
確保你的樣例項目中包含了以下依賴(lài):
<dependen(╯°□°)╯︵ ┻━┻cies(′▽?zhuān)?)> <dependency> <groupId>org.apache.kafka(′?`*)</groupId> <artifactId>kafkastreams</ar??tifactId> <version>2.8.0&(′_`)lt;/version> </dependency> <dependen(′?ω?`)cy> <groupId>org.apache.kafka</groupId> <artifactId&??gt;kafkaclients</artifac??tId> <version>2.8.0</version> </dependency>&l??t;/dependencies>代碼示例
import org.apa??che.kafka(′?ω?`).co??mmon.serialization.Se??rdes;import org.apache.kafka.st(′_ゝ`)reams.KafkaStreams;import org.apache.kafka.streams.KeyValue;import org.apache.kaf??ka.streams.StreamsBuildeヽ(′▽?zhuān)?ノr;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.KStream;import org.apache.kafka.strea??mヽ(′▽?zhuān)?ノs.kstream.Printed;import java(′ω`).util.Properties;public class KeyValueKafkaStr??eamsExample { public static void main(String[] args) { // 配置 Kafka Streams Properties config = new Prop(╯°□°)╯︵ ┻━┻erties(); config.put(StreamsCon( ???)fig.APPLICATION_ID_CONFIG, "keyvalueexample"( ?° ?? ?°)); config.put(Stre??amsConfig.BOOTSTRAP_SERVERS_CONFIG, &q(′?`*)u(′_ゝ`)ot;localhost:9092"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().??getClass()); config.put(StreamsConfigヽ(′ー`)ノ.DEFAULT_(′ω`)VALUE_SERDE_CLASS_CONFIG, Serdes.Stヽ(′?`)ノring().getClass()); // 構建 StreamsBuilder StreamsBu(′_`)ilder builder = new StreamsBuilder(); //?? 從源主題讀取數據 KStream<String, String> sourceStream = builder.str??eam("sourcetopic"); // 對數據??進(jìn)行處理(這里只是簡(jiǎn)單地將鍵和值連接起來(lái)) KStream<String, String> processedStream = sourceStream.map((key, value) > new KeyValue&??lt;>(key, key + "" + value)); // 將處理后的數據寫(xiě)入目標主題 processedStream.to("targettopic", Printed.toSysOut().withLabel("Processed Stream")); // 啟動(dòng) Kafka Streams 應用程序 KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.start(); // 添加關(guān)閉鉤子以?xún)?yōu)雅地??關(guān)閉應用程序 Runtime.get(╯‵□′)╯Runtime().ad??dShutdownHook(new Thread(streams::close)); }}??單元表格
| 序號 | 功能描述 |
| 1 | 導入所需的庫 |
| 2 | 設置 Kaf(?????)ka Stream??s 的配置??參數 |
| 3 | 創(chuàng )建 StreamsBuilder 實(shí)例 |
| 4 | 從源主題讀取數據 |
| 5 | 對數據進(jìn)行處理(這里是一個(gè)簡(jiǎn)單的字符串拼接操作) |
| 6 | 將處理后的數據寫(xiě)入目標主(′?ω?`)題 |
| 7 | 啟動(dòng) Kafka Streams 應用程(cheng)序 |
| 8 | 添加關(guān)閉鉤子以確保應用程序在退??出時(shí)正確關(guān)閉 |
下面是一個(gè)關(guān)于KeyVa??lue對在Kafka Streams中使用的示例介紹,在這個(gè)場(chǎng)景ヽ(′?`)ノ中,樣例假設我們有一??個(gè)簡(jiǎn)單的樣例應用程序,它從一個(gè)(′?`*)主題接收消息??,樣例處理這些消息,樣例并將結果寫(xiě)入另一個(gè)主題。樣例
(圖片來(lái)源網(wǎng)絡(luò ),侵刪)| 步驟 | 描述 | Kafka Streams代碼樣例 |
| 1. 創(chuàng )建(′_`)流處理拓撲 | 定義一個(gè)流處理拓撲,該拓撲會(huì )從一個(gè)主題接收數據。 | KStream |
| 2. 處理( ?ω?)消息 | 對接收到的每條(tiao)消息進(jìn)行處理,這里使(′?_?`)用簡(jiǎn)單的(de)map函數來(lái)演示。 | KStream |
| 3. 轉換鍵值對 | 可以選擇對ヽ(′▽?zhuān)?ノ鍵(Key)或值(Value)進(jìn)行轉換,以下示例僅更改了鍵。 | KStream |
| 4. 數據聚合 | 可以按照鍵對數據進(jìn)行聚合,這里假設我們正在對相同的鍵進(jìn)行計數。 | KTab(′ω`)le |
| 5. 將結果(guo)寫(xiě)回主題 | 將處理后的數據寫(xiě)回到Kafka主題。 | aggregatedStream.toStream().to("out??put_topic", Produced.with(Serdes.Integer(), Serdes.Long())); |
| 6. 啟動(dòng)流處理 | 啟動(dòng)流ヾ(′ω`)?處理應用程序。 | Stre??amsConfig config = new StreamsConfig(props); |
在這個(gè)介紹中,我們定(????)義了一個(gè)流處理拓撲,它接收鍵值對(在本例中,鍵和值都是字符串類(lèi)型),并執行以下操作:
使用mapValues對每個(gè)值應用一個(gè)函數(本例中是計算字符串長(cháng)度)。
使用selectKey改變鍵(本例中是根據處理后的值來(lái)重新定義鍵)。
使用groupByKey和count按新鍵進(jìn)行聚合??。(′?ω?`)
將處理后的流(一個(gè)鍵和計數值的流)寫(xiě)入新的Kafka主題。
