
Tutorial 2: Hadoop MapReduce Global Variable
Objective:
Write a MapReduce program that searches for occurrences of a given string in a large file. You may be tempted to simply use grep on the command line, but if the file is very large that will take too much time; a MapReduce job lets the search run in parallel across the cluster.
- First of all, you need to ensure that you have successfully installed Hadoop on your machine. Check this link if you need to know how to install it.
- Then you should start the Hadoop daemons by invoking these scripts:
start-dfs.sh
start-yarn.sh
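You can check that the daemons are running with the jps command; it should list processes such as NameNode, DataNode, ResourceManager and NodeManager.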
- OK, one last step before starting: you need to copy the input files into HDFS, and create the target directories in HDFS before copying. Download the two input files (they are small files, just for testing): download link
- After that, create the target directory in HDFS by invoking:
hdfs dfs -mkdir -p /training/lab2/inputs/
- After that, copy the files to HDFS by invoking a command like: hdfs dfs -copyFromLocal <localPathOfFiles> /training/lab2/inputs/
- For example, if you downloaded the files into Downloads/lab2/inputs, then the command line should be: hdfs dfs -copyFromLocal ~/Downloads/lab2/inputs/* /training/lab2/inputs/
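- You can verify that the files landed in HDFS by invoking: hdfs dfs -ls /training/lab2/inputs/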
- Now that everything is set up, let's start coding. First you should create a job class that extends Configured (so it picks up the configuration from the installation files, "core-site.xml", etc.) and implements Tool (so you can invoke your job from the command line via the hadoop jar command). In this class you give the job its information: the input format, the output format, the mapper, the reducer, the combiner, the key and value output types of the mapper and reducer, and so on. The imports this class needs are listed right after the code.
public class SearchJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: SearchJob <inputPath> <outputPath> <searchWord>");
            return -1;
        }
        // getConf() is inherited from Configured; that is why the class extends Configured
        Configuration conf = getConf();
        // store the search word in the configuration so the mapper can read it back
        conf.set("searchingWord", args[2]);
        Job job = Job.getInstance(conf, "TP2");
        // give the job the name of the main class
        job.setJarByClass(SearchJob.class);
        // specify the input format
        job.setInputFormatClass(TextInputFormat.class);
        // specify the output format
        job.setOutputFormatClass(TextOutputFormat.class);
        // specify the output path in HDFS
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        // specify the input paths in HDFS
        TextInputFormat.setInputPaths(job, new Path(args[0]));
        // give the job the name of the mapper class
        job.setMapperClass(SearchMapper.class);
        // give the job the name of the reducer class
        job.setReducerClass(SearchReducer.class);
        // give the job the name of the combiner class
        job.setCombinerClass(SearchCombiner.class);
        // set the key output type of the reducer
        job.setOutputKeyClass(Text.class);
        // set the value output type of the reducer
        job.setOutputValueClass(IntWritable.class);
        // because the mapper's output key and value types are the same as the reducer's,
        // there is no need to add:
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(IntWritable.class);
        // run the job
        int r = job.waitForCompletion(true) ? 0 : 1;
        System.out.println(job.getCounters().findCounter("UserCounters", "numberOfLines").getValue());
        return r;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SearchJob(), new String[] {
                "hdfs://localhost:9000/training/lab2/inputs",
                "hdfs://localhost:9000/training/lab2/output",
                "the" });
        System.exit(exitCode);
    }
}
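For completeness, these are the imports the class above relies on (assuming the standard Hadoop 2.x "new API" packages; the exact list may vary slightly with your Hadoop version):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;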
- Now let's have a look at the mapper; its role is to search for the word in each line and emit the number of occurrences it finds.
// the setup method is executed first,
// then the map method is executed for each line of the input split
public class SearchMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // holds the word we are searching for
    private String searchString;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // get the searchingWord from the configuration
        searchString = context.getConfiguration().get("searchingWord");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // split the line into words separated by a space
        String[] words = value.toString().split(" ");
        // count the number of occurrences in this line
        int counter = 0;
        for (int i = 0; i < words.length; i++) {
            if (searchString.equals(words[i])) {
                counter++;
            }
        }
        // write the search word as the key and the number of occurrences as the value
        context.write(new Text(searchString), new IntWritable(counter));
    }
}
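Note that SearchJob.run prints a counter named numberOfLines from the group UserCounters (counters are one way Hadoop exposes job-wide "global" values), but the mapper above never increments it. A minimal sketch of how the map method could bump that counter, as an addition for illustration placed at the start of map:

// count every processed line in the user-defined counter read back by SearchJob
context.getCounter("UserCounters", "numberOfLines").increment(1);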
- If the file is very large, it will be split into multiple blocks, and the number of mappers will be equal to the number of splits (by default, one per block). So, to minimize the data transferred to the reducer, we can write a combiner that pre-aggregates the counts locally:
public class SearchCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum the partial counts produced locally by the mappers
        int counter = 0;
        for (IntWritable value : values) {
            counter += value.get();
        }
        context.write(key, new IntWritable(counter));
    }
}
- Now let's have a look at the reducer; its role is simply to sum the values, that's it:
public class SearchReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum all the counts received for this word
        int counter = 0;
        for (IntWritable value : values) {
            counter += value.get();
        }
        context.write(key, new IntWritable(counter));
    }
}
- Note: instead of writing our own reducer, we could use the built-in IntSumReducer<Key>
java.lang.Object
  org.apache.hadoop.mapreduce.Reducer
    org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer
and parameterize it with Text as the key, as sketched below.
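For example, in SearchJob.run the custom classes could be swapped out like this (a sketch; IntSumReducer lives in org.apache.hadoop.mapreduce.lib.reduce, so it needs the corresponding import):

// let the built-in summing reducer handle both the combine and the reduce phases
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);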
- Note: also, to minimize data transfer, consider a line that contains no occurrence of the search word: the mapper above still writes ("searchingWord", 0). It would be better to write nothing in that case, so the write at the end of the map method should become:
// only emit a pair when the word actually appears on the line
if (counter != 0) {
    context.write(new Text(searchString), new IntWritable(counter));
}
- Now, after coding, export the project as a runnable jar and specify SearchJob as the main class, then open a terminal and run the job by invoking: hadoop jar nameOfTheJar.jar
For example, if you name the jar lab2.jar, the command line will be: hadoop jar lab2.jar. Then have a look at the result by invoking: hdfs dfs -cat /training/lab2/output/part-r-00000 (each output line contains the key and its total count separated by a tab).