MapReduce是何實(shí)一(′ω`)種編程模型,用于處理和生成大數據集,處理在Hadoop生態(tài)系統中,輸入它被廣泛用于分布式計算,何實(shí)當處理多個(gè)輸入文件時(shí),處理可以使用MapReduce來(lái)并行處理這些文件,輸入并將??結果匯總到一個(gè)輸出文件中。何實(shí)
(圖片來(lái)源網(wǎng)絡(luò ),處理侵刪)假設我們有兩個(gè)CSV文??件作為輸入,輸入每個(gè)文件包含一些數據,我們需要將這些數據合并到一個(gè)新的CSV文件中,以下是一個(gè)簡(jiǎn)單的MapReduce程??序示例,用于處理兩個(gè)CSV文件的輸入:
1.Mapper :讀取輸入文件的每一行(xing),并將其轉換為鍵值對(keyva(?_?;)lue(′?`*) pair),在這個(gè)例子中,我們可以將每行的行號作為鍵,整行內容作為值。
import sysdef mapper(): for line in sys.stdin: # 移除行尾的換行符 line = line.strip() # 使(shi)用行號作為鍵 key = line.split(',')[0] # 輸出鍵值對 print(f&qu┐(′?`)┌ot;{ key}t{ line}(′?`*)")2.Reducer :接收Mapper輸出的鍵值對,并根據鍵進(jìn)行分組,在這個(gè)例子中,我們將具有相同鍵的所有行合并到一起,我們可以將這些行寫(xiě)入一個(gè)新的CSV文件。
import sysdef reducer(): # 初始化一個(gè)空字典來(lái)存儲鍵值對 data_dict = { } # 從標準輸入讀取鍵值對 for line in sys.stdin: key, value = line.strip().split('t') if key not in data_di( ?▽?)ct: data_dict[key] = [] data_dict[key].app??end(??value) # 輸出合并后的數據到新的CSV文件 for key, values inヾ(′▽?zhuān)?? data_dict.items(): print(f"{ key},{ ','.join(values)}(?Д?)") 3.??運行MapReduce作業(yè) :使用Hadoop Streaming工具運行MapReduce作業(yè),需要將上述P??ython腳本保存為mapper.py和reducer.py,通過(guò)以下命令運行MapRed(′?_?`)uce作業(yè):
hadoop jar /path/to/hadoopstreaming.jar n input /path/to/input1.csv,/path/to/input2.csv n output /path/to/outp??ut n mapper "pyt??hon3 mapper.py" n reducer "python??3 reducer.py" n file mapper.py n file reducer.py
Copyright ? 2012-2018 天津九安特機電工程有限公司 版權所有 備案號: