from gzip import GzipFile
from io import BytesIO
from clients import MicroServicePostgresClient
import boto3
import pandas as pd
import pandasql as ps
import json
import psycopg2  # Ideally connection handling (and this import) would live in the client module
from datetime import datetime
# added imports
import os
from unidecode import unidecode

# Standard values for load_type when getting the data from the datalake.
# They would live in a separate file with the global variables and definitions,
# which makes the values easier to find when you are coding in an IDE.
# Example: BATCH_LOAD, INCREMENTAL_LOAD = 1, 2
from utils import BATCH_LOAD, INCREMENTAL_LOAD
# Example: POSTGRES_DATABASE = 'postgres'
from utils import POSTGRES_DATABASE

# Environment variables.
# This variable contains the string used to connect to the DB.
# It looks something like: "dbname=test user=postgres password=secret..."
POSTGRES_DATABASE_CONNECTION_STRING = os.environ.get("CONN_STRING")
# You can probably put these keys in the AWS credentials file instead.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY")
# And I would add another parameter in the boto3 call:
AWS_REGION = os.environ.get("AWS_DEFAULT_REGION")


class UnidecodeHandler(json.JSONEncoder):
    # JSON encoder used when dumping raw rows: values json can't serialize
    # natively (e.g. datetime) are transliterated to plain text with unidecode.
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()
        return unidecode(str(obj))


class MicroServiceQuintoAndar:

    # Another option is using a connection pool, so you don't need to create
    # a connection every time someone performs an ETL, but I'm not that familiar
    # with this approach (a hedged sketch is at the end of this file).
    @staticmethod
    def get_postgres_connection(encoding, timeout):
        try:
            conn = psycopg2.connect(POSTGRES_DATABASE_CONNECTION_STRING)
            # Probably use logging instead of printing the output.
            if encoding is not None:
                print('setting client encoding to {}'.format(encoding))
                conn.set_client_encoding(encoding)
            if timeout:
                # Use logging here too.
                conn.cursor().execute("set statement_timeout = {}".format(timeout))
            return conn
        except (Exception, psycopg2.Error) as error:
            print(error)
            raise

    def create_client(self, encoding, timeout, db_type=POSTGRES_DATABASE):
        if db_type == POSTGRES_DATABASE:
            try:
                conn = self.get_postgres_connection(encoding, timeout)
                return MicroServicePostgresClient(conn)
            except Exception as e:
                print(f"Error: {e}")
                raise
        else:
            raise NotImplementedError

    # We can split this method into four: extract, transform, load raw data and load transformed data.
    # There are too many arguments in this method, but most of them can be removed by refactoring the
    # code or be replaced by environment variables. Another possible solution would be creating a YAML
    # file and setting the variables in __init__.
    # In the end, this method should just connect to the database and perform the ETL process.
    def run_etl_pipeline(self, table_name, query_file_name, bucket, file_path, database_encoding=None, timeout=0):
        client = self.create_client(database_encoding, timeout)
        json_data = self.extract_data_from_database(client, table_name, INCREMENTAL_LOAD)
        transformed_data = self.transform_data_from_database(json_data, query_file_name)
        if json_data is not None:
            self.load_raw_data_to_s3(json_data, bucket, file_path)
        else:
            print("Something went wrong. There's no data to be sent to S3.")
        if transformed_data is not None:
            self.load_transformed_data_to_s3(transformed_data, bucket, file_path)
        else:
            print("Something went wrong. There's no transformed data to be sent to S3.")

    # An important note: the data might be too heavy for memory. If so,
    # the data should be read in chunks, as in the sketch below.
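    # --- Hedged sketch (addition): chunked extraction ---
    # A minimal sketch of how the extract step could stream rows in chunks instead of
    # loading everything at once. It assumes a raw psycopg2 connection and a plain
    # "SELECT * FROM <table>" query; chunk_size and the method name are hypothetical,
    # and the MicroServicePostgresClient API is deliberately bypassed here.
    @staticmethod
    def extract_data_from_database_in_chunks(conn, table_name, chunk_size=10000):
        from psycopg2 import sql  # local import so the sketch stays self-contained
        # A named cursor is a server-side cursor, so rows are fetched lazily
        # instead of being pulled into memory all at once.
        with conn.cursor(name="etl_stream") as cursor:
            cursor.execute(sql.SQL("SELECT * FROM {}").format(sql.Identifier(table_name)))
            rows = cursor.fetchmany(chunk_size)
            while rows:
                # Build column names from the cursor description (read after the
                # first fetch to be safe with named cursors).
                columns = [col[0] for col in cursor.description]
                # Yield each chunk as a list of dicts, mirroring the json_data shape
                # the rest of the pipeline expects.
                yield [dict(zip(columns, row)) for row in rows]
                rows = cursor.fetchmany(chunk_size)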
    @staticmethod
    def extract_data_from_database(client, table_name, load_type=BATCH_LOAD):
        try:
            if load_type == BATCH_LOAD:
                return client.get_batch_data_from_table_name(table_name)
            elif load_type == INCREMENTAL_LOAD:
                return client.get_incremental_data_from_table_name(table_name)
            else:
                raise NotImplementedError
        except Exception as e:
            print(f"Error: {e}")

    # Using a query from a SQL file, transform the raw data into something else.
    # It might be a query to clean or aggregate the data.
    @staticmethod
    def transform_data_from_database(json_data, query_file_name):
        try:
            with open(query_file_name, 'r') as f:
                transformation_query = f.read()
            df = pd.DataFrame.from_records(json_data)
            return ps.sqldf(transformation_query, locals())
        except Exception as e:
            print(f"Error: {e}")

    def load_raw_data_to_s3(self, json_data, bucket, file_path):
        try:
            self.write_raw_data_to_bronze_layer(json_data, bucket, file_path)
            self.write_to_silver_layer(json_data, bucket, file_path)
        except Exception as e:
            print(f"Error: {e}")

    def load_transformed_data_to_s3(self, transformed_data, bucket, file_path):
        try:
            self.write_to_dw(transformed_data, bucket, file_path)
        except Exception as e:
            print(f"Error: {e}")

    @staticmethod
    def write_raw_data_to_bronze_layer(json_data, bucket, file_path):
        # Maybe it's better to define the AWS region explicitly.
        try:
            s3 = boto3.resource(
                's3',
                aws_access_key_id=AWS_ACCESS_KEY,
                aws_secret_access_key=AWS_SECRET_KEY,
                region_name=AWS_REGION)
        except Exception as e:
            print(f"S3 connection error: {e}")
            raise

        # Just be aware that json_data might be too big for memory. Splitting the data
        # into chunks and loading into S3 might be better.
        try:
            gz_body = BytesIO()
            with GzipFile(fileobj=gz_body, mode="w") as fp:
                for _json in json_data:
                    fp.write(json.dumps(_json, ensure_ascii=False, cls=UnidecodeHandler).encode("utf-8"))
                    fp.write(b"\n")
            s3.Bucket(bucket).put_object(Body=gz_body.getvalue(), Key=f"bronze/{file_path}")
        except Exception as e:
            print(f"Failed to send to s3 bucket: {e}")

    # It's odd that the original code doesn't perform any transformations, and simply
    # loads the data as a parquet file in the S3 silver layer.
    @staticmethod
    def write_to_silver_layer(json_data, bucket, file_path):
        try:
            df = pd.DataFrame.from_records(json_data)
            df.to_parquet(f"s3://{bucket}/silver/{file_path}.parquet.gzip", compression='gzip')
        except Exception as e:
            print(f"Failed to send to silver layer: {e}")

    @staticmethod
    def write_to_dw(transformed_json_data, bucket, file_path):
        try:
            transformed_json_data.to_parquet(f"s3://{bucket}/dw/{file_path}.parquet.gzip", compression='gzip')
        except Exception as e:
            print(f"Failed to send to Data Warehouse: {e}")


if __name__ == "__main__":
    service = MicroServiceQuintoAndar()
    service.run_etl_pipeline("table1", "query.sql", "bucket1", "file_path")
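
# --- Hedged sketch (addition): connection pooling ---
# The comment on get_postgres_connection mentions a connection pool as an alternative to
# opening a new connection per ETL run. A minimal sketch using psycopg2's built-in
# ThreadedConnectionPool; the pool bounds (1, 5) and the helper name are assumptions.
from psycopg2.pool import ThreadedConnectionPool


def make_connection_pool(minconn=1, maxconn=5):
    # Build the pool once at startup; connections are then borrowed and returned
    # instead of being created for every pipeline run.
    return ThreadedConnectionPool(minconn, maxconn, POSTGRES_DATABASE_CONNECTION_STRING)


# Usage sketch:
#   pool = make_connection_pool()
#   conn = pool.getconn()
#   try:
#       ...  # run the ETL with this connection
#   finally:
#       pool.putconn(conn)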