Untitled
unknown
plain_text
4 years ago
4.7 kB
3
Indexable
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class ReduceChain { public static class CustsMapper extends Mapper <Object, Text, Text, Text> { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String record = value.toString(); String[] parts = record.split(","); context.write(new Text(parts[0]), new Text("age " + parts[3])); } } public static class TxnsMapper extends Mapper <Object, Text, Text, Text> { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String record = value.toString(); String[] parts = record.split(","); context.write(new Text(parts[2]), new Text("spt " + parts[4])); } } public static class ReduceChainReducer extends Reducer <Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String age = ""; String sport=""; for (Text t : values) { String parts[] = t.toString().split(" "); if (parts[0].equals("spt")) { if(!sport.contains(parts[1])) sport += parts[1]+","; } else if (parts[0].equals("age")) { age = parts[1]; } } sport = sport.substring(0, sport.length() - 1); context.write(new Text(age), new Text(sport)); } } public static class GameMapper extends Mapper <Object, Text, Text, Text> { public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String record = value.toString(); String record2 = record.replace("\t", ","); String parts[] = record2.split(","); for (String t : parts) { if(t!=parts[0]) { context.write(new Text(t), new Text(parts[0]));} } } } public static class GameReducer extends Reducer <Text, Text, Text, Text> { double lowestAvg=100000; String listgame; public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int min = 100000000,max = -1; int count=0; double avg=0.0; for (Text val : values) { String t=val.toString(); int temp = Integer.parseInt(t); if(temp<min) min=temp; if(temp>max) max=temp; avg += temp; count++; } avg = avg/count; if(avg < lowestAvg) { lowestAvg=avg; listgame=key.toString(); } else { if(avg==lowestAvg) { listgame= listgame + ", " + key.toString(); } } String str1 = Integer.toString(min); String str2 = Integer.toString(max); String str3 = Double.toString(avg); String str = "[Min = " + str1 + ", Max = " + str2 + ", Avg = " + str3 +"]"; context.write(new Text(key.toString()), new Text(str)); } protected void cleanup (Context context) throws IOException, InterruptedException{ String Gametype = Double.toString(lowestAvg) + ": " + listgame; context.write(new Text("Game Type with lowest average age of"),new Text(Gametype)); } } public static void main(String[] args) throws Exception { Configuration conf1 = new Configuration(); Job job1 = new Job(conf1, "Reduce-side join"); job1.setJarByClass(ReduceChain.class); job1.setReducerClass(ReduceChainReducer.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); MultipleInputs.addInputPath(job1, new Path(args[0]),TextInputFormat.class, CustsMapper.class); MultipleInputs.addInputPath(job1, new Path(args[1]),TextInputFormat.class, TxnsMapper.class); Path outputPath = new Path(args[2]); FileOutputFormat.setOutputPath(job1, outputPath); outputPath.getFileSystem(conf1).delete(outputPath); job1.waitForCompletion(true) ; Configuration conf2 = new Configuration(); Job job2 = Job.getInstance(conf2, "Reduce-side join 2"); job2.setJarByClass(ReduceChain.class); job2.setMapperClass(GameMapper.class); job2.setReducerClass(GameReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); job2.setNumReduceTasks(1); FileInputFormat.addInputPath(job2, new Path(args[2])); FileOutputFormat.setOutputPath(job2, new Path(args[3])); System.exit(job2.waitForCompletion(true) ? 0 : 1); } }
Editor is loading...