Untitled

mail@pastecode.io avatar
unknown
plain_text
14 days ago
1.3 kB
2
Indexable
Never
import os
import sys
import datetime
import pandas as pd
from pyspark.sql import SparkSession
import pyspark
# pandas 2.0 dropped DataFrame.iteritems; re-expose it as an alias of
# DataFrame.items so code that still calls the old name keeps working.
# NOTE(review): presumably needed by spark.createDataFrame on older PySpark
# releases - confirm against the installed version.
setattr(pd.DataFrame, "iteritems", pd.DataFrame.items)

# Export the Hadoop/YARN configuration directories and the Python interpreter
# path as environment variables at import time, before any Spark session is
# built further down in this file.
os.environ.update(
    {
        'HADOOP_CONF_DIR': '/etc/hadoop/conf',
        'YARN_CONF_DIR': '/etc/hadoop/conf',
        'PYSPARK_PYTHON': '/usr/bin/python3',
    }
)

def main():
    """Load a local semicolon-delimited CSV of city geo data into HDFS as Parquet.

    Two positional command-line arguments are read from ``sys.argv``:

    1. ``local_path`` - CSV file read with pandas using ``;`` as separator.
    2. ``hdfs_path`` - destination handed to ``DataFrameWriter.parquet``;
       the write uses ``overwrite`` mode, so existing data there is replaced.

    The Spark session obtained here is stopped before returning, whether or
    not the write succeeded, so the YARN application is always released.

    Returns:
        The status string ``"City geo data loaded"`` once the write finishes.

    Raises:
        SystemExit: If fewer than two positional arguments were supplied.
    """
    print("Loading city geo data")
    if len(sys.argv) < 3:
        # Fail with a usage hint instead of a bare IndexError traceback.
        prog = sys.argv[0] if sys.argv else "<script>"
        sys.exit(f"usage: {prog} <local_csv_path> <hdfs_output_path>")
    local_path, hdfs_path = sys.argv[1], sys.argv[2]
    current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    print(f'local_path ->> {local_path}, hdfs_path ->> {hdfs_path}, current_time ->> {current_time}')

    # Read the local file first: a missing or malformed CSV should fail
    # before a YARN application is requested.
    cities_geo_local_df = pd.read_csv(local_path, sep=';')

    spark = (
        SparkSession.builder
        .master("yarn")
        .appName(f"load_city_geo_{current_time}")
        .getOrCreate()
    )
    try:
        cities_geo_local_spark_df = spark.createDataFrame(cities_geo_local_df)
        # Save the local data to HDFS, replacing whatever is at hdfs_path.
        cities_geo_local_spark_df.write.mode("overwrite").parquet(hdfs_path)
    finally:
        # Release the session even when conversion or the write raises.
        spark.stop()
    return "City geo data loaded"
    
# Script entry point: run the load only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()
Leave a Comment