Optimize and Vacuum All Lakehouse Tables with Spark
# Optimize and vacuum one Delta table: OPTIMIZE with V-Order compacts small
# files, and VACUUM removes data files older than the retention window.
def cleanTables(lakeName, tableName):
    try:
        spark.sql(f'OPTIMIZE {lakeName}.{tableName} VORDER')
        spark.sql(f'VACUUM {lakeName}.{tableName} RETAIN 360 HOURS')
        print(f'\nTable {lakeName}.{tableName} OPTIMIZED and VACUUMED successfully')
    except Exception as e:
        print(f"Error optimizing/vacuuming {lakeName}.{tableName}: {str(e)}")

# Run the maintenance routine on every table in a single lakehouse.
def cleanLakehouse(lakeName):
    try:
        tables_df = spark.sql(f'SHOW TABLES IN {lakeName}')
        tableNames = [row['tableName'] for row in tables_df.collect()]
        for table in tableNames:
            cleanTables(lakeName, table)
    except Exception as e:
        print(f"Error processing lakehouse {lakeName}: {str(e)}")

# List every lakehouse (database) visible to the current Spark session.
def listLakes():
    try:
        lakehouses = spark.catalog.listDatabases()
        return [lakehouse.name for lakehouse in lakehouses]
    except Exception as e:
        print(f"Error listing lakehouses: {str(e)}")
        return []

# Main execution: disable the retention-duration check (only required when
# retaining less than the 168-hour default; 360 hours is already above it),
# then sweep every lakehouse the session can see.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
lakeList = listLakes()
for lake in lakeList:
    print(f"\nProcessing lakehouse: {lake}")
    cleanLakehouse(lake)
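If you want to maintain a single lakehouse rather than sweep everything the session can see, you can call cleanLakehouse directly. A minimal sketch; the lakehouse name "SalesLakehouse" is a placeholder, not something defined in the script:

# Hypothetical usage: run maintenance on one lakehouse only.
# "SalesLakehouse" is a placeholder name -- substitute your own.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
cleanLakehouse("SalesLakehouse")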