Untitled

 avatar
unknown
python
3 months ago
7.6 kB
12
No Index
import requests
from bs4 import BeautifulSoup, SoupStrainer
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
import json
import lxml
#import cchardet
from functools import wraps
import time

def timeit(func):
    """Decorate *func* so that every call prints its wall-clock duration.

    The wrapped function's return value is passed through unchanged. The
    report line is printed only when *func* returns normally; an exception
    propagates without any output.
    """
    @wraps(func)
    def timed_call(*args, **kwargs):
        began = time.perf_counter()
        outcome = func(*args, **kwargs)
        # perf_counter is a monotonic clock, so the difference is a duration.
        total_time = time.perf_counter() - began
        print(f'Function {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds')
        return outcome

    return timed_call

def get_current_date():
    """Return today's local date as a ``DD/MM/YYYY`` string."""
    today = datetime.now()
    # Formatting a datetime with a format spec delegates to strftime.
    return f"{today:%d/%m/%Y}"

def fetch_flight_data(direction, nexthours=None):
    """Fetch today's EuroAirport flight table for one direction and parse it.

    Args:
        direction: Value sent as the ``direction`` query parameter. Any value
            other than ``'arrival'`` is tagged as a departure (``"D"``) in the
            result.
        nexthours: When truthy, keep only flights whose time today lies
            between now and ``nexthours`` hours from now. ``None`` (or ``0``)
            disables this filter.

    Returns:
        A list of dicts, one per usable flight row, with the keys ``date``,
        ``arrival_time`` (expected time if the page shows one, otherwise the
        scheduled time), ``from``, ``airline``, ``flight_number``, ``status``,
        ``note``, ``direction`` (``"A"`` or ``"D"``) and ``aircraft``.
        ``aircraft`` is always present; it is ``'Unknown'`` when no aircraft
        type could be extracted, and gets a ``" (C)"`` suffix for cargo
        flights.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status.
    """
    current_date = get_current_date()
    url = f"https://www.euroairport.com/en/passengers-visitors/arrivals-departures/flights/flightsearch.html?direction={direction}&date={current_date}"
    # Without a timeout, requests waits indefinitely on a stalled connection.
    response = requests.get(url, timeout=30)
    # Fail loudly on an HTTP error instead of parsing an error page into [].
    response.raise_for_status()

    soup = BeautifulSoup(response.text, 'lxml')

    # Read the clock once so every row is compared against the same instant.
    now = datetime.now()
    time_limit = now + timedelta(hours=nexthours) if nexthours else None
    direction_code = "A" if direction == 'arrival' else "D"

    flights = []
    for row in soup.find_all('tr', class_='flights-table-primary'):
        cells = row.find_all('td')
        if len(cells) <= 7:
            continue  # Not a full flight row; cells 0-7 are read below.

        scheduled_time = cells[1].text.strip()
        expected_time = cells[2].text.strip()
        # Prefer the expected time when the page provides one.
        flight_time_str = expected_time or scheduled_time

        try:
            parsed = datetime.strptime(flight_time_str, "%H:%M")
        except ValueError:
            continue  # Skip rows whose time is not in HH:MM form.

        # The page only gives a clock time; it is interpreted as today's date.
        flight_time = datetime.combine(now.date(), parsed.time())
        if time_limit is not None and not (now <= flight_time <= time_limit):
            continue  # Outside the requested window.

        note = cells[7].text.strip()

        # Default first, so the key exists even when no secondary row is found.
        aircraft = 'Unknown'
        # NOTE(review): find_next searches forward through the whole document,
        # so a primary row lacking its own secondary row would pick up a later
        # flight's details - confirm against the page markup.
        secondary_row = row.find_next('tr', class_='flights-table-secondary')
        if secondary_row is not None:
            info_blocks = secondary_row.find_all(
                'div', class_='flights-table-secondary-info-content'
            )
            # The aircraft type is taken from the second-to-last info block,
            # and only when at least four blocks are present.
            if len(info_blocks) >= 4:
                candidate = info_blocks[-2].text.strip()
                # Reject values with no letters (e.g. empty or purely numeric).
                if any(char.isalpha() for char in candidate):
                    aircraft = candidate

        # Mark cargo flights, identified by the word "Cargo" in the note cell.
        if 'Cargo' in note:
            aircraft += " (C)"

        flights.append({
            'date': cells[0].text.strip(),
            'arrival_time': flight_time_str,
            'from': cells[3].text.strip(),
            'airline': cells[4].text.strip(),
            'flight_number': cells[5].text.strip(),
            'status': cells[6].text.strip(),
            'note': note,
            'direction': direction_code,
            'aircraft': aircraft,
        })
    return flights

def filter_upcoming_flights(flights):
    """Keep flights that have not yet passed and record the time remaining.

    Each flight's ``'arrival_time'`` (``HH:MM``) is interpreted as a time on
    today's date and compared with the current local time.

    Args:
        flights: Iterable of flight dicts. Each needs an ``'arrival_time'``
            string; ``'flight_number'`` is used only in error messages.

    Returns:
        A new list holding the same dict objects whose time is not before
        now. Each kept dict gains a ``'time_left'`` key (a ``timedelta``).
        Entries with a missing or malformed time are reported on stdout and
        skipped.
    """
    current_time = datetime.now()
    today = current_time.date()
    upcoming_flights = []

    for flight in flights:
        try:
            parsed = datetime.strptime(flight['arrival_time'], "%H:%M")
        except (KeyError, TypeError, ValueError) as e:
            # Use .get so that reporting a bad entry cannot itself raise when
            # the flight number is missing too.
            print(f"Error processing flight {flight.get('flight_number', 'unknown')}: {e}")
            continue

        flight_time = datetime.combine(today, parsed.time())

        # Skip flights whose time has already passed.
        if flight_time < current_time:
            continue

        flight['time_left'] = flight_time - current_time
        upcoming_flights.append(flight)

    return upcoming_flights

def format_flight(flight):
    """Format one flight dict as a single line for CLI output.

    Args:
        flight: Dict with the keys ``'direction'``, ``'time_left'`` (a
            ``timedelta``), ``'arrival_time'``, ``'from'`` and
            ``'flight_number'``. ``'aircraft'`` is optional and shown as
            ``'Unknown'`` when absent.

    Returns:
        A string of the form
        ``"* <direction> - in <time> - <arrival_time> - <from> (<flight_number>) - <aircraft>"``,
        where ``<time>`` is ``"<h>h<mm>min"`` or, under one hour, ``"<mm>min"``.
    """
    direction = flight['direction']  # Already 'A' or 'D'.
    arrival_time = flight['arrival_time']

    # total_seconds() keeps whole days, which the .seconds attribute drops;
    # a negative delta (time already passed) is shown as zero.
    total_minutes = max(0, int(flight['time_left'].total_seconds())) // 60
    hours, minutes = divmod(total_minutes, 60)

    if hours > 0:
        formatted_time = f"{hours}h{minutes:02}min"
    else:
        formatted_time = f"{minutes:02}min"

    # Tolerate flights for which no aircraft type was recorded.
    aircraft = flight.get('aircraft', 'Unknown')

    return f"* {direction} - in {formatted_time} - {arrival_time} - {flight['from']} ({flight['flight_number']}) - {aircraft}"

# Custom ``default`` hook for json.dumps that renders timedelta values as text.
def serialize_datetime(obj):
    """Convert a ``timedelta`` into a short human-readable duration string.

    Args:
        obj: The object ``json.dumps`` could not serialize on its own.

    Returns:
        ``"<h>h<mm>min"`` when the duration is at least one hour, otherwise
        ``"<m>min"`` (minutes not zero-padded). Negative durations are
        rendered as ``"0min"``.

    Raises:
        TypeError: If *obj* is not a ``timedelta``.
    """
    if isinstance(obj, timedelta):
        # total_seconds() keeps whole days, which the .seconds attribute drops.
        total_minutes = max(0, int(obj.total_seconds())) // 60
        hours, minutes = divmod(total_minutes, 60)
        if hours > 0:
            return f"{hours}h{minutes:02}min"
        return f"{minutes}min"
    raise TypeError(f"Type not serializable: {type(obj)}")

#@timeit
def combine_departures_and_arrivals(nexthours=None):
    """Fetch arrivals and departures, merge them, and print them as JSON.

    Both directions are fetched on worker threads with the same ``nexthours``
    window, merged, reduced to the flights that are still upcoming, ordered
    by time remaining (soonest first), and written to stdout as one JSON
    array. ``time_left`` values are rendered through ``serialize_datetime``.

    Args:
        nexthours: Passed through to ``fetch_flight_data`` for both
            directions.
    """
    # The two page fetches are independent, so run them side by side.
    with ThreadPoolExecutor(max_workers=2) as pool:
        pending = [
            pool.submit(fetch_flight_data, leg, nexthours)
            for leg in ('arrival', 'departure')
        ]
        arrivals, departures = (job.result() for job in pending)

    # Merge both directions, drop what has already passed, soonest first.
    upcoming = filter_upcoming_flights(arrivals + departures)
    upcoming.sort(key=lambda entry: entry['time_left'])

    # For a plain-text listing, print format_flight(flight) for each entry
    # instead of the JSON dump below.
    print(json.dumps(upcoming, default=serialize_datetime))

# Print the combined arrivals and departures for the next hour. Guarded so
# that importing this module does not trigger network requests.
if __name__ == "__main__":
    combine_departures_and_arrivals(nexthours=1)
Editor is loading...
Leave a Comment