Untitled
unknown
plain_text
a year ago
2.2 kB
1
Indexable
Never
#!/usr/bin/env python3
"""Summarise case data for 2012-2014 with Spark.

The script computes two things and writes them to a text file:

1. The names of the ten states with the highest number of case rows across
   the three yearly case files.
2. The ``ddl_decision_judge_id`` that occurs most often among cases that are
   flagged ``criminal == 1`` in the acts/sections file and that also appear
   in the yearly case files.

Command line (positions match ``sys.argv``):

    1  cases CSV for 2012        (needs ddl_case_id, state_code)
    2  cases CSV for 2013        (needs ddl_case_id, state_code)
    3  cases CSV for 2014        (needs ddl_case_id, state_code)
    4  state key CSV             (needs state_code, state_name)
    5  accepted but never read by this script
    6  judge/case merge key CSV  (needs ddl_case_id, ddl_decision_judge_id)
    7  acts/sections CSV         (needs ddl_case_id, criminal)
    8  output file path

The output file contains the ``repr`` of the tuple ``(top_states, judge)``.
"""
import sys

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Highest argv index the script reads (the output path).
_LAST_ARG_INDEX = 8


def _read_csv(spark, path, columns=None):
    """Read a headered CSV with schema inference, optionally keeping only *columns*."""
    frame = spark.read.option('header', 'true').csv(path, inferSchema=True)
    if columns is None:
        return frame
    return frame.select(columns)


def _top_state_names(cases, state_key, limit=10):
    """Return the names of the *limit* states with the most rows in *cases*."""
    cases_per_state = cases.groupBy('state_code').count()
    # The key file may repeat a (code, name) pair; keep one row per pair so
    # the join does not multiply the per-state rows.
    state_names = state_key.select(['state_code', 'state_name']).dropDuplicates()
    ranked = (
        cases_per_state
        .join(
            state_names,
            on=cases_per_state['state_code'] == state_names['state_code'],
        )
        .orderBy(col('count').desc())
        .limit(limit)
    )
    return [row['state_name'] for row in ranked.select('state_name').collect()]


def _most_frequent_criminal_judge(cases, judge_key, acts_sections):
    """Return the judge id seen most often on criminal cases, or None if there is none."""
    criminal_ids = (
        acts_sections
        .filter(col('criminal') == 1)
        .select('ddl_case_id')
        .withColumnRenamed('ddl_case_id', 'criminal_ddl_case_id')
    )
    # Rows without a matching judge record, or with a null judge id, are of
    # no use here: an inner join plus a null filter keeps exactly the rows a
    # left join followed by the same null filter would keep.
    judged_criminal = (
        criminal_ids
        .join(
            judge_key,
            on=criminal_ids['criminal_ddl_case_id'] == judge_key['ddl_case_id'],
            how='inner',
        )
        .na.drop(subset=['ddl_decision_judge_id'])
    )
    # judged_criminal has no null judge ids left, so an inner join needs no
    # further null filtering.
    judged_cases = cases.join(
        judged_criminal,
        on=cases['ddl_case_id'] == judged_criminal['ddl_case_id'],
        how='inner',
    )
    most_common = (
        judged_cases
        .groupBy('ddl_decision_judge_id')
        .count()
        .orderBy(col('count').desc())
        .first()
    )
    # first() yields None on an empty frame; report "no judge" instead of
    # failing on a subscript of None.
    if most_common is None:
        return None
    return most_common['ddl_decision_judge_id']


def main(argv):
    """Run the analysis for the paths in *argv* and write the result file."""
    if len(argv) <= _LAST_ARG_INDEX:
        sys.exit(
            'usage: ' + argv[0] + ' CASES_2012 CASES_2013 CASES_2014 STATE_KEY '
            'UNUSED JUDGE_MERGE_KEY ACTS_SECTIONS OUTPUT'
        )

    output = argv[8]
    case_columns = ['ddl_case_id', 'state_code']

    spark = SparkSession.builder.appName('Assignment1').getOrCreate()
    try:
        df_case_2012 = _read_csv(spark, argv[1], case_columns)
        df_case_2013 = _read_csv(spark, argv[2], case_columns)
        df_case_2014 = _read_csv(spark, argv[3], case_columns)
        df_cases_state_key = _read_csv(spark, argv[4])
        # argv[5] is intentionally skipped: nothing in this script reads it.
        df_judge_merge_key = _read_csv(
            spark, argv[6], ['ddl_case_id', 'ddl_decision_judge_id']
        )
        df_acts_sections = _read_csv(spark, argv[7], ['ddl_case_id', 'criminal'])

        df_case_20121314 = df_case_2012.union(df_case_2013).union(df_case_2014)

        top_states = _top_state_names(df_case_20121314, df_cases_state_key)
        judge = _most_frequent_criminal_judge(
            df_case_20121314, df_judge_merge_key, df_acts_sections
        )
    finally:
        # Release the Spark session even if a read or job fails.
        spark.stop()

    # Context manager guarantees the result is flushed and the file closed.
    with open(output, 'w') as result_file:
        print((top_states, judge), file=result_file)


if __name__ == '__main__':
    main(sys.argv)