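# Scrape permit tables from the property-appraiser pages listed in a CSV:
# fetch each linked page concurrently, parse its 'tblPermit' table, tag the
# rows with their parcel number, split ROOF permits from all other permits,
# and write the two groups to separate CSV files.
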
import pandas as pd
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from multiprocessing import cpu_count
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((requests.ConnectionError, requests.Timeout, requests.HTTPError))
)
def fetch_html_content(url):
    """Fetch a page's HTML, retrying transient connection, timeout, and HTTP errors."""
    try:
        response = requests.get(url, timeout=20)
        response.raise_for_status()
        return response.text
    except (requests.ConnectionError, requests.Timeout, requests.HTTPError) as e:
        logging.warning(f"Error fetching {url}: {e}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error fetching {url}: {e}")
        raise
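
# parse_html_table below assumes the permit table is laid out roughly like
# (column names are illustrative, taken from the reordering in process_chunk):
#   <table id="tblPermit">
#     <thead><tr><th>Permit Number</th><th>Description</th>...</tr></thead>
#     <tbody><tr><td><a href="...">...</a></td><td>ROOF</td>...</tr></tbody>
#   </table>
# The first cell of each body row is expected to carry a link to the permit
# detail page; rows without data cells are skipped.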
def parse_html_table(html_content, base_url, table_id='tblPermit'):
    """Parse the permit table out of a page and return it as a DataFrame."""
    soup = BeautifulSoup(html_content, 'html.parser')
    table = soup.find('table', {'id': table_id})
    if table is None:
        return pd.DataFrame()
    headers = [th.text.strip() for th in table.find('thead').find_all('th')]
    rows = []
    for tr in table.find('tbody').find_all('tr'):
        cells = tr.find_all('td')
        if not cells:
            continue  # Skip spacer/header rows that have no data cells
        row = [cell.get_text(strip=True) for cell in cells]
        # Extract the permit detail link from the first cell, if present
        permit_number_url = cells[0].find('a')['href'] if cells[0].find('a') else ''
        permit_number_url = urljoin(base_url, permit_number_url)  # Ensure the full URL is retained
        row.append(permit_number_url)
        rows.append(row)
    headers.append('Permit Number URL')
    return pd.DataFrame(rows, columns=headers)

def extract_table_to_dataframe(url, table_id='tblPermit'):
    """Extract a table from a URL and return it as a DataFrame."""
    try:
        html_content = fetch_html_content(url)
        if html_content:
            return parse_html_table(html_content, url, table_id)
    except Exception as e:
        logging.error(f"Failed to fetch data from {url} after retries: {e}")
    return pd.DataFrame()
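
# NOTE: the default of 150 threads against a single appraiser site is aggressive;
# lower max_workers if the server starts throttling or refusing connections.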
def fetch_all_tables(urls, max_workers=150):
    """Fetch all tables concurrently and track progress."""
    url_to_df_map = {}
    total_urls = len(urls)
    processed_count = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(extract_table_to_dataframe, url): url for url in urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                df = future.result()
                url_to_df_map[url] = df
            except Exception as e:
                logging.error(f"Error processing {url}: {e}")
            processed_count += 1
            if processed_count % 100 == 0 or processed_count == total_urls:
                logging.info(f"Processed {processed_count} / {total_urls} URLs")
    return url_to_df_map

def process_chunk(chunk):
    """Process a chunk of the input DataFrame, splitting ROOF permits from all others."""
    df, parcel_column, links_column, url_to_df_map = chunk
    roof_df_list = []
    others_df_list = []
    for _, row in df.iterrows():
        parcel_number = row[parcel_column]
        url = row[links_column]
        data = url_to_df_map.get(url, pd.DataFrame())
        if not data.empty:
            data['Parcel Number'] = parcel_number  # Add parcel number to DataFrame
            # Reorder columns only when the fetched table has the expected layout
            expected_columns = ['Permit Number', 'Description', 'Issue Date', 'Estimated Value',
                                'Parcel Number', 'Permit Number URL']
            if all(col in data.columns for col in expected_columns):
                data = data[expected_columns]
            if 'Description' in data.columns:
                roof_df_list.append(data[data['Description'] == 'ROOF'])
                others_df_list.append(data[data['Description'] != 'ROOF'])
            else:
                others_df_list.append(data)
    roof_df = pd.concat(roof_df_list, ignore_index=True) if roof_df_list else pd.DataFrame()
    others_df = pd.concat(others_df_list, ignore_index=True) if others_df_list else pd.DataFrame()
    return roof_df, others_df

def read_and_process_csv(file_path, parcel_column, links_column):
    """Read the input CSV, fetch the linked permit tables, and write the split results."""
    df = pd.read_csv(file_path)
    total_records = len(df)
    logging.info(f"Total records to process: {total_records}")
    # Collect all unique URLs
    unique_urls = df[links_column].unique()
    logging.info(f"Total unique URLs: {len(unique_urls)}")
    # Fetch tables for all unique URLs concurrently
    url_to_df_map = fetch_all_tables(unique_urls)
    # Process the rows in parallel, one chunk per CPU core
    chunk_size = max(1, len(df) // cpu_count())
    chunks = [(df[i:i + chunk_size], parcel_column, links_column, url_to_df_map)
              for i in range(0, len(df), chunk_size)]
    with ThreadPoolExecutor(max_workers=cpu_count()) as executor:
        results = list(executor.map(process_chunk, chunks))
    # Combine results
    roof_df_list, others_df_list = zip(*results)
    final_roof_df = pd.concat(roof_df_list, ignore_index=True) if roof_df_list else pd.DataFrame()
    final_others_df = pd.concat(others_df_list, ignore_index=True) if others_df_list else pd.DataFrame()
    # Write the final DataFrames to CSV files
    roof_csv_path = '/Users/darpab/Desktop/roof_data.csv'
    others_csv_path = '/Users/darpab/Desktop/others_data.csv'
    final_roof_df.to_csv(roof_csv_path, mode='w', header=True, index=False)
    final_others_df.to_csv(others_csv_path, mode='w', header=True, index=False)
    logging.info(f"Total processed records: {total_records}")

if __name__ == '__main__':
    csv_file_path = '/Users/darpab/Downloads/Scripts/Data/Address_RawData.csv'  # Update with your file path
    parcel_column = 'Parcel Number'
    links_column = 'Property Appraiser Links'
    read_and_process_csv(csv_file_path, parcel_column, links_column)