import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ReduceChain {
public static class CustsMapper extends Mapper <Object, Text, Text, Text>
{
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException
{
String record = value.toString();
String[] parts = record.split(",");
context.write(new Text(parts[0]), new Text("age " + parts[3]));
}
}
public static class TxnsMapper extends Mapper <Object, Text, Text, Text>
{
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException
{
String record = value.toString();
String[] parts = record.split(",");
context.write(new Text(parts[2]), new Text("spt " + parts[4]));
}
}
public static class ReduceChainReducer extends Reducer <Text, Text, Text, Text>
{
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException
{
String age = "";
String sport="";
for (Text t : values)
{
String parts[] = t.toString().split(" ");
if (parts[0].equals("spt"))
{
if(!sport.contains(parts[1]))
sport += parts[1]+",";
}
else if (parts[0].equals("age"))
{
age = parts[1];
}
}
sport = sport.substring(0, sport.length() - 1);
context.write(new Text(age), new Text(sport));
}
}
public static class GameMapper extends Mapper <Object, Text, Text, Text>
{
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException
{
String record = value.toString();
String record2 = record.replace("\t", ",");
String parts[] = record2.split(",");
for (String t : parts)
{
if(t!=parts[0])
{
context.write(new Text(t), new Text(parts[0]));}
}
}
}
public static class GameReducer extends Reducer <Text, Text, Text, Text>
{
double lowestAvg=100000;
String listgame;
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException
{
int min = 100000000,max = -1;
int count=0;
double avg=0.0;
for (Text val : values)
{
String t=val.toString();
int temp = Integer.parseInt(t);
if(temp<min)
min=temp;
if(temp>max)
max=temp;
avg += temp;
count++;
}
avg = avg/count;
if(avg < lowestAvg)
{
lowestAvg=avg;
listgame=key.toString();
}
else
{
if(avg==lowestAvg)
{
listgame= listgame + ", " + key.toString();
}
}
String str1 = Integer.toString(min);
String str2 = Integer.toString(max);
String str3 = Double.toString(avg);
String str = "[Min = " + str1 + ", Max = " + str2 + ", Avg = " + str3 +"]";
context.write(new Text(key.toString()), new Text(str));
}
protected void cleanup (Context context)
throws IOException, InterruptedException{
String Gametype = Double.toString(lowestAvg) + ": " + listgame;
context.write(new Text("Game Type with lowest average age of"),new Text(Gametype));
}
}
/**
 * Driver: runs the reduce-side join job and then the statistics job on its output.
 *
 * <p>Arguments: {@code args[0]} input for {@code CustsMapper}, {@code args[1]} input for
 * {@code TxnsMapper}, {@code args[2]} output directory of the first job (deleted before
 * the job runs, then used as input of the second job), {@code args[3]} output directory
 * of the second job.
 *
 * <p>The process exits with status 0 when both jobs succeed, 1 when either job fails and
 * 2 when too few arguments are given.
 *
 * @param args the four paths described above
 * @throws Exception if job setup, submission or the file-system delete fails
 */
public static void main(String[] args) throws Exception {
    if (args.length < 4) {
        System.err.println(
                "Usage: ReduceChain <custs-input> <txns-input> <join-output> <final-output>");
        System.exit(2);
    }

    // Job 1: tag both inputs and join them on the shared key.
    Configuration conf1 = new Configuration();
    Job job1 = Job.getInstance(conf1, "Reduce-side join");
    job1.setJarByClass(ReduceChain.class);
    job1.setReducerClass(ReduceChainReducer.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(Text.class);
    MultipleInputs.addInputPath(job1, new Path(args[0]), TextInputFormat.class, CustsMapper.class);
    MultipleInputs.addInputPath(job1, new Path(args[1]), TextInputFormat.class, TxnsMapper.class);
    Path outputPath = new Path(args[2]);
    FileOutputFormat.setOutputPath(job1, outputPath);
    // Remove output left over from a previous run; recursive because it is a directory.
    outputPath.getFileSystem(conf1).delete(outputPath, true);

    // Do not start the second job on missing or partial output of a failed first job.
    if (!job1.waitForCompletion(true)) {
        System.exit(1);
    }

    // Job 2: per-key statistics over the join output.
    Configuration conf2 = new Configuration();
    Job job2 = Job.getInstance(conf2, "Reduce-side join 2");
    job2.setJarByClass(ReduceChain.class);
    job2.setMapperClass(GameMapper.class);
    job2.setReducerClass(GameReducer.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(Text.class);
    // GameReducer keeps its lowest-average summary in instance fields, so a single
    // reducer is needed for that summary to cover every key.
    job2.setNumReduceTasks(1);
    FileInputFormat.addInputPath(job2, new Path(args[2]));
    FileOutputFormat.setOutputPath(job2, new Path(args[3]));
    System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
}