Untitled
unknown
plain_text
14 days ago
1.3 kB
2
Indexable
Never
import os
import sys
import datetime

import pandas as pd
import pyspark
from pyspark.sql import SparkSession

# DataFrame.iteritems was removed in pandas 2.0; alias it to DataFrame.items
# so callers that still use the old name keep working.
# NOTE(review): presumably needed because the installed PySpark version calls
# iteritems() when converting a pandas DataFrame -- confirm against the
# cluster's PySpark version and drop this once it is no longer required.
pd.DataFrame.iteritems = pd.DataFrame.items

# Point Spark at the cluster's Hadoop/YARN configuration and pin the Python
# interpreter used by PySpark.
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['YARN_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'


def main():
    """Load a local ';'-separated CSV file and write it to HDFS as Parquet.

    Command-line arguments:
        sys.argv[1]: path of the local CSV file to read.
        sys.argv[2]: destination path for the Parquet output; any existing
            data at that path is overwritten.

    Returns:
        The string "City geo data loaded" on success.

    Raises:
        SystemExit: if fewer than two command-line arguments are supplied.
    """
    # Fail with a readable message instead of a bare IndexError.
    if len(sys.argv) < 3:
        raise SystemExit("expected 2 arguments: <local_path> <hdfs_path>")

    print("Loading city geo data")
    local_path = sys.argv[1]
    hdfs_path = sys.argv[2]
    current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")

    print(f'local_path ->> {local_path}, hdfs_path ->> {hdfs_path}, current_time ->> {current_time}')

    # Read the local file before starting Spark so that a bad path or a
    # malformed CSV fails fast without allocating a YARN application.
    cities_geo_local_df = pd.read_csv(local_path, sep=';')

    spark = (
        SparkSession.builder
        .master("yarn")
        .appName(f"load_city_geo_{current_time}")
        .getOrCreate()
    )

    try:
        cities_geo_local_spark_df = spark.createDataFrame(cities_geo_local_df)

        # Save the local data to HDFS, replacing any previous output.
        cities_geo_local_spark_df.write \
            .mode("overwrite") \
            .parquet(hdfs_path)
    finally:
        # Always release the Spark session, even if the write fails.
        spark.stop()

    return "City geo data loaded"


if __name__ == '__main__':
    main()
Leave a Comment