
05 Dec
Tutorial 1: Using Combiner in Hadoop labs
We will start with a simple Hadoop job. Suppose we have some big files where each line contains a temperature reading, and we want to find the maximum and the minimum.
- I hear you saying: why MapReduce? I could do this in a sequential Java program. OK, then how long would it take to get the result from a file larger than 4 GB, for example?
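For comparison, here is a minimal sketch of the sequential approach (the file name temperatures.txt is just a placeholder): a single thread reads the file line by line, so on a multi-gigabyte file it is limited by the speed of one disk and one CPU core.

import java.io.BufferedReader;
import java.io.FileReader;

public class SequentialMinMax {
    public static void main(String[] args) throws Exception {
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        // Hypothetical local input file; replace with your own path
        try (BufferedReader reader = new BufferedReader(new FileReader("temperatures.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                try {
                    int num = Integer.parseInt(line.trim());
                    if (num < min) min = num;
                    if (num > max) max = num;
                } catch (NumberFormatException e) {
                    // skip bad records
                }
            }
        }
        System.out.println(min + " " + max);
    }
}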
- Let's start with the Hadoop installation by following this link.
- Then start the Hadoop daemons by invoking these scripts:
start-dfs.sh
start-yarn.sh
- You need to create some directories in HDFS and copy the input files from your local file system into them.
- So download the two input files (they are small files, just for testing): download link
- After that, create the paths in HDFS by invoking: hdfs dfs -mkdir -p /training/lab1/inputs/
- Then copy them to HDFS by invoking a command like this: hdfs dfs -copyFromLocal <local files> /training/lab1/inputs/
For example, if you downloaded the files into ~/Downloads/lab1/inputs/, the command would be: hdfs dfs -copyFromLocal ~/Downloads/lab1/inputs/* /training/lab1/inputs/
- Now that everything is set up, let's start coding. First, create a job class that extends the Configured class and implements the Tool interface. This class gives the job all the information it needs: the input format, the output format, the mapper, the reducer, the key and value output types of the mapper and the reducer, etc.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MinMaxJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: MinMaxJob <input paths> <output path>");
            return -1;
        }
        // getConf() is inherited from Configured; that is why we extend Configured
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "TP1");
        // Give the job the name of the main class
        job.setJarByClass(MinMaxJob.class);
        // Specify the input format, which determines the key and value types of the mapper inputs
        job.setInputFormatClass(TextInputFormat.class);
        // With TextOutputFormat each output line is key.toString(), a tab (\t), then value.toString(),
        // for example "12\t14"
        job.setOutputFormatClass(TextOutputFormat.class);
        // Specify the input paths in HDFS
        TextInputFormat.setInputPaths(job, new Path(args[0]));
        // Specify the output path; if the output path already exists, the job will fail
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        // Give the job the name of the mapper class
        job.setMapperClass(MinMaxMapper.class);
        // Give the job the name of the reducer class
        job.setReducerClass(MinMaxReducer.class);
        // Set the key output type of the mapper
        job.setMapOutputKeyClass(IntWritable.class);
        // Set the value output type of the mapper
        job.setMapOutputValueClass(IntWritable.class);
        // Set the key output type of the reducer
        job.setOutputKeyClass(NullWritable.class);
        // Set the value output type of the reducer
        job.setOutputValueClass(Text.class);
        // Run the job
        int r = job.waitForCompletion(true) ? 0 : 1;
        // Print the number of bad records on the screen
        System.out.println(job.getCounters().findCounter("UserCounters", "badInputs").getValue());
        return r;
    }

    public static void main(String[] args) throws Exception {
        String inputPaths = "hdfs://localhost:9000/training/lab1/inputs/*";
        String outputPath = "hdfs://localhost:9000/training/lab1/output";
        ToolRunner.run(new MinMaxJob(), new String[] { inputPaths, outputPath });
    }
}
- Now let's have a look at the mapper. Before digging into the code, a short explanation will help.
- In our case, the role of the mapper is to filter the data and prepare it for the reducers.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MinMaxMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Get the content of the line
        String sValue = value.toString();
        // Try to parse it as a number; if an exception occurs, it is a bad record
        try {
            int num = Integer.parseInt(sValue);
            // Write all values with the same key, so they will all be processed by the same reducer
            context.write(new IntWritable(1), new IntWritable(num));
        } catch (NumberFormatException e) {
            context.getCounter("UserCounters", "badInputs").increment(1);
        }
    }
}
- Now let's have a look at the reducer. The input key and value types of the reducer must match the output key and value types of the mapper.
- In our case, the role of the reducer is to compute the minimum and the maximum.
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MinMaxReducer extends Reducer<IntWritable, IntWritable, NullWritable, Text> {

    // This method is called once for each key
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Compute the min and max of the values.
        // Initialise min and max with the first value; Hadoop's values iterable resumes
        // after it, so the loop below handles the remaining values.
        int max = values.iterator().next().get();
        int min = max;
        for (IntWritable value : values) {
            if (value.get() > max) {
                max = value.get();
            }
            if (value.get() < min) {
                min = value.get();
            }
        }
        // We want a single space between the min and the max.
        // With TextOutputFormat the separator between key and value is a tab,
        // so we use NullWritable as the key: TextOutputFormat then writes only the value,
        // with nothing before it.
        context.write(NullWritable.get(), new Text(min + " " + max));
    }
}
- Export the project as a runnable JAR, specify MinMaxJob as the main class, then open a terminal and run the job by invoking: hadoop jar nameOfTheJar.jar
- For example, if you name the jar lab1.jar, the command line will be: hadoop jar lab1.jar
- You can have a look at the result by invoking: hdfs dfs -cat /training/lab1/output/part-r-00000
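The output directory contains one file per reduce task; this job uses a single reducer, so everything ends up in part-r-00000. Given the reducer above, the file holds one line with the minimum and the maximum separated by a single space, for example (made-up numbers): -12 39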
- Now that we have coded our first lab, let's have a look and try to optimize it. In the previous job we have two files, and each file is processed by a mapper (if the size of a file is greater than 128 MB, the file is split into parts and each part is processed by its own mapper).
- Each mapper writes (key, value) pairs that are transferred over the network to the reducers. What about limiting the amount of data to transfer? Can we, for example, compute the min and max value in each part?
- Yes, we can. But not inside the mapper, because the map function is called on each line, and at that point we don't yet know whether the value is the min or the max. That's where the Combiner comes into action: its role is to perform some aggregation that limits the amount of data to transfer (in our case, computing the min and max).
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class Combiner extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Compute the min and max of the values seen on this map side
        int max = values.iterator().next().get();
        int min = max;
        for (IntWritable value : values) {
            if (value.get() > max) {
                max = value.get();
            }
            if (value.get() < min) {
                min = value.get();
            }
        }
        // Write only the min and the max to the output,
        // so far fewer values cross the network to the reducer
        context.write(new IntWritable(1), new IntWritable(min));
        context.write(new IntWritable(1), new IntWritable(max));
    }
}
- And of course we need to tell the job about the combiner:
job.setCombinerClass(Combiner.class);
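Note that the combiner's input and output key/value types must both be (IntWritable, IntWritable): its input has to match the mapper's output, and its output has to match what the reducer expects, because Hadoop may run the combiner zero, one, or several times on the map side. The final formatting is still done by MinMaxReducer. As a minimal sketch, assuming the call is placed inside MinMaxJob.run() next to the other job settings:

// Inside MinMaxJob.run(), alongside the other configuration calls
job.setMapperClass(MinMaxMapper.class);
job.setCombinerClass(Combiner.class);   // aggregates map output before the shuffle
job.setReducerClass(MinMaxReducer.class);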