
Tutorial 4 : Hadoop Custom Input Format
In each city in Tunisia, we have a temperature sensor that sends data to a server. On the server, each record is stored in a file. Unfortunately, the structure of the data is not the same in all cities. For example, in Sfax each record is stored as (year month day city temperature), e.g. (1950 4 30 sfax 30), while in Sousse each record is stored as (city temperature year month day), e.g. (sousse 30 1950 4 30).
Our objective is to compute, for each day, the average of the temperatures of both cities.
Challenge:
How can I deal with data that is not structured in the same way?
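To make the challenge concrete, here is what the two input files might look like. The values are made-up samples and <TAB> marks a tab character; this is the layout the record readers below expect:

sfax.txt (year month day, a tab, then city and temperature):
1950 4 30<TAB>sfax 30
1950 5 1<TAB>sfax 32

sousse.txt (city and temperature, a tab, then year month day):
sousse 28<TAB>1950 4 30
sousse 31<TAB>1950 5 1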
- You need to make sure that Hadoop is installed successfully on your machine. Check this link for the Hadoop installation.
- Then start the Hadoop daemons by invoking these scripts:
start-dfs.sh
start-yarn.sh
- Before running the job, you need to put the input files into HDFS and create some directories there.
- So download the two input files (they are small files, just for testing): download link
- After that, create the input path in HDFS by invoking: hdfs dfs -mkdir -p /training/lab4/inputs/
- After that, copy the files to HDFS by invoking a command like this: hdfs dfs -copyFromLocal <localPathOfFiles> /training/lab4/inputs/
- For example, if you downloaded the files into ~/Downloads/lab4/inputs/, the command line would be: hdfs dfs -copyFromLocal ~/Downloads/lab4/inputs/* /training/lab4/inputs/
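- To double check that the files really landed in HDFS, you can list the input directory:
hdfs dfs -ls /training/lab4/inputs/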
- Now that everything is set up, let’s start coding. First create a Job class that extends Configured (so it picks up the configuration from the installation files, core-site.xml etc.) and implements Tool (so you can invoke your job from the command line via the hadoop jar command). In this class you give the job its input format, output format, mapper, reducer, and the key and value output types of the mapper and reducer, etc.
public class TemperatureJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: TemperatureJob <sfaxInput> <sousseInput> <output>");
            return -1;
        }
        // getConf() is inherited from Configured, which is why we extend that class
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "tp4");
        job.setJarByClass(TemperatureJob.class);

        // MultipleInputs lets us bind each input path to its own input format and mapper
        MultipleInputs.addInputPath(job, new Path(args[0]), SfaxInputFormat.class, TemperatureMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), SousseInputFormat.class, TemperatureMapper.class);

        // specify the output format
        job.setOutputFormatClass(TextOutputFormat.class);
        // specify the output path
        TextOutputFormat.setOutputPath(job, new Path(args[2]));
        // specify the reducer
        job.setReducerClass(TemperatureReducer.class);
        // specify the output key class (also used as the map output key class)
        job.setOutputKeyClass(CustomDate.class);
        // specify the output value class (also used as the map output value class)
        job.setOutputValueClass(IntWritable.class);

        // run the job
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String sfaxInputPath = "hdfs://localhost:9000/training/lab4/inputs/sfax.txt";
        String sousseInputPath = "hdfs://localhost:9000/training/lab4/inputs/sousse.txt";
        String outputPath = "hdfs://localhost:9000/training/lab4/output";
        int exitCode = ToolRunner.run(new TemperatureJob(),
                new String[] { sfaxInputPath, sousseInputPath, outputPath });
        System.exit(exitCode);
    }
}
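A small remark about the job configuration above: setOutputKeyClass and setOutputValueClass cover the map output types here only because the mapper emits the same types as the reducer (CustomDate and IntWritable). If your mapper emitted different types, you would have to declare them explicitly, roughly like this sketch:

// only needed when the map output types differ from the final output types
job.setMapOutputKeyClass(CustomDate.class);
job.setMapOutputValueClass(IntWritable.class);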
- Now let’s have a look at how to add a custom key and value. To create a custom key you should implement WritableComparable, and to create a custom value you should implement Writable. Let’s start with the value:
// each custom value class should implement Writable
public class CityTemperature implements Writable {

    // fields
    Text city;
    IntWritable temperature;

    // a no-args constructor must be present, otherwise Hadoop throws an error during deserialization
    public CityTemperature() {
        city = new Text();
        temperature = new IntWritable();
    }

    public CityTemperature(String city, String degree) {
        this.city = new Text(city);
        this.temperature = new IntWritable(Integer.parseInt(degree));
    }

    // called when deserializing the value
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        city.readFields(dataInput);
        temperature.readFields(dataInput);
    }

    // called when serializing the value
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        city.write(dataOutput);
        temperature.write(dataOutput);
    }

    public Text getCity() {
        return city;
    }

    public void setCity(Text city) {
        this.city = city;
    }

    public IntWritable getTemperature() {
        return temperature;
    }

    public void setTemperature(IntWritable temperature) {
        this.temperature = temperature;
    }

    @Override
    public int hashCode() {
        return city.hashCode() + temperature.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof CityTemperature) {
            CityTemperature other = (CityTemperature) obj;
            return this.getCity().equals(other.getCity())
                    && this.getTemperature().equals(other.getTemperature());
        }
        return false;
    }

    @Override
    public String toString() {
        return city.toString() + " " + temperature.toString();
    }
}
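If you want to see write() and readFields() in action outside of Hadoop, here is a minimal, hypothetical round-trip check (the class name CityTemperatureRoundTrip and the sample values are just for illustration, they are not part of the lab):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class CityTemperatureRoundTrip {
    public static void main(String[] args) throws IOException {
        CityTemperature original = new CityTemperature("sfax", "30");

        // serialize with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize with readFields() into a fresh instance
        // (this is exactly why the no-args constructor is required)
        CityTemperature copy = new CityTemperature();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy);                  // prints: sfax 30
        System.out.println(original.equals(copy)); // prints: true
    }
}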
Then the key should implement WritableComparable, because Hadoop uses the compareTo method to sort keys during the shuffle step:
public class CustomDate implements WritableComparable<CustomDate> {

    IntWritable year;
    IntWritable month;
    IntWritable day;

    // a no-args constructor is required by Hadoop
    public CustomDate() {
        year = new IntWritable();
        month = new IntWritable();
        day = new IntWritable();
    }

    public CustomDate(String year, String month, String day) {
        this.year = new IntWritable(Integer.parseInt(year));
        this.month = new IntWritable(Integer.parseInt(month));
        this.day = new IntWritable(Integer.parseInt(day));
    }

    public IntWritable getYear() { return year; }
    public void setYear(IntWritable year) { this.year = year; }

    public IntWritable getMonth() { return month; }
    public void setMonth(IntWritable month) { this.month = month; }

    public IntWritable getDay() { return day; }
    public void setDay(IntWritable day) { this.day = day; }

    @Override
    public int hashCode() {
        return year.hashCode() + month.hashCode() + day.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof CustomDate) {
            CustomDate other = (CustomDate) obj;
            return this.getYear().equals(other.getYear())
                    && this.getMonth().equals(other.getMonth())
                    && this.getDay().equals(other.getDay());
        }
        return false;
    }

    @Override
    public String toString() {
        return day.toString() + "/" + month.toString() + "/" + year.toString();
    }

    // deserialization
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        day.readFields(dataInput);
        month.readFields(dataInput);
        year.readFields(dataInput);
    }

    // serialization
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        day.write(dataOutput);
        month.write(dataOutput);
        year.write(dataOutput);
    }

    // used by the framework to sort keys during the shuffle
    @Override
    public int compareTo(CustomDate other) {
        int yearCmp = year.compareTo(other.getYear());
        if (yearCmp != 0) {
            return yearCmp;
        }
        int monthCmp = month.compareTo(other.getMonth());
        if (monthCmp != 0) {
            return monthCmp;
        }
        return day.compareTo(other.getDay());
    }
}
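To convince yourself that compareTo orders the keys the way the shuffle will, you can sort a few CustomDate objects by hand. This is a hypothetical test snippet, not part of the job:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CustomDateSortDemo {
    public static void main(String[] args) {
        List<CustomDate> dates = new ArrayList<>();
        dates.add(new CustomDate("1950", "5", "1"));
        dates.add(new CustomDate("1950", "4", "30"));
        dates.add(new CustomDate("1949", "12", "31"));

        // Collections.sort uses CustomDate.compareTo: year first, then month, then day
        Collections.sort(dates);
        for (CustomDate d : dates) {
            System.out.println(d); // 31/12/1949 then 30/4/1950 then 1/5/1950
        }
    }
}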
Now let’s have a look at how to write a custom input format. You need to extend FileInputFormat<KeyType, ValueType> (the KeyType should implement WritableComparable and the ValueType should implement Writable) and override the createRecordReader method:
public class SfaxInputFormat extends FileInputFormat<CustomDate, CityTemperature> {

    @Override
    public RecordReader<CustomDate, CityTemperature> createRecordReader(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        SfaxRecordReader sfaxRecordReader = new SfaxRecordReader();
        sfaxRecordReader.initialize(inputSplit, context);
        return sfaxRecordReader;
    }
}
The custom input format uses a custom RecordReader that turns the file stored in HDFS into (key, value) records. The framework first calls the reader’s initialize() method, then repeatedly calls nextKeyValue() followed by getCurrentKey() and getCurrentValue() until nextKeyValue() returns false (getProgress() only reports how far along the reader is), and finally calls close(). Roughly:

initialize(inputSplit, context)
while (nextKeyValue()) {
    getCurrentKey()
    getCurrentValue()
}
close()
public class SfaxRecordReader extends RecordReader<CustomDate, CityTemperature> {

    // a built-in reader that splits the file line by line
    LineRecordReader lineRecordReader;
    CustomDate key;
    CityTemperature value;

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        lineRecordReader = new LineRecordReader();
        lineRecordReader.initialize(inputSplit, context);
    }

    // transforms the current line into a (key, value) pair
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!lineRecordReader.nextKeyValue()) {
            return false;
        }
        String line = lineRecordReader.getCurrentValue().toString();
        // a Sfax line holds the date, a tab, then the city and temperature
        String[] keyValue = line.split("\t");
        String[] keyFields = keyValue[0].split(" ");
        String[] valueFields = keyValue[1].split(" ");
        key = new CustomDate(keyFields[0], keyFields[1], keyFields[2]);
        value = new CityTemperature(valueFields[0], valueFields[1]);
        return true;
    }

    @Override
    public CustomDate getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public CityTemperature getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineRecordReader.getProgress();
    }

    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }
}
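As a concrete trace (assuming the tab-separated layout shown at the beginning), the Sfax line "1950 4 30<TAB>sfax 30" becomes the key 30/4/1950 and the value "sfax 30", so the mapper never has to parse anything itself.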
The SousseInputFormat and its record reader are written in the same way:
public class SousseInputFormat extends FileInputFormat<CustomDate, CityTemperature> {

    @Override
    public RecordReader<CustomDate, CityTemperature> createRecordReader(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        SousseRecordReader sousseRecordReader = new SousseRecordReader();
        sousseRecordReader.initialize(inputSplit, context);
        return sousseRecordReader;
    }
}
public class SousseRecordReader extends RecordReader<CustomDate, CityTemperature> {

    LineRecordReader lineRecordReader;
    CustomDate key;
    CityTemperature value;

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        lineRecordReader = new LineRecordReader();
        lineRecordReader.initialize(inputSplit, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!lineRecordReader.nextKeyValue()) {
            return false;
        }
        String line = lineRecordReader.getCurrentValue().toString();
        // a Sousse line holds the city and temperature, a tab, then the date
        String[] keyValue = line.split("\t");
        String[] keyFields = keyValue[0].split(" ");
        String[] valueFields = keyValue[1].split(" ");
        key = new CustomDate(valueFields[0], valueFields[1], valueFields[2]);
        value = new CityTemperature(keyFields[0], keyFields[1]);
        return true;
    }

    @Override
    public CustomDate getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public CityTemperature getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineRecordReader.getProgress();
    }

    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }
}
The TemperatureMapper does not parse the data itself; it simply receives the key and value that the record reader already built and forwards the temperature:
public class TemperatureMapper extends Mapper<CustomDate, CityTemperature, CustomDate, IntWritable> {

    @Override
    protected void map(CustomDate key, CityTemperature value, Context context)
            throws IOException, InterruptedException {
        // the record reader already parsed the line, so we just emit the date and the temperature
        context.write(key, value.getTemperature());
    }
}
And in the reducer, we just compute the mean of the two temperatures (one per city) for each date:
public class TemperatureReducer extends Reducer<CustomDate, IntWritable, CustomDate, IntWritable> {

    @Override
    protected void reduce(CustomDate key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        int nbr = 0;
        // sum the temperatures reported for this date and count them
        for (IntWritable value : values) {
            nbr++;
            sum = sum + value.get();
        }
        // emit the (integer) average temperature for this date
        context.write(key, new IntWritable(sum / nbr));
    }
}
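Note that sum / nbr is integer division, so the mean is truncated (for example 30 and 29 give 29 instead of 29.5). If you want to keep the fractional part, one possible variant (a sketch, not the lab’s required solution) is to emit a DoubleWritable; the job would then declare DoubleWritable.class as the output value class and IntWritable.class as the map output value class:

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class TemperatureAverageReducer extends Reducer<CustomDate, IntWritable, CustomDate, DoubleWritable> {

    @Override
    protected void reduce(CustomDate key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        int nbr = 0;
        for (IntWritable value : values) {
            nbr++;
            sum += value.get();
        }
        // (double) sum / nbr keeps the fractional part of the average
        context.write(key, new DoubleWritable((double) sum / nbr));
    }
}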
- Now, after coding, export the jar as a runnable jar and specify TemperatureJob as the main class, then open a terminal and run the job by invoking: hadoop jar <nameOfTheJar.jar>. For example, if you name the jar lab4.jar, the command line will be: hadoop jar lab4.jar
- Have a look at the result by invoking: hdfs dfs -cat /training/lab4/output/part-r-00000
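- Since TextOutputFormat writes the key’s toString() (day/month/year), a tab, then the value, each output line should look roughly like this with the sample values from the beginning:
30/4/1950	29
1/5/1950	31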