Untitled

 avatar
unknown
plain_text
13 days ago
3.8 kB
1
Indexable
MatrixMapper.java

package hadoop.matrix;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for distributed matrix multiplication C = A x B.
 *
 * <p>Each input line describes one matrix cell as {@code "MatrixName i j value"},
 * e.g. {@code "A 0 1 5"} or {@code "B 1 0 6"}. A cell of A is replicated to every
 * output column it contributes to; a cell of B to every output row. The emitted
 * key is the output coordinate {@code "i,k"}; the value tags the source matrix
 * and the shared inner index so the reducer can pair factors.
 *
 * <p>Requires {@code matrix.A.rows} and {@code matrix.B.columns} in the job
 * configuration (set by the driver).
 */
public class MatrixMapper extends Mapper<Object, Text, Text, Text> {
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().trim().split("\\s+");
        if (parts.length < 4) {
            // Skip blank or malformed lines instead of throwing
            // ArrayIndexOutOfBoundsException and failing the whole task.
            return;
        }
        String matrixName = parts[0];
        int i = Integer.parseInt(parts[1]);
        int j = Integer.parseInt(parts[2]);
        String cellValue = parts[3];
        if (matrixName.equals("A")) {
            // A[i][j] contributes to every C[i][k]; hoist the loop-invariant
            // configuration lookup out of the loop body.
            int colsB = context.getConfiguration().getInt("matrix.B.columns", 0);
            for (int k = 0; k < colsB; k++) {
                context.write(new Text(i + "," + k), new Text("A," + j + "," + cellValue));
            }
        } else if (matrixName.equals("B")) {
            // B[i][j] contributes to every C[k][j].
            int rowsA = context.getConfiguration().getInt("matrix.A.rows", 0);
            for (int k = 0; k < rowsA; k++) {
                context.write(new Text(k + "," + j), new Text("B," + i + "," + cellValue));
            }
        }
        // Lines tagged with any other matrix name are silently ignored.
    }
}
MatrixReducer.java
package hadoop.matrix;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.HashMap;
public class MatrixReducer extends Reducer<Text, Text, Text, Text> {
 @Override
 public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, 
InterruptedException {
 HashMap<Integer, Float> aMap = new HashMap<>(HashMap<Integer, Float> bMap = new HashMap<>();
 for (Text value : values) {
 String[] parts = value.toString().split(",");
 if (parts[0].equals("A")) {
 aMap.put(Integer.parseInt(parts[1]), Float.parseFloat(parts[2]));
 } else if (parts[0].equals("B")) {
 bMap.put(Integer.parseInt(parts[1]), Float.parseFloat(parts[2]));
 }
 }
 float sum = 0.0f;
 for (int k : aMap.keySet()) {
 if (bMap.containsKey(k)) {
 sum += aMap.get(k) * bMap.get(k);
 }
 }
 context.write(key, new Text(String.valueOf(sum)));
 }
}
MatrixMultiplicationDriver.java
package hadoop.matrix;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MatrixMultiplicationDriver {
 public static void main(String[] args) throws Exception {
 if (args.length != 4) {
 System.out.println("Usage: MatrixMultiplicationDriver <input path> <output path> <rows A> 
<columns B>");
 System.exit(-1);
 }
 Configuration conf = new Configuration();
 conf.setInt("matrix.A.rows", Integer.parseInt(args[2]));
 conf.setInt("matrix.B.columns", Integer.parseInt(args[3]));
 Job job = Job.getInstance(conf);
 job.setJarByClass(MatrixMultiplicationDriver.class);
 job.setJobName("Matrix Multiplication");
 job.setMapperClass(MatrixMapper.class);
 job.setReducerClass(MatrixReducer.class);
 job.setOutputKeyClass(Text.class););
job.setOutputValueClass(Text.class);

 FileInputFormat.addInputPath(job, new Path(args[0]));

 FileOutputFormat.setOutputPath(job, new Path(args[1]));
 System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}
Input and Execution
Input Format:
MatrixName RowIndex ColumnIndex Value
A 0 0 1
A 0 1 2
B 0 0 3
B 1 0 4
Execution:
Compile the Java files: 
javac -classpath $(hadoop classpath) -d . MatrixMapper.java MatrixReducer.java 
MatrixMultiplicationDriver.java
jar -cvf matrixmultiplication.jar *.class
Create input directory in HDFS and upload input file: 
hdfs dfs -mkdir /matrixinput
hdfs dfs -put input.txt /matrixinput
Run the MapReduce job: 
hadoop jar matrixmultiplication.jar hadoop.matrix.MatrixMultiplicationDriver /matrixinput 
/matrixoutput 2 2
View the results: 
hdfs dfs -cat /matrixoutput/part-r-00000
Editor is loading...
Leave a Comment