Untitled
unknown
plain_text
9 months ago
3.8 kB
3
Indexable
MatrixMapper.java
package hadoop.matrix;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MatrixMapper extends Mapper<Object, Text, Text, Text> {

    /**
     * Emits each input matrix cell to every reducer group that needs it for C = A x B.
     *
     * <p>Input format (one cell per line): {@code "MatrixName i j value"},
     * e.g. {@code "A 0 1 5"} or {@code "B 1 0 6"}.
     * An A cell (i, j) contributes to every output cell (i, k) for k &lt; columns(B);
     * a B cell (i, j) contributes to every output cell (k, j) for k &lt; rows(A).
     * The output key is "row,col" of the target C cell; the output value tags the
     * source matrix and the summation index so the reducer can pair A(i,k) with B(k,j).
     *
     * @param key     input offset (unused)
     * @param value   one text line describing a single matrix cell
     * @param context used to read matrix dimensions and emit intermediate pairs
     * @throws IOException          on emit failure
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        // Robustness fix: the original indexed parts[0..3] unconditionally and threw
        // ArrayIndexOutOfBoundsException on blank or malformed lines. Skip them instead.
        if (line.isEmpty()) {
            return;
        }
        String[] parts = line.split("\\s+");
        if (parts.length != 4) {
            return; // ignore malformed records
        }
        String matrixName = parts[0];
        int i = Integer.parseInt(parts[1]);
        int j = Integer.parseInt(parts[2]);
        String cellValue = parts[3];

        if (matrixName.equals("A")) {
            // Hoisted: the configuration lookup is loop-invariant; the original
            // re-read it on every iteration of the loop condition.
            int bColumns = context.getConfiguration().getInt("matrix.B.columns", 0);
            for (int k = 0; k < bColumns; k++) {
                // A(i, j) feeds C(i, k); j is the shared summation index.
                context.write(new Text(i + "," + k), new Text("A," + j + "," + cellValue));
            }
        } else if (matrixName.equals("B")) {
            int aRows = context.getConfiguration().getInt("matrix.A.rows", 0);
            for (int k = 0; k < aRows; k++) {
                // B(i, j) feeds C(k, j); i is the shared summation index.
                context.write(new Text(k + "," + j), new Text("B," + i + "," + cellValue));
            }
        }
        // Cells with any other matrix name are silently ignored, as in the original.
    }
}
MatrixReducer.java
package hadoop.matrix;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.HashMap;
public class MatrixReducer extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
InterruptedException {
HashMap<Integer, Float> aMap = new HashMap<>(HashMap<Integer, Float> bMap = new HashMap<>();
for (Text value : values) {
String[] parts = value.toString().split(",");
if (parts[0].equals("A")) {
aMap.put(Integer.parseInt(parts[1]), Float.parseFloat(parts[2]));
} else if (parts[0].equals("B")) {
bMap.put(Integer.parseInt(parts[1]), Float.parseFloat(parts[2]));
}
}
float sum = 0.0f;
for (int k : aMap.keySet()) {
if (bMap.containsKey(k)) {
sum += aMap.get(k) * bMap.get(k);
}
}
context.write(key, new Text(String.valueOf(sum)));
}
}
MatrixMultiplicationDriver.java
package hadoop.matrix;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MatrixMultiplicationDriver {
public static void main(String[] args) throws Exception {
if (args.length != 4) {
System.out.println("Usage: MatrixMultiplicationDriver <input path> <output path> <rows A>
<columns B>");
System.exit(-1);
}
Configuration conf = new Configuration();
conf.setInt("matrix.A.rows", Integer.parseInt(args[2]));
conf.setInt("matrix.B.columns", Integer.parseInt(args[3]));
Job job = Job.getInstance(conf);
job.setJarByClass(MatrixMultiplicationDriver.class);
job.setJobName("Matrix Multiplication");
job.setMapperClass(MatrixMapper.class);
job.setReducerClass(MatrixReducer.class);
job.setOutputKeyClass(Text.class););
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Input and Execution
Input Format:
MatrixName RowIndex ColumnIndex Value
A 0 0 1
A 0 1 2
B 0 0 3
B 1 0 4
Execution:
Compile the Java files:
javac -classpath $(hadoop classpath) -d . MatrixMapper.java MatrixReducer.java
MatrixMultiplicationDriver.java
jar -cvf matrixmultiplication.jar *.class
Create input directory in HDFS and upload input file:
hdfs dfs -mkdir /matrixinput
hdfs dfs -put input.txt /matrixinput
Run the MapReduce job:
hadoop jar matrixmultiplication.jar hadoop.matrix.MatrixMultiplicationDriver /matrixinput
/matrixoutput 2 2
View the results:
hdfs dfs -cat /matrixoutput/part-r-00000
Leave a Comment