Untitled
unknown
plain_text
a year ago
1.3 kB
11
Indexable
import os
import sys
import datetime
import pandas as pd
from pyspark.sql import SparkSession
import pyspark
# pandas 2.0 dropped DataFrame.iteritems; re-expose it as an alias of
# DataFrame.items so code that still calls the old name keeps working.
# NOTE(review): presumably required by spark.createDataFrame on older
# PySpark releases - confirm against the deployed PySpark version.
setattr(pd.DataFrame, "iteritems", pd.DataFrame.items)

# Point Hadoop/YARN client configuration and the PySpark interpreter at the
# cluster-local locations (unconditionally overrides any inherited values).
os.environ.update({
    'HADOOP_CONF_DIR': '/etc/hadoop/conf',
    'YARN_CONF_DIR': '/etc/hadoop/conf',
    'PYSPARK_PYTHON': '/usr/bin/python3',
})
def main():
    """Load a local city geo CSV file and store it on HDFS as Parquet.

    Reads two positional command-line arguments from ``sys.argv``:

    1. ``local_path`` - path of the ``;``-separated CSV file, read with pandas.
    2. ``hdfs_path`` - destination path passed to the Spark Parquet writer;
       the write uses ``overwrite`` mode.

    Returns:
        The status string ``"City geo data loaded"``.

    Raises:
        SystemExit: if fewer than two positional arguments were supplied.
    """
    print("Loading city geo data")

    # Fail with a readable usage message instead of a bare IndexError.
    if len(sys.argv) < 3:
        prog = sys.argv[0] if sys.argv else "load_city_geo"
        raise SystemExit(f"usage: {prog} <local_csv_path> <hdfs_path>")
    local_path, hdfs_path = sys.argv[1], sys.argv[2]

    current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    print(f'local_path ->> {local_path}, hdfs_path ->> {hdfs_path}, current_time ->> {current_time}')

    # Read the local file before asking YARN for resources, so an unreadable
    # or malformed CSV fails fast without launching a Spark application.
    cities_geo_local_df = pd.read_csv(local_path, sep=';')

    spark = (
        SparkSession.builder
        .master("yarn")
        .appName(f"load_city_geo_{current_time}")
        .getOrCreate()
    )
    try:
        cities_geo_local_spark_df = spark.createDataFrame(cities_geo_local_df)

        # Save the local data to HDFS, replacing whatever is already there.
        cities_geo_local_spark_df.write.mode("overwrite").parquet(hdfs_path)
    finally:
        # Always release the session so the YARN application does not linger,
        # whether the write succeeded or raised.
        spark.stop()

    return "City geo data loaded"
# Run the loader only when this file is executed as a script.
if __name__ == '__main__':
    main()
Leave a Comment