?

MapReduce是作業(yè)中一種編程模型,用于(yu)處理和生成大數據集,效管它由兩個(gè)主要(yao)階段組成:Map階段和Reduce階段,作業(yè)中在Map階段,效管輸入數據被分割成多個(gè)獨立的作業(yè)中??塊,然后(hou)每(mei)個(gè)塊被映射到一個(gè)鍵值對,效管在Reduce階段,作業(yè)中所有具有相同??鍵的鍵值對被組合在一起,并應用一個(gè)規約函數以生成最終結果。
(圖片來(lái)源網(wǎng)絡(luò ),侵刪)以下是一個(gè)簡(jiǎn)單的MapReduce示例,使用??Java編寫(xiě),并使用了Hadoop庫來(lái)執行MapReduce任務(wù),在這個(gè)示例中,我們將計算文本文件中單詞的出現次數。
我們需要創(chuàng )建一個(gè)名為WordCou(/ω\)ntMapper的類(lèi),該類(lèi)繼承自Mapper類(lèi),并實(shí)現map方法。map方法接收一個(gè)鍵值對作為輸入,其中鍵是輸入數據的偏移量,值是(???)輸入數據的一部分,在這個(gè)例子中,我們不需要鍵,所以我們將其忽略。
import java.io.IOException;import org.apa(′Д` )che(′?ω?`).hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;(/ω\)import org.apache.had??oop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class WordCountM(′?`)apper extends Map(?????)per&l??t;LongWritable, Text, Text, IntWritable> { private final stati(╯°□°)╯︵ ┻━┻c(′ω`) IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) thr??ows IOException, InterruptedException { String[] words = value.toString().split("??;\s+"); for (String w : wo??rds) { word.set(w); context.wri??t??e(word, one); } }}我們需要創(chuàng )建一個(gè)名為Wor??dCountReducer的類(lèi),該類(lèi)繼承自Reducer類(lèi),并實(shí)現reduce方法。red??uce方法接收一個(gè)鍵和一個(gè)迭代器,其??中迭代器包含與該鍵關(guān)聯(lián)的(de)所有值,在(zai)這個(gè)例子中,我們將累加這些值以計算單詞的出現次數。
import java.io.IOE??xception;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapre(′?`)duce.Reducer;public class WordCountReducer extends?? Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<In(′?_?`)tWritable> va(╬ ò﹏ó)lues, Context context) throws IOExceptio??n, InterruptedException { int sum = 0; for (IntWritable val : values(╯°□°)╯︵ ┻━┻) { sum += val.get(); } context.write(key, new IntWritable(sum)); }}我們需要創(chuàng )建一個(gè)名為WordCount的主類(lèi),該類(lèi)繼承自Configured類(lèi),并實(shí)現run方法,在這個(gè)方法中,我們將配置和運行MapReduce作業(yè)。
import org.apache.hadoop.conf.Configuration;i??m??port org.apache.hadoop.fs.Path??;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.m(╯°□°)╯apreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutput(′?`*)Format;??import org.apache.hadoop.util.Ge?nericOptionsParser;public?? class WordCount extends Configured implements Tool { public int run(St??ring[] args) throws Exception { Configuration conf = getConf(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.prin(′?ω?`)tln("Usage: wordcount <in> <out>(′_ゝ`);"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperC(◎_◎;)lass(WordCountMapper.class); job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); jo(′?_?`)b.setOutput??KeyClass(Text.(′?`)class); job.setOutputValueClass(IntWr(?????)itable??.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOut(???)putFormat.??setOutputPath(job, new Path(otherArgs[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] argsヾ(′?`)?) throws Exception { int ex??itCode = ToolRunner.run(new WordCount(), args); System.exit(exitCode); }}hadoop jar wordcount.jar WordCount input.t??xt output這將計算input.txt文件中每個(gè)單詞的出現次數,并將結果寫(xiě)入output目錄。
友情鏈接:
東方來(lái)隆網(wǎng)絡(luò )科技有限公司
© 2013-2025.Company name All rights reserved.網(wǎng)站地圖 天津九安特機電工程有限公司-More Templates