# Untitled
#
# avatar
# unknown
# plain_text
# a month ago
# 2.2 kB
# 2
# Indexable
import os

import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType

# Configuration
# Name passed to load_catalog() to select the catalog.
CATALOG_NAME = "my_catalog"
# Namespace and table name; together they form the table identifier tuple.
NAMESPACE = "default"
TABLE_NAME = "my_table"
# Parquet file that is previewed and then added to the Iceberg table.
PARQUET_FILE_PATH = "data-with-nulls.parquet"

# Define Iceberg schema
def define_schema():
    """Build the Iceberg schema used when the table has to be created.

    Iceberg identifies columns by field id, so every column is declared as a
    ``NestedField`` with an explicit id and a ``required`` flag (the inverse
    of "nullable").

    Returns:
        A ``Schema`` with a required integer ``id`` column and optional
        ``name`` (string) and ``age`` (integer) columns.
    """
    return Schema(
        # Non-nullable
        NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
        # Nullable
        NestedField(field_id=2, name="name", field_type=StringType(), required=False),
        # Nullable
        NestedField(field_id=3, name="age", field_type=IntegerType(), required=False),
    )

# Read Parquet file and print it so null handling can be inspected
def read_parquet_file(parquet_file_path):
    """Load a Parquet file and print its schema followed by its contents.

    Args:
        parquet_file_path: Location of the Parquet file, handed to
            ``pq.read_table``.

    Returns:
        The table object produced by ``pq.read_table``.
    """
    arrow_table = pq.read_table(parquet_file_path)

    print("Parquet file schema:")
    schema_description = arrow_table.schema
    print(schema_description)

    print("Data preview:")
    # The pandas rendering is printed so rows containing nulls can be eyeballed.
    preview_frame = arrow_table.to_pandas()
    print(preview_frame)

    return arrow_table

# Add Parquet file to Iceberg table
def add_parquet_to_iceberg(parquet_file_path, table):
    """Register an existing Parquet file as a data file of an Iceberg table.

    The file is referenced in place through ``Table.add_files``; it is not
    rewritten or copied.

    Args:
        parquet_file_path: Path or URI of the Parquet file. A location without
            a URI scheme is treated as a local path and made absolute, so the
            table metadata does not record a path relative to the current
            working directory.
        table: The pyiceberg ``Table`` the file is added to.
    """
    file_location = os.fspath(parquet_file_path)
    if "://" not in file_location:
        file_location = os.path.abspath(file_location)

    # add_files opens and commits its own transaction, so no explicit
    # transaction handling (or second commit) is needed here.
    table.add_files(file_paths=[file_location])
    print(f"Added Parquet file to table {table.name()}")

def main():
    """Preview the Parquet file, then register it with the Iceberg table.

    The table ``NAMESPACE.TABLE_NAME`` is loaded from the catalog named
    ``CATALOG_NAME``; if it does not exist it is created with the schema from
    ``define_schema()``. Any other catalog failure is allowed to propagate.
    """
    # Load the Parquet file and print its schema and contents for inspection
    read_parquet_file(PARQUET_FILE_PATH)

    # Load or create the Iceberg table
    catalog = load_catalog(CATALOG_NAME)
    identifier = (NAMESPACE, TABLE_NAME)
    try:
        iceberg_table = catalog.load_table(identifier)
    except NoSuchTableError:
        # Only a missing table triggers creation; connection, auth or config
        # errors must not be mistaken for "table does not exist".
        # partition_spec is left at the catalog default (unpartitioned).
        iceberg_table = catalog.create_table(
            identifier=identifier,
            schema=define_schema(),
        )
        print(f"Created Iceberg table {NAMESPACE}.{TABLE_NAME}.")
    else:
        print(f"Loaded Iceberg table {NAMESPACE}.{TABLE_NAME}.")

    # Add the Parquet file to the Iceberg table
    add_parquet_to_iceberg(PARQUET_FILE_PATH, iceberg_table)

# Run the workflow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
# Leave a Comment