Untitled
"""Register an existing Parquet file with an Apache Iceberg table.

The script previews a Parquet file (including its null values), loads the
target Iceberg table from the configured catalog -- creating it when it does
not exist yet -- and then registers the Parquet file as a data file of that
table without rewriting it.
"""
import os

import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.io.pyarrow import PyArrowFileIO  # noqa: F401  (kept: pre-existing import)
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType

# Configuration
CATALOG_NAME = "my_catalog"
NAMESPACE = "default"
TABLE_NAME = "my_table"
PARQUET_FILE_PATH = "data-with-nulls.parquet"


def define_schema() -> Schema:
    """Return the Iceberg schema used when the table has to be created.

    Iceberg fields are declared as ``NestedField`` objects carrying an
    explicit field ID; ``required=True`` marks a non-nullable column.
    """
    # NOTE(review): a required field may be rejected when the Parquet column
    # is written as optional -- confirm against the actual data file.
    return Schema(
        NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),  # Non-nullable
        NestedField(field_id=2, name="name", field_type=StringType(), required=False),  # Nullable
        NestedField(field_id=3, name="age", field_type=IntegerType(), required=False),  # Nullable
    )


def read_parquet_file(parquet_file_path: str):
    """Read a Parquet file, print its schema and contents, and return it.

    Args:
        parquet_file_path: Location of the Parquet file to read.

    Returns:
        The PyArrow table read from the file.
    """
    table = pq.read_table(parquet_file_path)
    print("Parquet file schema:")
    print(table.schema)
    print("Data preview:")
    print(table.to_pandas())  # Preview data with nulls
    return table


def _absolute_location(path: str) -> str:
    """Return *path* as an absolute location, leaving URIs (``s3://...``) as-is."""
    if "://" in path:
        return path
    return os.path.abspath(path)


def add_parquet_to_iceberg(parquet_file_path: str, table) -> None:
    """Register an existing Parquet file as a data file of an Iceberg table.

    Args:
        parquet_file_path: Location of the Parquet file to register.
        table: The PyIceberg table that should reference the file.
    """
    # The path is stored verbatim in the table metadata, so a relative local
    # path would only resolve from the current working directory.
    location = _absolute_location(parquet_file_path)

    # add_files registers the file and commits the change in one step.
    table.add_files(file_paths=[location])
    print(f"Added Parquet file to table {table.name()}")


def main() -> None:
    """Preview the Parquet file, load or create the table, and register the file."""
    # Load the Parquet file and validate
    read_parquet_file(PARQUET_FILE_PATH)

    # Load or create the Iceberg table
    catalog = load_catalog(CATALOG_NAME)
    identifier = (NAMESPACE, TABLE_NAME)
    try:
        iceberg_table = catalog.load_table(identifier)
    except NoSuchTableError:
        # Only a missing table triggers creation; connection or configuration
        # errors propagate instead of being mistaken for "table not found".
        iceberg_table = catalog.create_table(
            identifier=identifier,
            schema=define_schema(),
        )
        print(f"Created Iceberg table {NAMESPACE}.{TABLE_NAME}.")
    else:
        print(f"Loaded Iceberg table {NAMESPACE}.{TABLE_NAME}.")

    # Add the Parquet file to the Iceberg table
    add_parquet_to_iceberg(PARQUET_FILE_PATH, iceberg_table)


if __name__ == "__main__":
    main()
Leave a Comment