
Tutoriel 2 : Variable globale Hadoop Map Reduce
Objectif du tutoriel:
Écrivez un programme MapReduce qui recherche les occurrences d’une chaîne donnée dans un fichier volumineux.
Vous pouvez probablement penser à utiliser une ligne de commande grep et c’est tout.
Mais que se passe-t-il si la taille du fichier est trop grande, cela prendra trop de temps.
Tout d’abord, vous devez vous assurer que vous avez correctement installé Hadoop sur votre machine.
Ensuite, vous devriez démarrer le démon Hadoop en appelant ces scripts:
- start-dfs.sh
- start-yarn.sh
On termine par cette dernière étape avant de commencer, vous devez copier les fichiers d’entrée dans votre système de fichiers Hadoop local et créer des répertoires dans hdfs avant de les copier.
Alors téléchargez ces deux fichiers d’entrée (ce sont de petits fichiers juste pour les tests)
Après cela, créez des chemins dans hdfs en appelant: hdfs dfs -mkdir -p / training / lab2 / input /
Ensuite, copiez-les sur hdfs en appelant cette commande : hdfs dfs -copyFromLocal / training / lab2 / input /
Par exemple, si vous avez téléchargé les fichiers dans Téléchargements / lab2 / input /, la ligne de commande doit alors être: hdfs dfs -copyFromLocal ~ / Téléchargements / lab2 / input / * / training / lab1 / input /
Maintenant que tout est déjà configuré, commencez le codage.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
public class SearchJob extends Configured implements Tool { @Override public int run(String[] args) throws Exception { if (args.length != 3) { System.err.println("error"); return -1; } // getConf method it's inherited from Configured class, that's why we should extends Configured Configuration conf = getConf(); // we need to set the search word into configuration, so we can get it in the mapper conf.set("searchingWord", args[2]); Job job = Job.getInstance(conf, "TP2"); //Give the job the name of the main class job.setJarByClass(SearchJob.class); // Specify the input format job.setInputFormatClass(TextInputFormat.class); // Specify the output format job.setOutputFormatClass(TextOutputFormat.class); // Specify the output paths in the hdfs TextOutputFormat.setOutputPath(job, new Path(args[1])); // specify the input paths in the hdfs TextInputFormat.setInputPaths(job, new Path(args[0])); // Give the job the name of the mapper class job.setMapperClass(SearchMapper.class); // Give the job the name of the reducer class job.setReducerClass(SearchReducer.class); // Give the job the name of the combiner class job.setCombinerClass(SearchCombiner.class); // set the key output type of the reducer job.setOutputKeyClass(Text.class); // set the value output type of the reducer job.setOutputValueClass(IntWritable.class); // because the mapper output key and value are same to the reducer output key and value there is no need to add // job.setMapOutputKeyClass(Text.class); // job.setMapOutputValueClass(IntWritable.class); // run the job int r = job.waitForCompletion(true) ? 0 : 1; System.out.println(job.getCounters().findCounter("UserCounters","numberOfLines").getValue()); return r; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SearchJob(), new String[] { "hdfs://localhost:9000/training/lab2/inputs","hdfs://localhost:9000/training/lab2/output","the" }); System.exit(exitCode); } } |
On voie maintenant le mappeur. Le rôle du mappeur est de rechercher le mot.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
// the setup method will be executed first // than on each line the map function will be executed public class SearchMapper extends Mapper<LongWritable,Text,Text,IntWritable>{ // to store the searching word private String searchString; @Override protected void setup(Mapper.Context context) throws IOException, InterruptedException { // get the searchingWord from configuration searchString = context.getConfiguration().get("searchingWord"); } @Override protected void map(LongWritable key, Text value, Mapper.Context context) { // split line into words separated by a " " String [] words = value.toString().split(" "); // count the number of occurrence int counter = 0; for (int i=0; i< words.length;i++) { if(searchString.equals(words[i])){ counter++; } } // write the word as a key and the number of occurrence as a value context.write(new Text(searchString), new IntWritable(counter)); } } |
Si le fichier était trop volumineux, il sera divisé en plusieurs blocs et le nombre de mappeurs sera égal au nombre de blocs.
Donc, pour minimiser le transfert de données, on peut écrire un combinateur
1 2 3 4 5 6 7 8 9 10 11 |
public class SearchCombiner extends Reducer<Text, IntWritable, Text, IntWritable< { @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer.Context context) throws IOException, InterruptedException { int counter = 0; for (IntWritable value : values) { counter += value.get(); } context.write(key , new IntWritable(counter)); } } |
On voie maintenant le réducteur, son rôle sera de faire la somme des valeurs, c’est ça:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public class SearchReducer extends Reducer<Text, IntWritable, Text, IntWritable< { @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer.Context context) throws IOException, InterruptedException { int counter = 0; for (IntWritable value : values) { counter += value.get(); } context.write(key , new IntWritable(counter)); } } |
Remarque: au lieu d’écrire notre propre réducteur, on peut utiliser IntSumReducer <Key>.
1 2 3 |
java.lang.Object org.apache.hadoop.mapreduce.Reducer org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer |
et utiliser le texte comme clé
Remarque: également pour minimiser le transfert de données, dans le cas d’une ligne où le mot recherché n’apparaîtra pas, notre mappeur écrira («searchWord», 0), il serait donc préférable de ne rien écrire dans ce cas, on doit donc ajouter
1 2 3 |
if (counter!=0) { context.write(key , new IntWritable(counter)); } |
Maintenant, après le codage, exportez le fichier jar en tant que fichier jar exécutable et spécifiez MinMaxJob en tant que classe principale, puis ouvrez le terminal et exécutez le travail en appelant: hadoop jar nameOfTheJar.jar.
Par exemple, si vous attribuez le nom lab2.jar au jar, la ligne de commande sera: hadoop jar lab2.jar et examinez le résultat en appelant: hdfs dfs -cat / training / lab2 / output / part-r-00000.