
Tutorial 1: Utiliser Combiner
Au début, on commence par un simple travail de Hadoop.
On suppose qu’on a de gros fichiers où chaque ligne contient le degré de température et qu’on veut obtenir le maximum et le minimum.
Je vous entends dire pourquoi map réduire je peux le faire dans un programme Java séquentiel!
ok alors combien de temps faut-il pour obtenir le résultat d’un fichier de plus de 4 Go par exemple !!
On commence par l’installation de Hadoop
Ensuite, vous devriez démarrer le démon Hadoop en appelant ces scripts:
- start-dfs.sh
- start-yarn.sh
Vous devez créer des répertoires dans hdfs et copier les fichiers d’entrée dans votre système de fichiers Hadoop local.
Alors téléchargez ces deux fichiers d’entrée (ce sont de petits fichiers juste pour les tests)
Après cela, créez des chemins dans hdfs en appelant: hdfs dfs -mkdir -p / training / lab1 / input /
Ensuite, copiez-les sur hdfs en appelant cette commande : hdfs dfs -copyFromLocal / training / lab1 / input /
Par exemple, si vous avez téléchargé les fichiers dans Téléchargements / lab1 / input /, la ligne de commande doit alors être: hdfs dfs -copyFromLocal ~ / Téléchargements / lab1 / input / * / training / lab1 / input /
Maintenant que tout est déjà configuré, commencez le codage.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
public class MinMaxJob extends Configured implements Tool { @Override public int run(String[] args) throws Exception { if (args.length != 2) { System.err.println("error"); return -1; } // getConf method it's inhertired from Configured class, that's why we should extends Configured Configuration conf = getConf(); Job job = Job.getInstance(conf, "TP1"); // Give the job the name of the main class job.setJarByClass(MinMaxJob.class); // Specify the input format, which will have impact on the key and the value // type of the mapper inputs. job.setInputFormatClass(TextInputFormat.class); // By specifying TextOutputFormat as an output the format of your file will be // Key.toString()than 4 spaces (\t) than value.toString(), for example 12 14 job.setOutputFormatClass(TextOutputFormat.class); // specify the input paths in the hdfs TextInputFormat.setInputPaths(job, new Path(args[0])); // because if we run a job and give it an output that is already exist, the job will fail TextOutputFormat.setOutputPath(job, new Path(args[1])); // Give the job the name of the mapper class job.setMapperClass(MinMaxMapper.class); // Give the job the name of the reducer class job.setReducerClass(MinMaxReducer.class); // set the key output type of the mapper job.setMapOutputKeyClass(IntWritable.class); // set the value output type of the mapper job.setMapOutputValueClass(IntWritable.class); // set the key output type of the reducer job.setOutputKeyClass(NullWritable.class); // set the value output type of the reducer job.setOutputValueClass(Text.class); //run the job int r = job.waitForCompletion(true) ? 0 : 1; // print the number of bad records on the screen System.out.println(job.getCounters().findCounter("UserCounters", "badInputs").getValue()); return r; } public static void main(String[] args) throws Exception { String inputPaths = "hdfs://localhost:9000/training/lab1/inputs/*"; String outputPath = "hdfs://localhost:9000/training/lab1/output"; ToolRunner.run(new MinMaxJob(), new String[] { inputPaths, outputPath }); } } |
Vous voyez maintenant le mappeur, bien avant de creuser dans les codes, une petite explication sera meilleure.
Dans notre cas, le rôle du mappeur est de filtrer les données et de préparer les données pour les réducteurs.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public class MinMaxMapper extends Mapper<LongWritable,Text,IntWritable,IntWritable>{ @Override protected void map(LongWritable key, Text value,Mapper.Context context) throws IOException, InterruptedException { // We get the line value String sValue = value.toString(); // We try to parse it to a number, if an exception occur than it's a bad record try{ int num = Integer.parseInt(sValue); // write all values with the same key, so they will be processed by the same reducer context.write(new IntWritable(1), new IntWritable(num)); }catch(Exception e ){ context.getCounter("UserCounters","badInputs").increment(1); } } } |
Exportez le fichier jar en tant que fichier exécutable et spécifiez MinMaxJob en tant que classe principale, puis ouvrez le terminal et exécutez le travail en appelant: hadoop jar nameOfTheJar.jar
Par exemple, si vous attribuez à jar le nom lab1.jar, la ligne de commande sera : hadoop jar lab1.jar
Vous pouvez consulter le résultat en appelant: hdfs dfs -cat / training / lab1 / output / part-r-00000
Après avoir codé ce premier laboratoire, examinez-le et essayez de l’optimiser.
Dans le travail précédent, on a deux fichiers et chaque fichier sera traité par un mappeur
NB: si la taille du fichier est supérieure à 128 Mo, le fichier sera divisé en plusieurs parties et chaque partie sera traitée par un mappeur.
Chaque mappeur écrit une paire de (clé, valeur) (key, value) qui sera transférée via le réseau aux réducteurs.
Qu’en est-il de limiter la taille des données à transférer? Peut-on par exemple compter les valeurs min et max dans chaque partie?
Oui on peut le faire. Mais pas à l’intérieur du mappeur, car la fonction map sera appelée sur chaque ligne et on ne sait pas si la valeur est min ou max…
C’est ainsi que Combiner entre en action. Le rôle du combineur est de prendre certaines mesures pour limiter la taille des données à transférer (dans notre cas, compter min et max).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public class Combiner extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { @Override protected void reduce(IntWritable key, Iterable<IntWritable> values,Reducer.Context context) throws IOException, InterruptedException { // We count the min and max of the values int max = values.iterator().next().get(); int min = max; for(IntWritable value : values){ if(value.get() > max){ max = value.get(); } if(value.get() < min){ min = value.get(); } } // we write only the min and max to the output context.write( new IntWritable(1), new IntWritable(min)); context.write( new IntWritable(1), new IntWritable(max)); } } |
Et bien sûr, nous devons parler du combiner
1 |
job.setCombinerClass(Combiner.class); |