Untitled
unknown
plain_text
9 months ago
1.2 kB
7
Indexable
def cleanTables(lakeName, tableName):
    """Run OPTIMIZE (with V-Order) and VACUUM on one Delta table.

    Any failure is reported to stdout instead of being raised, so a single
    bad table cannot abort a larger maintenance sweep.
    """
    try:
        spark.sql(f'OPTIMIZE {lakeName}.{tableName} VORDER')
        # Keep 360 hours (15 days) of history before removing old files.
        spark.sql(f'VACUUM {lakeName}.{tableName} RETAIN 360 HOURS')
    except Exception as e:
        print(f"Error optimizing/vacuuming {lakeName}.{tableName}: {str(e)}")
    else:
        # Success message only when both statements completed.
        print(f'\nTable {lakeName}.{tableName} OPTIMIZED and VACUUMED successfully')
def cleanLakehouse(lakeName):
    """Optimize and vacuum every table found in the given lakehouse.

    Per-table failures are handled inside cleanTables; this catch covers
    the table-listing step itself.
    """
    try:
        rows = spark.sql(f'SHOW TABLES IN {lakeName}').collect()
        for row in rows:
            cleanTables(lakeName, row['tableName'])
    except Exception as e:
        print(f"Error processing lakehouse {lakeName}: {str(e)}")
def listLakes():
    """Return the names of all databases (lakehouses) visible to Spark.

    Returns an empty list on failure so callers can iterate unconditionally.
    """
    try:
        return [db.name for db in spark.catalog.listDatabases()]
    except Exception as e:
        print(f"Error listing lakehouses: {str(e)}")
        return []
# Main Execution
# Disable Delta's retention-duration safety check so VACUUM may run with a
# custom retention window (the 360-hour RETAIN used in cleanTables).
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

lakeList = listLakes()
for lake in lakeList:
    print(f"\nProcessing lakehouse: {lake}")
    cleanLakehouse(lake)
Editor is loading...
Leave a Comment