from gzip import GzipFile
from io import BytesIO

from clients import MicroServicePostgresClient
import boto3
import pandas as pd
import pandasql as ps
import json
import psycopg2

# Remove this import: datetime is never used in this module
from datetime import datetime

# add imports
import os

from unidecode import unidecode

# Standard values for load_type when getting the data from the data lake.
# These would live in a separate file holding the global variables and
# definitions; named constants are also easier to find when coding in an IDE.

# Example: BATCH_LOAD, INCREMENTAL_LOAD = 1, 2
from utils import BATCH_LOAD, INCREMENTAL_LOAD

# Example: POSTGRES_DATABASE = 'postgres'
from utils import POSTGRES_DATABASE

# Add environment variables

# This variable contains the string used to connect to the DB.
# It's something like this: "dbname=test user=postgres password=secret..."
POSTGRES_DATABASE_CONNECTION_STRING = os.environ.get("CONN_STRING")

# You could also put these keys in the AWS credentials file (example below)
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY")
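
# A hypothetical example of the ~/.aws/credentials file mentioned above; boto3
# reads it automatically, which would make the explicit keys above optional:
#
#   [default]
#   aws_access_key_id = <access key>
#   aws_secret_access_key = <secret key>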

# I would also pass the region explicitly in the boto3 call:
AWS_REGION = os.environ.get("AWS_DEFAULT_REGION")
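

# The code below serializes JSON with cls=UnidecodeHandler, but the class isn't
# defined in this file. A minimal sketch, assuming it is a json.JSONEncoder that
# transliterates anything the default encoder can't handle to ASCII via unidecode;
# the real implementation may live elsewhere in the original project.
class UnidecodeHandler(json.JSONEncoder):
    def default(self, obj):
        try:
            # Transliterate non-serializable values (e.g. bytes, dates) to ASCII text.
            return unidecode(str(obj))
        except Exception:
            return super().default(obj)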




class MicroServiceQuintoAndar:

    # Another option is using a connection pool, so you don't need to create a
    # connection every time an ETL runs, though I'm not that familiar with this
    # approach; a sketch follows this method.
    @staticmethod
    def get_postgres_connection(encoding, timeout):
        try:
            conn = psycopg2.connect(POSTGRES_DATABASE_CONNECTION_STRING)

            # probably use logging instead of printing the output
            if encoding is not None:
                print('setting client encoding to {}'.format(encoding))
                conn.set_client_encoding(encoding)
            if timeout:
                # Using logging here too
                conn.cursor().execute("set statement_timeout = {}".format(timeout))
            return conn
        except (Exception, psycopg2.Error) as error:
            print(error)
            raise
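
    # A minimal sketch of the connection-pool option mentioned above, assuming a
    # psycopg2 ThreadedConnectionPool shared by the class. The pool bounds (1, 10)
    # are placeholders; callers should hand connections back with putconn().
    _pool = None

    @classmethod
    def get_pooled_connection(cls):
        from psycopg2 import pool as pg_pool
        if cls._pool is None:
            cls._pool = pg_pool.ThreadedConnectionPool(
                1, 10, POSTGRES_DATABASE_CONNECTION_STRING)
        return cls._pool.getconn()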

    def create_client(self, encoding, timeout, db_type = POSTGRES_DATABASE):
        if db_type == POSTGRES_DATABASE:
            try:
                conn = self.get_postgres_connection(encoding, timeout)
                return MicroServicePostgresClient(conn)
            except Exception as e:
                print(f"Error: {e}")
                raise
        else:
            raise NotImplementedError


    # We can split this method in four: extract, transform, load raw data, and load
    # transformed data. There are too many arguments in this method, but most of them
    # can be removed by refactoring the code or replaced with environment variables.
    # Another option would be a YAML file whose values are set in __init__; a helper
    # is sketched below this comment.

    # In the end, this method should just connect to the database and run the ETL process.
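
    # A hedged sketch of the YAML option above: a helper that could feed
    # run_etl_pipeline its arguments from a config file. The file name
    # "etl_config.yaml" and its keys (table_name, query_file_name, bucket,
    # file_path) are assumptions, not part of the original service.
    @staticmethod
    def load_config(config_path = "etl_config.yaml"):
        import yaml  # assumes PyYAML is available
        with open(config_path) as f:
            return yaml.safe_load(f)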
        
    def run_etl_pipeline(self, table_name, query_file_name, bucket, file_path, database_encoding = None, timeout = 0):
    
        client = self.create_client(database_encoding, timeout)

        json_data = self.extract_data_from_database(client, table_name, INCREMENTAL_LOAD)
        transformed_data = self.transform_data_from_database(json_data, query_file_name)
        if json_data is not None:
            self.load_raw_data_to_s3(json_data, bucket, file_path)
        else:
            print("Something went wrong. There's no data to be sent to S3.")
        if transformed_data is not None:
            self.load_transformed_data_to_s3(transformed_data, bucket, file_path)
        else:
            print("Something went wrong. There's no transformed data to be sent to S3.")

    
    # An important note: the data might be too large to fit in memory. If so,
    # it should be read in chunks; see the sketch after this method.
    @staticmethod
    def extract_data_from_database(client, table_name, load_type = BATCH_LOAD):

        try:
            if load_type == BATCH_LOAD:
                return client.get_batch_data_from_table_name(table_name)
            elif load_type == INCREMENTAL_LOAD:
                return client.get_incremental_data_from_table_name(table_name)
            else:
                raise NotImplementedError
        except Exception as e:
            print(f"Error: {e}")

    
    # Using a query from a SQL file, transform the raw data into something else.
    # It might be a query that cleans or aggregates the data; an example query is
    # sketched after this method.
    @staticmethod
    def transform_data_from_database(json_data, query_file_name):

        try:
            with open(query_file_name, 'r') as f:
                transformation_query = f.read()
            df = pd.DataFrame.from_records(json_data)
            return ps.sqldf(transformation_query, locals())
        except Exception as e:
            print(f"Error: {e}")

    def load_raw_data_to_s3(self, json_data, bucket, file_path):

        try:
            self.write_raw_data_to_bronze_layer(json_data, bucket, file_path)
            self.write_to_silver_layer(json_data, bucket, file_path)
        except Exception as e:
            print(f"Error: {e}")

    def load_transformed_data_to_s3(self, transformed_data, bucket, file_path):
        try:
            self.write_to_dw(transformed_data, bucket, file_path)
        except Exception as e:
            print(f"Error: {e}")

    @staticmethod
    def write_raw_data_to_bronze_layer(json_data, bucket, file_path):
        # It's better to define the AWS region explicitly; here it comes from the
        # AWS_DEFAULT_REGION environment variable.
        try:
            s3 = boto3.resource(
                's3'
                , aws_access_key_id = AWS_ACCESS_KEY
                , aws_secret_access_key = AWS_SECRET_KEY
                , region_name = AWS_REGION)
        except Exception as e:
            print(f"S3 connection error: {e}")
            raise
        # Just be aware that json_data might be too big for memory. Splitting the data
        # into chunks and loading them into S3 might be better; see the chunked sketch
        # after this method.

        try:
            gz_body = BytesIO()
            with GzipFile(fileobj = gz_body, mode = "w") as fp:
                for _json in json_data:
                    fp.write((json.dumps(_json, ensure_ascii = False, cls = UnidecodeHandler)).encode("utf-8"))
                    fp.write(b"\n")

            s3.Bucket(bucket).put_object(Body = gz_body.getvalue(), Key = f"bronze/{file_path}")
        except Exception as e:
            print(f"Failed to send to s3 bucket: {e}")

    # It's odd that the original code doesn't perform any transformations and simply
    # loads the data as a Parquet file in the S3 silver layer.
    @staticmethod
    def write_to_silver_layer(json_data, bucket, file_path):

        try:
            df = pd.DataFrame.from_records(json_data)
            df.to_parquet(f"s3:://{bucket}/silver/{file_path}.parquet.gzip", compression = 'gzip')
        except Exception as e:
            print(f"Failed to send to silver layer: {e}")

    @staticmethod
    def write_to_dw(transformed_json_data, bucket, file_path):
        try:
            transformed_json_data.to_parquet(f"s3:://{bucket}/dw/{file_path}.parquet.gzip", compression = 'gzip')
        except Exception as e:
            print(f"Failed to send to Data Warehouse: {e}")

if __name__ == "__main__":
    service = MicroServiceQuintoAndar()
    service.run_etl_pipeline("table1", "query.sql", "bucket1", "file_path")