#!/usr/bin/env python3
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

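# Start the Spark session for this job.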
spark = SparkSession.builder.appName('Assignment1').getOrCreate()

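# Read the yearly case tables and the key/lookup tables from the paths passed on
# the command line (sys.argv[5] is not used by this script).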
df_case_2012 = spark.read.csv(sys.argv[1], header=True, inferSchema=True).select('ddl_case_id', 'state_code')
df_case_2013 = spark.read.csv(sys.argv[2], header=True, inferSchema=True).select('ddl_case_id', 'state_code')
df_case_2014 = spark.read.csv(sys.argv[3], header=True, inferSchema=True).select('ddl_case_id', 'state_code')
df_cases_state_key = spark.read.csv(sys.argv[4], header=True, inferSchema=True)
df_judge_merge_key = spark.read.csv(sys.argv[6], header=True, inferSchema=True).select('ddl_case_id', 'ddl_decision_judge_id')
df_acts_sections = spark.read.csv(sys.argv[7], header=True, inferSchema=True).select('ddl_case_id', 'criminal')

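# Path of the file where the final result is written.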
output = sys.argv[8]



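# Stack the three yearly case tables and count cases per state.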
df_case_20121314 = df_case_2012.union(df_case_2013).union(df_case_2014)
df_total_crime = df_case_20121314.groupBy('state_code').count()

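# Deduplicated state_code -> state_name lookup, joined onto the per-state counts.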
df_statekey_name = df_cases_state_key.select(['state_code','state_name']).dropDuplicates()
df_total_crime_top10 = df_total_crime.join(df_statekey_name, on=df_total_crime['state_code'] == df_statekey_name['state_code'])

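# Keep the ten states with the highest case counts.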
df_total_crime_top10 = df_total_crime_top10.orderBy(col('count').desc()).limit(10)

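# Collect the ten state names back to the driver as a plain Python list.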
top_states = df_total_crime_top10.select('state_name').rdd.flatMap(lambda x: x).collect()

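# Keep only criminal cases from acts_sections; rename the key so it does not
# clash with ddl_case_id after the join.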
criminal_case = df_acts_sections.filter(col('criminal') == 1).select('ddl_case_id').withColumnRenamed('ddl_case_id', 'criminal_ddl_case_id')


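# Attach the deciding judge to each criminal case and drop cases with no judge recorded.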
df_judge_criminal = criminal_case.join(
    df_judge_merge_key,
    on=criminal_case['criminal_ddl_case_id'] == df_judge_merge_key['ddl_case_id'],
    how='left'
).na.drop(subset=['ddl_decision_judge_id'])



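# Restrict to 2012-2014 cases that are criminal and have a deciding judge.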
df_judge_case = df_case_20121314.join(
    df_judge_criminal,
    on=df_case_20121314['ddl_case_id'] == df_judge_criminal['ddl_case_id'],
    how='left'
).na.drop(subset=['ddl_decision_judge_id'])

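# Find the judge who decided the largest number of these cases.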
mode_result = df_judge_case.groupBy('ddl_decision_judge_id').count().orderBy(col('count').desc()).first()
judge = mode_result['ddl_decision_judge_id'] if mode_result else None

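# Write the (top-10 state names, busiest judge) tuple to the output file.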
with open(output, 'w') as out_file:
    print((top_states, judge), file=out_file)

spark.stop()