Untitled

 avatar
unknown
plain_text
17 days ago
1.2 kB
5
Indexable
def cleanTables(lakeName, tableName):
    """Run OPTIMIZE (with V-Order) and VACUUM on a single Delta table.

    VACUUM retains 360 hours (15 days) of history; retention below the
    Delta default is permitted because the caller disables
    retentionDurationCheck before invoking this.

    Errors are printed rather than raised so one bad table does not abort
    a full-lakehouse sweep.
    """
    # Backtick-quote the identifiers so names containing special
    # characters (hyphens, spaces, ...) do not break the generated SQL.
    qualified = f'`{lakeName}`.`{tableName}`'
    try:
        spark.sql(f'OPTIMIZE {qualified} VORDER')
        spark.sql(f'VACUUM {qualified} RETAIN 360 HOURS')
        print(f'\nTable {lakeName}.{tableName} OPTIMIZED and VACUUMED successfully')
    except Exception as e:
        print(f"Error optimizing/vacuuming {lakeName}.{tableName}: {str(e)}")

def cleanLakehouse(lakeName):
    """OPTIMIZE and VACUUM every table in the given lakehouse (database).

    Lists the tables via SHOW TABLES and delegates each one to
    cleanTables(). Errors listing the lakehouse are printed, not raised,
    so the outer loop can continue with the next lakehouse.
    """
    try:
        # Backtick-quote the lakehouse name so names with special
        # characters do not break the generated SQL.
        tables_df = spark.sql(f'SHOW TABLES IN `{lakeName}`')
        tableNames = [row['tableName'] for row in tables_df.collect()]
        for table in tableNames:
            cleanTables(lakeName, table)
    except Exception as e:
        print(f"Error processing lakehouse {lakeName}: {str(e)}")

def listLakes():
    """Return the names of all lakehouses (databases) visible to Spark.

    On any catalog error, prints the error and returns an empty list so
    the caller's loop simply has nothing to process.
    """
    try:
        return [db.name for db in spark.catalog.listDatabases()]
    except Exception as e:
        print(f"Error listing lakehouses: {str(e)}")
        return []

# Main Execution
# Permit VACUUM retention periods shorter than the Delta default safety
# threshold (required for the 360-hour retention used by cleanTables).
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

# Sweep every visible lakehouse, optimizing and vacuuming each table.
for lakehouse_name in listLakes():
    print(f"\nProcessing lakehouse: {lakehouse_name}")
    cleanLakehouse(lakehouse_name)
Leave a Comment