#!/usr/bin/env python3
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import sys
# One Spark session is shared by every read and aggregation in this script.
spark = SparkSession.builder.appName('Assignment1').getOrCreate()


def _load_csv(path, columns=None):
    """Read a CSV that has a header row, inferring column types.

    When *columns* is given, only those columns are kept; otherwise the
    full frame is returned.
    """
    frame = spark.read.option('header', 'true').csv(path, inferSchema=True)
    if columns is None:
        return frame
    return frame.select(columns)


# Command-line layout: argv[1..3] are the yearly case files, argv[4] the state
# key, argv[6] the judge/case merge key, argv[7] the acts/sections file and
# argv[8] the output path. argv[5] is not read by this script.
df_case_2012 = _load_csv(sys.argv[1], ['ddl_case_id', 'state_code'])
df_case_2013 = _load_csv(sys.argv[2], ['ddl_case_id', 'state_code'])
df_case_2014 = _load_csv(sys.argv[3], ['ddl_case_id', 'state_code'])
df_cases_state_key = _load_csv(sys.argv[4])
df_judge_merge_key = _load_csv(sys.argv[6], ['ddl_case_id', 'ddl_decision_judge_id'])
df_acts_sections = _load_csv(sys.argv[7], ['ddl_case_id', 'criminal'])
output = sys.argv[8]
# Stack the three yearly case frames into one (ddl_case_id, state_code) frame.
df_case_20121314 = df_case_2012.union(df_case_2013)
df_case_20121314 = df_case_20121314.union(df_case_2014)

# Number of case rows recorded under each state code.
df_total_crime = df_case_20121314.groupBy('state_code').count()

# Distinct (state_code, state_name) pairs from the state key table.
df_statekey_name = df_cases_state_key.select('state_code', 'state_name').dropDuplicates()

# Attach state names to the per-state counts and keep the ten largest counts.
_state_code_match = df_total_crime['state_code'] == df_statekey_name['state_code']
df_total_crime_top10 = (
    df_total_crime
    .join(df_statekey_name, on=_state_code_match)
    .orderBy('count', ascending=False)
    .limit(10)
)

# Plain Python list of the state names, in descending order of count.
top_states = [
    row['state_name']
    for row in df_total_crime_top10.select('state_name').collect()
]
# Ids of the cases flagged as criminal (criminal == 1). The id column is
# renamed so it stays distinguishable from the judge table's own
# 'ddl_case_id' after the join below.
criminal_case = (
    df_acts_sections
    .filter(col('criminal') == 1)
    .select('ddl_case_id')
    .withColumnRenamed('ddl_case_id', 'criminal_ddl_case_id')
)

# Pair each criminal case with its deciding judge. Rows without a judge id are
# discarded right away, so an inner join states the intent directly; na.drop
# still removes matched rows whose judge id is itself missing.
df_judge_criminal = criminal_case.join(
    df_judge_merge_key,
    on=criminal_case['criminal_ddl_case_id'] == df_judge_merge_key['ddl_case_id'],
    how='inner',
).na.drop(subset=['ddl_decision_judge_id'])

# Restrict to the cases present in the combined yearly case frame. Every row of
# df_judge_criminal already has a judge id, so an inner join keeps exactly the
# rows a left join followed by a drop of missing judge ids would keep.
df_judge_case = df_case_20121314.join(
    df_judge_criminal,
    on=df_case_20121314['ddl_case_id'] == df_judge_criminal['ddl_case_id'],
    how='inner',
)

# Judge with the most matching rows. The secondary sort on the judge id makes
# the choice reproducible when several judges share the highest count.
mode_result = (
    df_judge_case
    .groupBy('ddl_decision_judge_id')
    .count()
    .orderBy(col('count').desc(), col('ddl_decision_judge_id').asc())
    .first()
)

# first() returns None when no rows survive the joins; report "no judge" as
# None instead of failing with a TypeError on the subscript.
judge = mode_result['ddl_decision_judge_id'] if mode_result is not None else None
# Write the (top_states, judge) tuple as a single line of text. The context
# manager flushes and closes the file deterministically; the bare open() call
# used previously left the handle open until interpreter shutdown.
with open(output, 'w') as result_file:
    print((top_states, judge), file=result_file)