import pandas as pd
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from multiprocessing import cpu_count

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

@retry(
    stop=stop_after_attempt(5), 
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((requests.ConnectionError, requests.Timeout, requests.HTTPError))
)
def fetch_html_content(url):
    try:
        response = requests.get(url, timeout=20)
        response.raise_for_status()
        return response.text
    except (requests.ConnectionError, requests.Timeout, requests.HTTPError) as e:
        logging.warning(f"Error fetching {url}: {e}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error fetching {url}: {e}")
        raise
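
# Illustrative usage of fetch_html_content (kept as a comment so the script's
# behavior is unchanged; the URL is a placeholder, not from the original data):
#
#     html = fetch_html_content('https://example.com/permits?parcel=12345')
#
# With the retry settings above, tenacity retries up to 5 attempts on
# connection errors, timeouts, and HTTP errors, with exponential backoff
# clamped between 2 and 10 seconds between attempts.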

def parse_html_table(html_content, base_url, table_id='tblPermit'):
    soup = BeautifulSoup(html_content, 'html.parser')
    table = soup.find('table', {'id': table_id})
    if table is None:
        return pd.DataFrame()

    thead = table.find('thead')
    tbody = table.find('tbody')
    if thead is None or tbody is None:
        return pd.DataFrame()

    headers = [th.get_text(strip=True) for th in thead.find_all('th')]
    rows = []
    for tr in tbody.find_all('tr'):
        cells = tr.find_all('td')
        if not cells:
            continue  # Skip header/spacer rows with no data cells
        row = [cell.get_text(strip=True) for cell in cells]
        # Extract the permit detail link from the first cell, if present
        link = cells[0].find('a')
        permit_number_url = urljoin(base_url, link['href']) if link and link.has_attr('href') else ''
        row.append(permit_number_url)
        rows.append(row)

    headers.append('Permit Number URL')
    return pd.DataFrame(rows, columns=headers)
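
# Minimal sketch of the markup parse_html_table expects (illustrative only and
# kept as a comment; the sample markup is an assumption about the page layout):
#
#     sample_html = '''
#     <table id="tblPermit">
#       <thead><tr><th>Permit Number</th><th>Description</th></tr></thead>
#       <tbody><tr><td><a href="permit.aspx?id=1">P-001</a></td><td>ROOF</td></tr></tbody>
#     </table>'''
#     df = parse_html_table(sample_html, 'https://example.com/permits/')
#     # -> columns: ['Permit Number', 'Description', 'Permit Number URL']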

def extract_table_to_dataframe(url, table_id='tblPermit'):
    """Extract a table from a URL and return as a DataFrame."""
    try:
        html_content = fetch_html_content(url)
        if html_content:
            return parse_html_table(html_content, url, table_id)
    except Exception as e:
        logging.error(f"Failed to fetch data from {url} after retries: {e}")
    return pd.DataFrame()

def fetch_all_tables(urls, max_workers=150):
    """Fetch all tables concurrently and track progress."""
    url_to_df_map = {}
    total_urls = len(urls)
    processed_count = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(extract_table_to_dataframe, url): url for url in urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                df = future.result()
                url_to_df_map[url] = df
            except Exception as e:
                logging.error(f"Error processing {url}: {e}")
            processed_count += 1
            if processed_count % 100 == 0 or processed_count == total_urls:
                logging.info(f"Processed {processed_count} / {total_urls} URLs")

    return url_to_df_map
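
# Illustrative call (commented out; the URLs are placeholders). fetch_all_tables
# returns a dict mapping each URL to its parsed DataFrame, or to an empty
# DataFrame when fetching or parsing failed:
#
#     url_to_df_map = fetch_all_tables(['https://example.com/p/1', 'https://example.com/p/2'])
#     url_to_df_map['https://example.com/p/1'].head()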

def process_chunk(chunk):
    """Process a chunk of the DataFrame."""
    df, parcel_column, links_column, url_to_df_map = chunk
    roof_df_list = []
    others_df_list = []

    for _, row in df.iterrows():
        parcel_number = row[parcel_column]
        url = row[links_column]
        data = url_to_df_map.get(url, pd.DataFrame())
        if not data.empty:
            data = data.copy()  # Copy so the shared, cached DataFrame is not mutated
            data['Parcel Number'] = parcel_number  # Add parcel number to DataFrame
            expected_columns = ['Permit Number', 'Description', 'Issue Date', 'Estimated Value', 'Parcel Number', 'Permit Number URL']
            if all(col in data.columns for col in expected_columns):
                data = data[expected_columns]  # Reorder columns
            if 'Description' in data.columns:
                roof_df_list.append(data[data['Description'] == 'ROOF'])
                others_df_list.append(data[data['Description'] != 'ROOF'])
            else:
                others_df_list.append(data)

    roof_df = pd.concat(roof_df_list, ignore_index=True) if roof_df_list else pd.DataFrame()
    others_df = pd.concat(others_df_list, ignore_index=True) if others_df_list else pd.DataFrame()

    return roof_df, others_df
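
# process_chunk takes a 4-tuple and returns (roof_df, others_df); an
# illustrative call (commented out), using the same column names as the
# variables at the bottom of the script:
#
#     roof_df, others_df = process_chunk(
#         (df.iloc[:100], 'Parcel Number', 'Property Appraiser Links', url_to_df_map)
#     )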

def read_and_process_csv(file_path, parcel_column, links_column):
    """Read the input CSV and process every record."""
    # Read the full CSV file
    df = pd.read_csv(file_path)
    total_records = len(df)
    logging.info(f"Total records to process: {total_records}")

    # Collect all unique URLs
    unique_urls = df[links_column].unique()
    logging.info(f"Total unique URLs: {len(unique_urls)}")

    # Fetch tables for all unique URLs concurrently
    url_to_df_map = fetch_all_tables(unique_urls)

    # Split the DataFrame into chunks and process them concurrently with a thread pool
    chunk_size = max(1, len(df) // cpu_count())
    chunks = [(df.iloc[i:i + chunk_size], parcel_column, links_column, url_to_df_map) for i in range(0, len(df), chunk_size)]
    
    with ThreadPoolExecutor(max_workers=cpu_count()) as executor:
        results = list(executor.map(process_chunk, chunks))

    # Combine results
    roof_df_list, others_df_list = zip(*results)
    final_roof_df = pd.concat(roof_df_list, ignore_index=True) if roof_df_list else pd.DataFrame()
    final_others_df = pd.concat(others_df_list, ignore_index=True) if others_df_list else pd.DataFrame()

    # Write the final DataFrames to CSV files
    roof_csv_path = '/Users/darpab/Desktop/roof_data.csv'
    others_csv_path = '/Users/darpab/Desktop/others_data.csv'

    # Write headers and data to the CSV files
    final_roof_df.to_csv(roof_csv_path, mode='w', header=True, index=False)
    final_others_df.to_csv(others_csv_path, mode='w', header=True, index=False)

    logging.info(f"Total processed records: {total_records}")

if __name__ == '__main__':
    csv_file_path = '/Users/darpab/Downloads/Scripts/Data/Address_RawData.csv'  # Update with your file path
    parcel_column = 'Parcel Number'
    links_column = 'Property Appraiser Links'

    read_and_process_csv(csv_file_path, parcel_column, links_column)