Untitled

mail@pastecode.io avatar
unknown
plain_text
3 years ago
4.7 kB
1
Indexable
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public class ReduceChain {
	 

 public static class CustsMapper extends Mapper <Object, Text, Text, Text>
 {
 public void map(Object key, Text value, Context context)
 throws IOException, InterruptedException 
 {
 String record = value.toString();
 String[] parts = record.split(",");
 context.write(new Text(parts[0]), new Text("age   " + parts[3]));
 }
 }
 
 public static class TxnsMapper extends Mapper <Object, Text, Text, Text>
 {
 public void map(Object key, Text value, Context context) 
 throws IOException, InterruptedException 
 {
 String record = value.toString();
 String[] parts = record.split(",");
 context.write(new Text(parts[2]), new Text("spt   " + parts[4]));
 }
 }
 
 public static class ReduceChainReducer extends Reducer <Text, Text, Text, Text>
 {
 public void reduce(Text key, Iterable<Text> values, Context context)
 throws IOException, InterruptedException 
 {
 String age = "";
 String sport="";
 for (Text t : values) 
 { 
 String parts[] = t.toString().split("   ");
 if (parts[0].equals("spt")) 
 {
	 if(!sport.contains(parts[1]))
 sport += parts[1]+",";
 } 
 else if (parts[0].equals("age")) 
 {
 age = parts[1];
 }
 }
 sport = sport.substring(0, sport.length() - 1);
 context.write(new Text(age), new Text(sport));
 }
 }
 
 public static class GameMapper extends Mapper <Object, Text, Text, Text>
 {
 public void map(Object key, Text value, Context context)
 throws IOException, InterruptedException 
 {
	 String record = value.toString();
	 String record2 = record.replace("\t", ",");
	 String parts[] = record2.split(",");
	 
	 for (String t : parts) 
	 {
		 if(t!=parts[0])
		 {
		 context.write(new Text(t), new Text(parts[0]));}
	 }
 }
 }
 
 public static class GameReducer extends Reducer <Text, Text, Text, Text>
 {
 double lowestAvg=100000;
 String listgame;
 public void reduce(Text key, Iterable<Text> values, Context context)
 throws IOException, InterruptedException 
 {
 int min = 100000000,max = -1;
 int count=0;
 double avg=0.0;
 for (Text val : values) 
 {  
	 String t=val.toString();
	 int temp = Integer.parseInt(t);
	 if(temp<min)
		 min=temp;
	 if(temp>max)
		 max=temp;
	 avg += temp;
	 count++;	 
 }
 avg = avg/count;
 if(avg < lowestAvg)
 {
	 lowestAvg=avg; 
	 listgame=key.toString();
 }
 else 
 {
	 if(avg==lowestAvg)
	 {
	 listgame= listgame + ", " + key.toString();
	 }
 }
 
 String str1 = Integer.toString(min);
 String str2 = Integer.toString(max);
 String str3 = Double.toString(avg);
 String str = "[Min = " + str1 + ", Max = " + str2 + ", Avg = " + str3 +"]";
 context.write(new Text(key.toString()), new Text(str));
 }
 
 protected void cleanup (Context context) 
		 throws IOException, InterruptedException{
	 String Gametype = Double.toString(lowestAvg) + ": " + listgame;
	 
	 context.write(new Text("Game Type with lowest average age of"),new Text(Gametype));
 }
 }
 
 
 /**
  * Runs the two chained MapReduce jobs.
  *
  * <p>Job 1 joins the two inputs with {@code CustsMapper}, {@code TxnsMapper} and
  * {@code ReduceChainReducer}. Job 2 reads job 1's output and aggregates it with
  * {@code GameMapper} and {@code GameReducer} in a single reduce task.
  *
  * <p>Exit status: 0 when both jobs succeed, 1 when either job fails, 2 on wrong usage.
  *
  * @param args {@code args[0]} input for {@code CustsMapper}; {@code args[1]} input for
  *             {@code TxnsMapper}; {@code args[2]} output of job 1 and input of job 2
  *             (deleted first if it exists); {@code args[3]} output of job 2
  * @throws Exception if job setup or execution fails
  */
 public static void main(String[] args) throws Exception {
     if (args.length < 4) {
         System.err.println(
                 "Usage: ReduceChain <custs-input> <txns-input> <join-output> <final-output>");
         System.exit(2);
     }

     Configuration conf1 = new Configuration();
     Job job1 = Job.getInstance(conf1, "Reduce-side join");
     job1.setJarByClass(ReduceChain.class);
     job1.setReducerClass(ReduceChainReducer.class);
     job1.setOutputKeyClass(Text.class);
     job1.setOutputValueClass(Text.class);
     MultipleInputs.addInputPath(job1, new Path(args[0]), TextInputFormat.class, CustsMapper.class);
     MultipleInputs.addInputPath(job1, new Path(args[1]), TextInputFormat.class, TxnsMapper.class);
     Path outputPath = new Path(args[2]);
     FileOutputFormat.setOutputPath(job1, outputPath);
     // Remove a previous run's intermediate output, recursively, so job 1 can write there.
     outputPath.getFileSystem(conf1).delete(outputPath, true);
     if (!job1.waitForCompletion(true)) {
         // Job 2 consumes job 1's output; running it after a failure would process
         // missing or partial data.
         System.exit(1);
     }

     Configuration conf2 = new Configuration();
     Job job2 = Job.getInstance(conf2, "Reduce-side join 2");
     job2.setJarByClass(ReduceChain.class);
     job2.setMapperClass(GameMapper.class);
     job2.setReducerClass(GameReducer.class);
     job2.setOutputKeyClass(Text.class);
     job2.setOutputValueClass(Text.class);
     // GameReducer tracks the lowest average in instance state, so it must see every key.
     job2.setNumReduceTasks(1);
     FileInputFormat.addInputPath(job2, new Path(args[2]));
     FileOutputFormat.setOutputPath(job2, new Path(args[3]));
     System.exit(job2.waitForCompletion(true) ? 0 : 1);
 }
 }