# Paste metadata (scraped page header, not code): Untitled | unknown | plain_text | 10 months ago | 2.2 kB | 6 | Indexable
import os

import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType
# --- Configuration ----------------------------------------------------------
# Catalog to connect to; passed to load_catalog() in main().
CATALOG_NAME: str = "my_catalog"
# Namespace and table name that together form the Iceberg table identifier.
NAMESPACE: str = "default"
TABLE_NAME: str = "my_table"
# Local Parquet file that is previewed and then registered with the table.
PARQUET_FILE_PATH: str = "data-with-nulls.parquet"
# Define Iceberg schema
def define_schema():
return Schema(
fields=[
("id", IntegerType(), False), # Non-nullable
("name", StringType(), True), # Nullable
("age", IntegerType(), True), # Nullable
]
)
def read_parquet_file(parquet_file_path, preview_rows=10):
    """Read a Parquet file and print its schema, a short preview and null counts.

    Args:
        parquet_file_path: Path of the Parquet file to read.
        preview_rows: Number of leading rows to print in the preview;
            ``None`` prints every row.

    Returns:
        pyarrow.Table: The full table read from ``parquet_file_path``.
    """
    table = pq.read_table(parquet_file_path)

    print("Parquet file schema:")
    print(table.schema)

    # Convert only the rows being displayed. Calling to_pandas() on the whole
    # table would copy every row into a DataFrame just to print a preview.
    print("Data preview:")
    print(table.slice(0, preview_rows).to_pandas())

    # This step exists to check null handling, so report null counts over the
    # whole file rather than relying on what happens to appear in the preview.
    print("Null counts per column:")
    for field, column in zip(table.schema, table.columns):
        print(f"  {field.name}: {column.null_count}")

    return table
def add_parquet_to_iceberg(parquet_file_path, table):
    """Register an existing Parquet file as a data file of an Iceberg table.

    The file is not rewritten. PyIceberg's ``add_files`` builds the data-file
    entry from the Parquet file itself and records it in a new snapshot.

    Args:
        parquet_file_path: Path or URI of the Parquet file to register.
        table: The PyIceberg table that receives the file.
    """
    # The transaction is a context manager that commits when the ``with``
    # block exits normally, so there is no separate commit call.
    with table.transaction() as transaction:
        transaction.add_files(file_paths=[parquet_file_path])
    print(f"Added Parquet file to table {table.name()}")
def _load_or_create_table(catalog):
    """Load NAMESPACE.TABLE_NAME from *catalog*, creating namespace and table if the table is missing."""
    identifier = (NAMESPACE, TABLE_NAME)
    try:
        table = catalog.load_table(identifier)
    except NoSuchTableError:
        # Only a missing table should trigger creation. Connection, auth or
        # other failures must propagate instead of being masked by a create.
        pass
    else:
        print(f"Loaded Iceberg table {NAMESPACE}.{TABLE_NAME}.")
        return table

    # Some catalog implementations refuse to create a table inside a
    # namespace that does not exist yet, so make sure it exists first.
    try:
        catalog.create_namespace(NAMESPACE)
    except NamespaceAlreadyExistsError:
        pass

    # partition_spec is omitted on purpose: create_table defaults to an
    # unpartitioned spec, whereas an explicit None is not a PartitionSpec.
    table = catalog.create_table(identifier=identifier, schema=define_schema())
    print(f"Created Iceberg table {NAMESPACE}.{TABLE_NAME}.")
    return table


def main():
    """Preview the Parquet file, then register it with the Iceberg table."""
    # Called for its printed schema/preview/null report; the returned table
    # is not needed here.
    read_parquet_file(PARQUET_FILE_PATH)

    catalog = load_catalog(CATALOG_NAME)
    iceberg_table = _load_or_create_table(catalog)

    add_parquet_to_iceberg(PARQUET_FILE_PATH, iceberg_table)


if __name__ == "__main__":
    main()
# Page footer residue (scraped, not code): Editor is loading... | Leave a Comment