Untitled
unknown
plain_text
2 years ago
2.7 kB
3
Indexable
from pandas import read_sql as pd_read_sql def connect_to_database(json_data, replicationTarget): encrypt = json_data.get(replicationTarget, {}).get('ENCRYPT') host = json_data.get(replicationTarget, {}).get('DB_HOST') port = json_data.get(replicationTarget, {}).get('DB_PORT') user = json_data.get(replicationTarget, {}).get('DB_USER') db_type = json_data.get(replicationTarget, {}).get('DB_TYPE') schema = json_data.get(replicationTarget, {}).get('DB_SCHEMA') if encrypt == 'Y': password = base_b64decode(json_data.get(replicationTarget, {}).get('DB_PASSWORD')).decode('utf-8') else: password = json_data.get(replicationTarget, {}).get('DB_PASSWORD') if db_type == 'MYSQL': cnx = mysql.connector.connect(user=user, password=password, host=host, port=port) cursor = cnx.cursor() logging.info(f"connected to database server {replicationTarget}: {host}:{port}") elif db_type == 'ORACLE': import oracledb oracle_mode = oracledb.is_thin_mode() print("Oracle mode: %s" % oracle_mode) if oracle_mode: oracledb.init_oracle_client() print("Enabled python-oracledb Thick mode") else: print("Default python-oracledb Thick mode") cnx_text = ('oracle://%s:%s@%s:%s/?service_name=%s' % (user, password, host, port, schema)) cnx = sqlalchemy_create_engine(cnx_text, encoding="utf8") cnx = connection.raw_connection() cursor = cnx.cursor() return cnx, cursor, schema with open(json_file_path) as json_file: json_data = json.load(json_file) conn_ppm, cursor_ppm, schema_ppm = connect_to_database(json_data, 'PPM_PC') ##cursor_ppm = conn_ppm.cursor() primary_query = f"SELECT * FROM {schema_ppm}.etl_ppm_replication_master" cursor_ppm.execute(primary_query) logging.info(f"executed primary query - {primary_query}") query_info = f"-- ++++++++++++++++++++|PRIMARY_QUERY\n" query_info += f"-- #Query:\n{primary_query};\n" write_sql(query_info) rows = cursor_ppm.fetchall() columns = [desc[0] for desc in cursor_ppm.description] instead of using below thing cursor_ppm.execute(primary_query) logging.info(f"executed primary query - {primary_query}") query_info = f"-- ++++++++++++++++++++|PRIMARY_QUERY\n" query_info += f"-- #Query:\n{primary_query};\n" write_sql(query_info) rows = cursor_ppm.fetchall() columns = [desc[0] for desc in cursor_ppm.description] can you modify code like use pd_read_sql
Editor is loading...