Untitled
unknown
plain_text
13 days ago
3.8 kB
1
Indexable
// ===================== MatrixMapper.java =====================
package hadoop.matrix;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper for one-step MapReduce matrix multiplication (C = A x B).
 *
 * <p>Input lines have the form {@code "MatrixName i j value"}, e.g.
 * {@code "A 0 1 5"} or {@code "B 1 0 6"}.
 *
 * <p>For an element A[i][j], emits key {@code "i,k"} with value
 * {@code "A,j,value"} for every column k of B (so it can meet the B
 * elements contributing to C[i][k]). Symmetrically, for B[i][j] it emits
 * key {@code "k,j"} with value {@code "B,i,value"} for every row k of A.
 * The result dimensions are read from the job configuration keys
 * {@code matrix.A.rows} and {@code matrix.B.columns}, set by the driver.
 */
public class MatrixMapper extends Mapper<Object, Text, Text, Text> {

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Tokenize on any whitespace; trim first so trailing newlines or
        // padded lines do not produce empty tokens.
        String[] parts = value.toString().trim().split("\\s+");
        if (parts.length != 4) {
            return; // skip blank or malformed lines rather than crash the task
        }

        String matrixName = parts[0];
        int i = Integer.parseInt(parts[1]);
        int j = Integer.parseInt(parts[2]);
        String cellValue = parts[3];

        if (matrixName.equals("A")) {
            // A[i][j] contributes to every C[i][k], k in [0, colsB).
            int colsB = context.getConfiguration().getInt("matrix.B.columns", 0);
            for (int k = 0; k < colsB; k++) {
                context.write(new Text(i + "," + k), new Text("A," + j + "," + cellValue));
            }
        } else if (matrixName.equals("B")) {
            // B[i][j] contributes to every C[k][j], k in [0, rowsA).
            int rowsA = context.getConfiguration().getInt("matrix.A.rows", 0);
            for (int k = 0; k < rowsA; k++) {
                context.write(new Text(k + "," + j), new Text("B," + i + "," + cellValue));
            }
        }
    }
}

// ===================== MatrixReducer.java =====================
package hadoop.matrix;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Reducer for one-step MapReduce matrix multiplication.
 *
 * <p>Each key is an output cell {@code "i,j"}; its values are tagged
 * contributions {@code "A,k,value"} (A[i][k]) and {@code "B,k,value"}
 * (B[k][j]). The reducer computes the dot product
 * {@code C[i][j] = sum_k A[i][k] * B[k][j]} by joining the two sides on
 * the shared index k.
 */
public class MatrixReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // k -> A[i][k] and k -> B[k][j], collected from the tagged values.
        // NOTE: the original pasted code fused these two declarations into one
        // unparseable statement; they must be separate.
        Map<Integer, Float> aMap = new HashMap<>();
        Map<Integer, Float> bMap = new HashMap<>();

        for (Text value : values) {
            String[] parts = value.toString().split(",");
            if (parts[0].equals("A")) {
                aMap.put(Integer.parseInt(parts[1]), Float.parseFloat(parts[2]));
            } else if (parts[0].equals("B")) {
                bMap.put(Integer.parseInt(parts[1]), Float.parseFloat(parts[2]));
            }
        }

        // Dot product over indices present on both sides; a k missing from
        // either map contributes an (implicit) zero term.
        float sum = 0.0f;
        for (Map.Entry<Integer, Float> entry : aMap.entrySet()) {
            Float b = bMap.get(entry.getKey());
            if (b != null) {
                sum += entry.getValue() * b;
            }
        }
        context.write(key, new Text(String.valueOf(sum)));
    }
}

// ===================== MatrixMultiplicationDriver.java =====================
package hadoop.matrix;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver that configures and submits the matrix-multiplication job.
 *
 * <p>Usage:
 * {@code MatrixMultiplicationDriver <input path> <output path> <rows A> <columns B>}.
 * The two dimension arguments are published to the mappers via the
 * configuration keys {@code matrix.A.rows} and {@code matrix.B.columns}.
 */
public class MatrixMultiplicationDriver {

    public static void main(String[] args) throws Exception {
        if (args.length != 4) {
            System.out.println("Usage: MatrixMultiplicationDriver <input path> <output path> <rows A> <columns B>");
            System.exit(-1);
        }

        Configuration conf = new Configuration();
        conf.setInt("matrix.A.rows", Integer.parseInt(args[2]));
        conf.setInt("matrix.B.columns", Integer.parseInt(args[3]));

        Job job = Job.getInstance(conf);
        job.setJarByClass(MatrixMultiplicationDriver.class);
        job.setJobName("Matrix Multiplication");

        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);

        // Fixed: the pasted code had a stray ");" after this call.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

/*
 * Input and Execution
 *
 * Input format: MatrixName RowIndex ColumnIndex Value
 *   A 0 0 1
 *   A 0 1 2
 *   B 0 0 3
 *   B 1 0 4
 *
 * Execution:
 *
 * Compile the Java files:
 *   javac -classpath $(hadoop classpath) -d . MatrixMapper.java MatrixReducer.java MatrixMultiplicationDriver.java
 *   jar -cvf matrixmultiplication.jar *.class
 *
 * Create input directory in HDFS and upload input file:
 *   hdfs dfs -mkdir /matrixinput
 *   hdfs dfs -put input.txt /matrixinput
 *
 * Run the MapReduce job:
 *   hadoop jar matrixmultiplication.jar hadoop.matrix.MatrixMultiplicationDriver /matrixinput /matrixoutput 2 2
 *
 * View the results:
 *   hdfs dfs -cat /matrixoutput/part-r-00000
 */
Editor is loading...
Leave a Comment