
Tutoriel 0 : Hadoop Map Reduce Partitionner
Au début, on commence par un simple travail de Hadoop.
Supposons qu’on a un gros fichier contenant de nombreux mots séparés par un espace et qu’on souhaite connaître le numéro d’apparition de chaque mot.
On a également besoin que les mots de [A-L] soient dans la première partie et les autres dans la deuxième partie.
Commençons par l’installation Hadoop en suivant ce lien.
Ensuite, vous devriez démarrer le démon Hadoop en appelant ces scripts:
1 2 |
start-dfs.sh start-yarn.sh |
une dernière étape avant de commencer, vous devez copier les fichiers d’entrée dans votre système de fichiers Hadoop local et créer des répertoires dans hdfs avant de les copier.
alors téléchargez les deux fichiers d’entrée (un petit fichier à tester): lien de téléchargement
Après cela, créez des chemins dans hdfs en appelant:
1 |
hdfs dfs -mkdir -p /training/lab0/inputs/ |
Ensuite, copiez-les sur hdfs en appelant une commande comme celle-ci:
1 |
hdfs dfs -copyFromLocal ... /training/lab0/inputs/ |
Par exemple, si vous avez téléchargé les fichiers dans Téléchargements / lab0 / input /, la ligne de commande doit alors être:
1 |
hdfs dfs -copyFromLocal ~/Downloads/lab0/inputs/* /training/lab0/inputs/ |
Vous devez d’abord créer une classe de travail qui étend la classe configurée et implémente l’interface de l’outil.
En écrivant cette classe, vous donnerez au travail toutes les informations sur le format d’entrée, le format de sortie, le mappeur, le réducteur, le format de sortie clé et valeur du mappeur et du réducteur, etc…
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
public class WordCountJob extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "WordCountJob"); // Give the job the name of the main class job.setJarByClass(WordCountJob.class); // Specify the input format, which will have impact on the key and the value // type of the mapper inputs. job.setInputFormatClass(TextInputFormat.class); // By specifying TextOutputFormat as an output the format of your file will be // Key.toString()than 4 spaces (\t) than value.toString(), for example 12 14 job.setOutputFormatClass(TextOutputFormat.class); // because if we run a job and give it an output that is already exist, the job will fail TextOutputFormat.setOutputPath(job, new Path(args[1])); // specify the input paths in the hdfs TextInputFormat.setInputPaths(job, new Path(args[0])); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path(args[1])); // Give the job the name of the mapper class job.setMapperClass(WordCountMapper.class); // Give the job the name of the reducer class job.setReducerClass(WordCountReducer.class); // Give the job the name of the partitioner class job.setPartitionerClass(WordCountPartitioner.class); // Give the job the number of reducers // The first one will treat the words in [A,L] // The second one will treat others job.setNumReduceTasks(2); // set the key output type job.setOutputKeyClass(Text.class); // set the value output type job.setOutputValueClass(IntWritable.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new WordCountJob(), new String [] { "hdfs://localhost:9000/training/lab0/inputs*", "hdfs://localhost:9000/training/lab0/output/" }); System.exit(exitCode); } } |
Voyons comment fonctionne le mappeur.
Dans notre cas, le rôle du mappeur est d’écrire 1 en tant que valeur pour chaque mot (en tant que clé).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable ONE = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { // We split by white space each line we read on the value String[] words = value.toString().split(" "); for (int i = 0; i < words.length; i++) { // write each word on the key with a 1 on the value context.write(new Text(words[i].toLowerCase()), ONE); } } } |
Voyons maintenant le partitioner, il devrait étendre le partitioner <MapperOutPutKeyType, MapperOutPutValueType>.
Dans notre cas, il faut transmettre chaque clé du mappeur à un réducteur spécifique.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public class WordCountPartitioner extends Partitioner<Text, IntWritable<{ @Override public int getPartition(Text key, IntWritable value, int numPartitions) { // Get the first char of the word char decider = key.toString().toUpperCase().charAt(0); char A = 'A'; char L = 'L'; // if first char in [A,L] then go to the first reducer if((A<=decider) && (decider<=L)) { return 0; // else go to the second reducer } else { return 1; } } } |
Examinons maintenant le réducteur. KeyInputFormat et FileInputFormat du filtre doivent être égaux à KeyOutputFormat et FileOutputFormat du mappeur.
Dans notre cas, le rôle du réducteur consiste à additionner la valeur de chaque mot (clé).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; // We sum the values for (IntWritable value : values) { sum = sum + value.get(); } //We write the word followed by the sum context.write(key, new IntWritable(sum)); } } |
Exportez le fichier jar en tant que fichier jar exécutable et spécifiez WordCountJob en tant que classe principale, puis ouvrez le terminal et exécutez le travail en appelant:
1 |
hadoop jar nameOfTheJar.jar |
Par exemple, si vous attribuez le nom lab0.jar au fichier jar, la ligne de commande est la suivante:
1 |
hadoop jar lab0.jar |
Vous pouvez consulter le résultat en appelant:
1 |
hdfs dfs -ls /training/lab0/output |