# Web-page residue from the paste site, kept as comments so the module parses:
# Untitled
#
# mail@pastecode.io avatar
# unknown
# python
# a month ago
# 4.0 kB
# 8
# Indexable
# Never
from collections import defaultdict
from typing import Dict, Tuple, List


# edge of directed graph
class Edge:
    """Directed edge ``u -> v`` with a ``weight`` and a running ``flow`` value.

    The flow routines in this file treat ``weight`` as the edge capacity
    (they compute ``weight - flow`` as the remaining capacity).
    """

    def __init__(self, u: int, v: int, weight: int, flow: int = 0):
        # Endpoints: the edge leaves ``u`` and enters ``v``.
        self.u, self.v = u, v
        # ``flow`` starts at 0 unless the caller supplies a value.
        self.weight, self.flow = weight, flow

    def add_flow(self, flow: int):
        """Increase the stored flow by ``flow`` in place."""
        self.flow += flow


def convert_edge_to_map(edge_graph: List["Edge"]) -> Dict[int, List["Edge"]]:
    """Group edges by the node they leave (``edge.u``).

    Args:
        edge_graph: edges to index; their relative order is preserved inside
            each per-node list.

    Returns:
        A ``defaultdict(list)`` mapping a node to the list of edges leaving
        it. Because it is a ``defaultdict``, indexing a node that has no
        outgoing edges yields an empty list (and inserts that key).
    """
    # The value type is a *list* of edges per node, not a single edge.
    graph: Dict[int, List["Edge"]] = defaultdict(list)
    for edge in edge_graph:
        graph[edge.u].append(edge)
    return graph


def build_init_graph() -> List[Edge]:
    """Return the first sample network: seven zero-flow edges over nodes 1-6."""
    # Each row is (from node, to node, weight).
    rows = (
        (1, 2, 4),
        (1, 3, 3),
        (2, 4, 4),
        (4, 3, 3),
        (3, 5, 6),
        (4, 6, 2),
        (5, 6, 6),
    )
    return [Edge(tail, head, weight) for tail, head, weight in rows]


def build_init_graph_1() -> List[Edge]:
    """Return the second sample network: an empty edge list."""
    no_edges = list()
    return no_edges


def build_init_graph_2() -> List[Edge]:
    """Return the third sample network: fifteen zero-flow edges over nodes 1-8."""
    # Each row is (from node, to node, weight).
    rows = (
        (1, 2, 8),
        (1, 3, 7),
        (1, 4, 4),
        (2, 3, 2),
        (3, 4, 5),
        (2, 5, 3),
        (2, 6, 9),
        (3, 6, 6),
        (4, 6, 7),
        (4, 7, 2),
        (6, 5, 3),
        (6, 7, 4),
        (5, 8, 9),
        (6, 8, 5),
        (7, 8, 8),
    )
    return [Edge(tail, head, weight) for tail, head, weight in rows]


# find augmented path using BFS
def find_augmented_path(
    map_graph: Dict[int, List["Edge"]], source_node: int, dest_node: int
) -> List["Edge"]:
    """Breadth-first search for a path from ``source_node`` to ``dest_node``.

    Only edges whose ``flow`` attribute is strictly positive are followed,
    i.e. the caller is expected to store the usable capacity of each edge in
    ``flow``.

    Args:
        map_graph: mapping from a node to the list of edges leaving it.
            Nodes missing from the mapping are treated as having no outgoing
            edges; the mapping is never modified.
        source_node: node the search starts from.
        dest_node: node the search tries to reach.

    Returns:
        The edges of a shortest (fewest-edges) path, listed from the
        destination back to the source. An empty list is returned when the
        destination is unreachable or equals the source.
    """
    from collections import deque  # not part of this file's top-level imports

    if source_node == dest_node:
        return []

    # reached_by[node] = (previous node, edge used to reach node).
    # A dict (rather than a list sized by the number of keys) works for any
    # hashable node id, however large or sparse.
    reached_by = {source_node: None}
    frontier = deque([source_node])
    while frontier:
        current_node = frontier.popleft()
        # .get() avoids KeyError on plain dicts and key insertion on defaultdicts.
        for edge in map_graph.get(current_node, ()):
            # Mark nodes when first discovered so each is enqueued once and
            # keeps the parent that lies on a shortest route.
            if edge.flow > 0 and edge.v not in reached_by:
                reached_by[edge.v] = (current_node, edge)
                if edge.v == dest_node:
                    path = []
                    node = dest_node
                    while node != source_node:
                        node, used_edge = reached_by[node]
                        path.append(used_edge)
                    return path
                frontier.append(edge.v)
    return []


def get_min_value_of_path(edges: List[Edge]) -> int:
    """Return the smallest ``flow`` value among ``edges``, or -1 if there are none."""
    smallest = -1 if not edges else min(edge.flow for edge in edges)
    return smallest


def find_residual_graph(edges: List[Edge]) -> List[Edge]:
    """Build one reversed edge per input edge, carrying its unused capacity.

    For an input edge ``u -> v`` the result holds a new edge ``v -> u`` with
    the same ``weight`` and with ``flow`` set to ``weight - flow``.
    The input edges are not modified.
    """
    residual_edges = []
    for edge in edges:
        unused = edge.weight - edge.flow
        residual_edges.append(Edge(edge.v, edge.u, edge.weight, unused))
    return residual_edges


def alter_flow(edges: List[Edge], paths: List[Edge], residual_value: int) -> None:
    """Add ``residual_value`` to every edge matched by a reversed path edge.

    Each element of ``paths`` is read as a reversed edge: a path edge with
    endpoints ``(u, v)`` selects the edges in ``edges`` that run ``v -> u``.
    Matching edges are updated in place through ``add_flow``.
    """
    # Swap the endpoints of the path edges to get the (u, v) pairs to update.
    targets = {(step.v, step.u) for step in paths}
    for candidate in edges:
        if (candidate.u, candidate.v) not in targets:
            continue
        candidate.add_flow(residual_value)


# Ford-Fulkerson method with BFS augmenting paths (Edmonds-Karp), O(V * E^2)
def min_max_flow(edges: List["Edge"], source: int, target: int) -> int:
    """Push as much additional flow as possible from ``source`` to ``target``.

    Every edge contributes two residual arcs: a forward arc with capacity
    ``weight - flow`` and a reverse arc with capacity ``flow``. The reverse
    arcs let a later augmenting path cancel flow sent by an earlier one;
    without them the search is only a greedy heuristic and can stop below the
    maximum.

    Args:
        edges: edges of the network. Each edge's ``flow`` is updated in place
            (through ``add_flow``) to the flow it carries at the end.
        source: node the flow leaves from.
        target: node the flow arrives at.

    Returns:
        The amount of flow added by this call. This is 0 when there are no
        edges, when ``target`` is unreachable, or when ``source == target``.
    """
    from collections import deque  # not part of this file's top-level imports

    if source == target:
        return 0

    # Residual arcs are stored in parallel lists. Arc 2*k is the forward arc
    # of edges[k] and arc 2*k + 1 is its reverse, so ``arc ^ 1`` is always the
    # partner arc and ``arc // 2`` the index of the underlying edge.
    heads = []
    residual = []
    adjacency = {}
    for edge in edges:
        adjacency.setdefault(edge.u, []).append(len(heads))
        heads.append(edge.v)
        residual.append(edge.weight - edge.flow)
        adjacency.setdefault(edge.v, []).append(len(heads))
        heads.append(edge.u)
        residual.append(edge.flow)

    def shortest_residual_path():
        """Return the arc indices of a shortest usable path, or [] if none."""
        arc_into = {source: -1}
        frontier = deque([source])
        while frontier:
            node = frontier.popleft()
            for arc in adjacency.get(node, ()):
                head = heads[arc]
                if residual[arc] > 0 and head not in arc_into:
                    arc_into[head] = arc
                    if head == target:
                        path = []
                        while head != source:
                            arc = arc_into[head]
                            path.append(arc)
                            # The partner arc points back at this arc's tail.
                            head = heads[arc ^ 1]
                        return path
                    frontier.append(head)
        return []

    flow = 0
    path = shortest_residual_path()
    while path:
        bottleneck = min(residual[arc] for arc in path)
        for arc in path:
            residual[arc] -= bottleneck
            residual[arc ^ 1] += bottleneck
            # Even arcs add flow to their edge; odd arcs cancel flow on it.
            change = bottleneck if arc % 2 == 0 else -bottleneck
            edges[arc // 2].add_flow(change)
        flow += bottleneck
        path = shortest_residual_path()
    return flow


# Script entry point: run the flow computation on the sample graphs.

if __name__ == "__main__":
    # Run the flow routine on each sample network; one result per line.
    edge_graph = build_init_graph()
    for sample_edges, sample_source, sample_target in (
        (edge_graph, 1, 6),
        (build_init_graph_1(), 1, 6),
        (build_init_graph_2(), 1, 8),
    ):
        print(min_max_flow(sample_edges, sample_source, sample_target))
# Leave a Comment