
Tutoriel 4 : Format Input personnalisée Hadoop
Dans chaque ville tunisienne, on a un capteur de température qui envoie les données au serveur.
Sur le serveur, chaque enregistrement est stocké dans un fichier.
Malheureusement, la structure des données n’est pas la même dans toutes les villes.
Par exemple, à Sfax, chaque enregistrement est stocké comme suit (année, mois, jour, température de Sfax), par exemple (1950 4 30 sfax 30), par contre à Sousse, chaque enregistrement est stocké comme suit (température de Sousse, 1950 4 30).
Notre objectif est de calculer la moyenne des deux villes pour chaque jour.
Défi: Comment puis-je traiter des données qui ne sont pas structurées de la même manière?
Tout d’abord, vous devez vous assurer que vous avez correctement installé Hadoop sur votre machine.
Ensuite, vous devriez démarrer le démon Hadoop en appelant ces scripts:
- start-dfs.sh
- start-yarn.sh
Une dernière étape avant de commencer, vous devez copier les fichiers d’entrée dans votre système de fichiers Hadoop local et créer des répertoires dans hdfs avant de les copier.
Alors téléchargez ces deux fichiers d’entrée (ce sont de petits fichiers juste pour les tests)
Après cela, créez des chemins dans hdfs en appelant: hdfs dfs -mkdir -P / formation / lab4 / inputs /
Après cela, copiez les fichiers sur hdfs en appelant cette commande comme suit: hdfs dfs -copyFromLocal <localPathOfFiles> / training / lab4 / inputs /
Par exemple, si vous avez téléchargé les fichiers dans Téléchargements / lab4 / inputs, la ligne de commande doit être la suivante: hdfs dfs -copyFromLocal ~ / Téléchargements / lab4 / inputs / * / training / lab4 / inputs /
Maintenant que tout est déjà configuré, commençons à coder.
Commencez par créer une classe de travail qui s’étend à Configured (pour obtenir la configuration à partir des fichiers d’installation «core-site.xml etc….»)
Et implémente Tool (en procédant de la sorte. vous pouvez appeler votre travail depuis la ligne de commande via la commande hadoop jar).
En écrivant cette classe, vous donnerez à la tâche des informations sur le format d’entrée « input », le format de sortie « output », le mappeur, le réducteur, le clé et la valeur du format de sortie « output », du mappeur et du réducteur, etc..
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
public class TempuratureJob extends Configured implements Tool { @Override public int run(String[] args) throws Exception { if (args.length != 3) { System.err.println("error"); return -1; } // getConf method it's inherited from Configured class, that's why we should extends Configured Configuration conf = getConf(); Job job = Job.getInstance(conf, "tp4"); job.setJarByClass(TempuratureJob.class); // We can deal with multiple inputs in that way, so we relate each path to a specific mapper and specific input format MultipleInputs.addInputPath(job, new Path(args[0]), SfaxInputFormat.class, TempuratureMapper.class); MultipleInputs.addInputPath(job, new Path(args[1]), SousseInputFormat.class, TempuratureMapper.class); //specify the output format job.setOutputFormatClass(TextOutputFormat.class); //specify the output path TextOutputFormat.setOutputPath(job, new Path(args[2])); //specify the reducer job.setReducerClass(TempuratureReducer.class); //specify the output key that will be used as a map output key also job.setOutputKeyClass(CustomDate.class); // specify the output value that will be used as a map output value also job.setOutputValueClass(IntWritable.class); // run the job return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { String sfaxInputPath = "hdfs://localhost:9000/training/lab4/inputs/sfax.txt"; String sousseInputPath = "hdfs://localhost:9000/training/lab4/inputs/sousse.txt"; String outputPath = "hdfs://localhost:9000/training/lab4/output"; int exitCode = ToolRunner.run(new TempuratureJob(), new String[] { sfaxInputPath, sousseInputPath, outputPath }); System.exit(exitCode); } } |
Voyons maintenant comment ajouter une clé et une valeur personnalisées.
Pour créer une clé personnalisée vous devez implémenter WritableComparable et pour créer une valeur personnalisée, vous devez implémenter Writable.
Commençons avec la valeur
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
// each value should implements Writable public class CityTempurature implements Writable<CityTempurature>{ //fields Text city; IntWritable temperature; // A constructor with no args should be present, else hadoop will throw an error public CityTempurature() { city = new Text(); temperature = new IntWritable(); } public CityTempurature(String city, String degree) { this.city = new Text(city); this.temperature = new IntWritable(Integer.parseInt(degree)); } // this method will be used when deserialising data @Override public void readFields(DataInput dataInput) throws IOException { city.readFields(dataInput); temperature.readFields(dataInput); } // this method will be used when serialising data @Override public void write(DataOutput dataOutput) throws IOException { city.write(dataOutput); temperature.write(dataOutput); } public Text getCity() { return city; } public void setCity(Text city) { this.city = city; } @Override public int hashCode() { return city.hashCode() + temperature.hashCode(); } @Override public boolean equals(Object obj) { if (obj instanceof CityTempurature) { CityTempurature textIntWritable = (CityTempurature) obj; return this.getCity().equals(textIntWritable.getCity()) && this.getTemperature().equals(textIntWritable.getTemperature()); } return false; } @Override public String toString() { return city.toString() + " " + temperature.toString(); } public IntWritable getTemperature() { return temperature; } public void setTemperature(IntWritable temperature) { this.temperature = temperature; } } |
Ensuite, la clé doit implémenter WritableComparable car Hadoop utilisera la méthode compareTo pour trier les clés lors de l’étape de réorganisation.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
public class CustomDate implements WritableComparable<CustomDate>{ IntWritable year; IntWritable month; IntWritable day; public CustomDate() { year = new IntWritable(); month = new IntWritable(); day = new IntWritable(); } public CustomDate(String year, String month, String day) { this.year = new IntWritable(Integer.parseInt(year)); this.month = new IntWritable(Integer.parseInt(month)); this.day = new IntWritable(Integer.parseInt(day)); } public IntWritable getYear() { return year; } public void setYear(IntWritable year) { this.year = year; } public IntWritable getMonth() { return month; } public void setMonth(IntWritable month) { this.month = month; } public IntWritable getDay() { return day; } public void setDay(IntWritable day) { this.day = day; } @Override public int hashCode() { return year.hashCode() + month.hashCode() + day.hashCode(); } @Override public boolean equals(Object obj) { if (obj instanceof CustomDate) { CustomDate dateWritable = (CustomDate) obj; return this.getYear().equals(dateWritable.getYear()) && this.getMonth().equals(dateWritable.getMonth()) && this.getDay().equals(dateWritable.getDay()); } return false; } @Override public String toString() { return day.toString()+"/"+month.toString()+"/"+year.toString(); } @Override public void readFields(DataInput dataInput) throws IOException { day.readFields(dataInput); month.readFields(dataInput); year.readFields(dataInput); } @Override public void write(DataOutput dataOutput) throws IOException { day.write(dataOutput); month.write(dataOutput); year.write(dataOutput); } @Override public int compareTo(CustomDate dateWritable) { int yearCmp = year.compareTo(dateWritable.getYear()); if (yearCmp != 0) { return yearCmp; } int monthCmp = month.compareTo(dateWritable.getMonth()); if (monthCmp != 0) { return monthCmp; } return day.compareTo(dateWritable.getDay()); } } |
Voyons maintenant comment écrire un format d’entré » input » personnalisé, vous devez s’étend FileInputFormat< KeyType, ValueType> ( le KeyType doit implémenter WritableComparable et la ValueType doit implémenter Writable) et redéfinir la méthode createRecordReader
1 2 3 4 5 6 7 8 9 10 11 |
public class SfaxInputFormat extends FileInputFormat<CustomDate,CityTemperature> { @Override public RecordReader createRecordReader(InputSplit inputSplit,TaskAttemptContext context) throws IOException, InterruptedException { SfaxRecordReader sfaxRecordReader = new SfaxRecordReader(); sfaxRecordReader.initialize(inputSplit, context); return sfaxRecordReader; } } |
CustomInputFromat utilisera un CustomRecordReader qui transférera le fichier au format hdfs vers un enregistrement (key, value).
La méthode intialize CustomRecordReader est la première qui sera appelée par une boucle
while (getProgress !=1)
{
nextKeyValue will be called
than getCurrentKey(), getCurrentValue()
}
Finalement la méthode close() sera appelée
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
public class SfaxRecordReader<CustomDate,CityTemperature> extends RecordReader { // It’s a builtin class that split each file line by line LineRecordReader lineRecordReader; CustomDate key; CityTempurature value; @Override public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { lineRecordReader = new LineRecordReader(); lineRecordReader.initialize(inputSplit, context); } // It’s the method that will be used to transform the line into key value @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!lineRecordReader.nextKeyValue()) { return false; } String line = lineRecordReader.getCurrentValue().toString(); String[] keyValue = line.split("\t"); String[] keyFields = keyValue[0].split(" "); valueFields = keyValue[1].split(" "); key = new CustomDate(keyFields[0],keyFields[1],keyFields[2]); value = new CityTempurature(valueFields[0],valueFields[1]); return true; } @Override public CustomDate getCurrentKey() throws IOException, InterruptedException { return key; } @Override public CityTempurature getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return lineRecordReader.getProgress(); } @Override public void close() throws IOException { lineRecordReader.close(); } } |
De la même manière, vous pouvez vérifier le code source de SousseInputFormat
1 2 3 4 5 6 7 8 9 10 |
public class SousseInputFormat extends FileInputFormat<CustomDate,CityTemperature> { @Override public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { SousseRecordReader sousseRecordReader = new SousseRecordReader(); sousseRecordReader.initialize(inputSplit, context); return sousseRecordReader; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
public class SousseRecordReader extends RecordReader<CustomDate,CityTemperature> { LineRecordReader lineRecordReader; CustomDate key; CityTempurature value; @Override public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { lineRecordReader = new LineRecordReader(); lineRecordReader.initialize(inputSplit, context); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!lineRecordReader.nextKeyValue()) { return false; } String line = lineRecordReader.getCurrentValue().toString(); String[] keyValue = line.split("\t"); String[] keyFields = keyValue[0].split(" "); String[] valueFields = keyValue[1].split(" "); key = new CustomDate(valueFields[0],valueFields[1],valueFields[2]); value = new CityTempurature(keyFields[0],keyFields[1]); return true; } @Override public CustomDate getCurrentKey() throws IOException, InterruptedException { return key; } @Override public CityTempurature getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return lineRecordReader.getProgress(); } @Override public void close() throws IOException { lineRecordReader.close(); } } |
Maintenant, le TemperatureMapper ne divisera pas les données, il obtiendra simplement la clé et la valeur sans analyser les données
1 2 3 4 5 6 7 8 |
public class TempuratureMapper extends Mapper<CustomDate,CityTemperature ,CustomDate,IntWritable>{ @Override protected void map(CustomDate key, CityTempurature value, Mapper.Context context) { context.write(key,value.getTemperature()); } } |
Et dans le réducteur, nous allons juste calculer la moyenne des deux températures
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public class TempuratureReducer extends Reducer<CustomDate,IntWritable,CustomDate,IntWritable>{ @Override protected void reduce(CustomDate key, Iterable<IntWritable> values, Reducer.Context context) throws IOException, InterruptedException { int sum = 0; int nbr =0 ; for (IntWritable value : values) { nbr++; sum=sum+value.get(); } context.write(key, new IntWritable(sum/nbr)); } } |
Maintenant, après le codage, exportez le fichier jar en tant que fichier jar exécutable et spécifiez MinMaxJob en tant que classe principale, puis ouvrez le terminal et exécutez le travail en appelant: hadoop jar <nomOfTheJar.jar>.
Par exemple, si vous donnez au fichier jar le nom lab4.jar, la ligne de commande sera: hadoop jar lab4.jar
Regardez le résultat en appelant: hdfs dfs -cat / training / lab4 / output / part-r-00000