Untitled
unknown
plain_text
10 days ago
9.9 kB
2
Indexable
Never
# Databricks notebook source
# COMMAND ----------

# Import necessary modules and functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode_outer, from_json
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    ArrayType,
    BooleanType,
    IntegerType,
    TimestampType,
)

# COMMAND ----------

# Adjust the system path to include the directory with common functions
import sys

sys.path.append(
    "/Workspace"
    + dbutils.notebook.entry_point.getDbutils()
    .notebook()
    .getContext()
    .notebookPath()
    .get()
    .rsplit("/", 1)[0]
    + "/../common"
)

# COMMAND ----------

# Declarative description of the three info-service tables.
#
# Each column is an ordered (name, SQL type, comment) triple. The same ordered
# list drives the CREATE TABLE statement, the column comments and the
# classification tags, so the three can never drift apart.

_SCHEMA_NAME = "slvr_crtd_nrt_back"

# Link columns shared, in this order, by every table below.
_LINK_COLUMNS = [
    ("self_link", "STRING", "Self link"),
    ("selfTemplate_link", "STRING", "Self template link"),
    ("competitions_link", "STRING", "Competitions link"),
    ("footytab_link", "STRING", "Footytab link"),
    ("tournaments_link", "STRING", "Tournaments link"),
    ("matches_link", "STRING", "Matches link"),
    ("markets_link", "STRING", "Markets link"),
    ("markets_all_channels_link", "STRING", "Markets all channels link"),
]

_TABLE_SPECS = [
    {
        "table_name": f"{_SCHEMA_NAME}.tab_info_service_sports",
        "cluster_by": ["id"],
        "columns": [
            ("id", "STRING NOT NULL", "Unique identifier for the sport"),
            ("name", "STRING", "Name of the sport"),
            ("displayName", "STRING", "Display name of the sport"),
            ("spectrumId", "STRING", "Spectrum identifier"),
            ("sameGame", "BOOLEAN", "Indicator if same game is available"),
        ]
        + _LINK_COLUMNS,
    },
    {
        "table_name": f"{_SCHEMA_NAME}.tab_info_service_competitions",
        "cluster_by": ["sport_id", "id"],
        "columns": [
            ("sport_id", "STRING NOT NULL", "Foreign key referencing sports.id"),
            ("id", "STRING NOT NULL", "Unique identifier for the competition"),
            ("name", "STRING", "Name of the competition"),
            ("spectrumId", "STRING", "Spectrum identifier"),
            ("hasMarkets", "BOOLEAN", "Indicator if markets are available"),
            ("sameGame", "BOOLEAN", "Indicator if same game is available"),
        ]
        + _LINK_COLUMNS,
    },
    {
        "table_name": f"{_SCHEMA_NAME}.tab_info_service_tournaments",
        "cluster_by": ["sport_id", "competition_id", "id"],
        "columns": [
            ("sport_id", "STRING NOT NULL", "Foreign key referencing sports.id"),
            (
                "competition_id",
                "STRING NOT NULL",
                "Foreign key referencing competitions.id",
            ),
            ("id", "STRING NOT NULL", "Unique identifier for the tournament"),
            ("name", "STRING", "Name of the tournament"),
            ("spectrumId", "STRING", "Spectrum identifier"),
            ("sameGame", "BOOLEAN", "Indicator if same game is available"),
        ]
        + _LINK_COLUMNS,
    },
]

# COMMAND ----------


def _apply_table_spec(table_name, column_specs, cluster_by):
    """Create one Delta table and apply its comments, tags and properties.

    Statements are issued in this order: CREATE TABLE IF NOT EXISTS, one
    COMMENT per column, one 'Classification' = 'Internal' tag per column,
    then the table-level TBLPROPERTIES.

    Args:
        table_name: Schema-qualified name of the table.
        column_specs: Ordered (column name, SQL type, comment) triples.
        cluster_by: Column names used in the CLUSTER BY clause.
    """
    column_ddl = ",\n        ".join(
        f"{name} {sql_type}" for name, sql_type, _ in column_specs
    )
    cluster_columns = ", ".join(cluster_by)

    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        {column_ddl}
    )
    USING DELTA
    CLUSTER BY ({cluster_columns});
    """)

    # Add comments to each column.
    # NOTE: comments are interpolated into single-quoted SQL literals, so they
    # must not contain a single quote.
    for name, _, comment in column_specs:
        spark.sql(
            f"""ALTER TABLE {table_name} ALTER COLUMN {name} COMMENT '{comment}';"""
        )

    # Set 'Internal' classification for all columns.
    for name, _, _ in column_specs:
        spark.sql(
            f"""ALTER TABLE {table_name} ALTER COLUMN {name} SET TAGS ('Classification' = 'Internal');"""
        )

    # Update table properties for Delta Lake features.
    spark.sql(f"""ALTER TABLE {table_name} SET TBLPROPERTIES (
        delta.enableChangeDataFeed = true,
        delta.logRetentionDuration = 'interval 90 days',
        delta.deletedFileRetentionDuration = 'interval 90 days'
    );""")


# COMMAND ----------

# Create and configure the Sports, Competitions and Tournaments tables.
# `table_name` and `columns` are kept as notebook globals (ending on the
# Tournaments values) in case later cells read them.
for _spec in _TABLE_SPECS:
    table_name = _spec["table_name"]
    columns = [name for name, _, _ in _spec["columns"]]
    _apply_table_spec(table_name, _spec["columns"], _spec["cluster_by"])

# COMMAND ----------

# Add any additional steps or functions below
# For example, you might want to verify table creation or handle exceptions
Leave a Comment