import pandas as pd
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from multiprocessing import cpu_count

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((requests.ConnectionError, requests.Timeout, requests.HTTPError))
)
def fetch_html_content(url):
    """Fetch a page, retrying on connection, timeout, and HTTP errors."""
    try:
        response = requests.get(url, timeout=20)
        response.raise_for_status()
        return response.text
    except (requests.ConnectionError, requests.Timeout, requests.HTTPError) as e:
        logging.warning(f"Error fetching {url}: {e}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error fetching {url}: {e}")
        raise


def parse_html_table(html_content, base_url, table_id='tblPermit'):
    """Parse the permit table from a page into a DataFrame."""
    soup = BeautifulSoup(html_content, 'html.parser')
    table = soup.find('table', {'id': table_id})
    if table is None:
        return pd.DataFrame()

    headers = [th.text.strip() for th in table.find('thead').find_all('th')]
    rows = []
    for tr in table.find('tbody').find_all('tr'):
        cells = tr.find_all('td')
        row = [cell.get_text(strip=True) for cell in cells]
        # Extract the Permit Number URL and resolve it against the page URL
        permit_number_url = cells[0].find('a')['href'] if cells[0].find('a') else ''
        permit_number_url = urljoin(base_url, permit_number_url)  # Ensure the full URL is retained
        row.append(permit_number_url)
        rows.append(row)
    headers.append('Permit Number URL')
    return pd.DataFrame(rows, columns=headers)


def extract_table_to_dataframe(url, table_id='tblPermit'):
    """Extract a table from a URL and return it as a DataFrame."""
    try:
        html_content = fetch_html_content(url)
        if html_content:
            return parse_html_table(html_content, url, table_id)
    except Exception as e:
        logging.error(f"Failed to fetch data from {url} after retries: {e}")
    return pd.DataFrame()


def fetch_all_tables(urls, max_workers=150):
    """Fetch all tables concurrently and track progress."""
    url_to_df_map = {}
    total_urls = len(urls)
    processed_count = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(extract_table_to_dataframe, url): url for url in urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                df = future.result()
                url_to_df_map[url] = df
            except Exception as e:
                logging.error(f"Error processing {url}: {e}")
            processed_count += 1
            if processed_count % 100 == 0 or processed_count == total_urls:
                logging.info(f"Processed {processed_count} / {total_urls} URLs")
    return url_to_df_map


def process_chunk(chunk):
    """Process a chunk of the DataFrame, splitting permits into ROOF and others."""
    df, parcel_column, links_column, url_to_df_map = chunk
    roof_df_list = []
    others_df_list = []
    for _, row in df.iterrows():
        parcel_number = row[parcel_column]
        url = row[links_column]
        # Copy so the shared DataFrame in url_to_df_map is not mutated by concurrent chunks
        data = url_to_df_map.get(url, pd.DataFrame()).copy()
        if not data.empty:
            data['Parcel Number'] = parcel_number  # Add parcel number to DataFrame
            if 'Description' in data.columns:
                # Reorder columns before splitting by permit type
                data = data[['Permit Number', 'Description', 'Issue Date', 'Estimated Value',
                             'Parcel Number', 'Permit Number URL']]
                roof_df_list.append(data[data['Description'] == 'ROOF'])
                others_df_list.append(data[data['Description'] != 'ROOF'])
            else:
                others_df_list.append(data)
    roof_df = pd.concat(roof_df_list, ignore_index=True) if roof_df_list else pd.DataFrame()
    others_df = pd.concat(others_df_list, ignore_index=True) if others_df_list else pd.DataFrame()
    return roof_df, others_df


def read_and_process_csv(file_path, parcel_column, links_column):
    """Read the input CSV and process every record."""
    df = pd.read_csv(file_path)
    total_records = len(df)
    logging.info(f"Total records to process: {total_records}")

    # Collect all unique URLs
    unique_urls = df[links_column].unique()
    logging.info(f"Total unique URLs: {len(unique_urls)}")

    # Fetch tables for all unique URLs concurrently
    url_to_df_map = fetch_all_tables(unique_urls)

    # Process data in parallel using threading
    chunk_size = max(1, len(df) // cpu_count())
    chunks = [(df[i:i + chunk_size], parcel_column, links_column, url_to_df_map)
              for i in range(0, len(df), chunk_size)]

    with ThreadPoolExecutor(max_workers=cpu_count()) as executor:
        results = list(executor.map(process_chunk, chunks))

    # Combine results
    roof_df_list, others_df_list = zip(*results)
    final_roof_df = pd.concat(roof_df_list, ignore_index=True) if roof_df_list else pd.DataFrame()
    final_others_df = pd.concat(others_df_list, ignore_index=True) if others_df_list else pd.DataFrame()

    # Write the final DataFrames to CSV files
    roof_csv_path = '/Users/darpab/Desktop/roof_data.csv'
    others_csv_path = '/Users/darpab/Desktop/others_data.csv'
    final_roof_df.to_csv(roof_csv_path, mode='w', header=True, index=False)
    final_others_df.to_csv(others_csv_path, mode='w', header=True, index=False)

    logging.info(f"Total processed records: {total_records}")


csv_file_path = '/Users/darpab/Downloads/Scripts/Data/Address_RawData.csv'  # Update with your file path
parcel_column = 'Parcel Number'
links_column = 'Property Appraiser Links'

read_and_process_csv(csv_file_path, parcel_column, links_column)