Untitled
unknown
python
a year ago
4.0 kB
14
Indexable
from collections import defaultdict, deque
from typing import Dict, List, Tuple


class Edge:
    """Directed edge ``u -> v`` of a flow network.

    ``weight`` is the capacity of the edge and ``flow`` the amount
    currently routed through it.  The residual-graph helpers in this
    module reuse the class and store the *remaining capacity* of a
    residual edge in ``flow``.
    """

    def __init__(
        self,
        u: int,
        v: int,
        weight: int,
        flow: int = 0,
    ):
        self.u = u
        self.v = v
        self.weight = weight
        self.flow = flow

    def add_flow(self, flow: int) -> None:
        """Increase the stored flow by ``flow`` (may be negative)."""
        self.flow += flow

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(u={self.u!r}, v={self.v!r}, "
            f"weight={self.weight!r}, flow={self.flow!r})"
        )


def convert_edge_to_map(edge_graph: List[Edge]) -> Dict[int, List[Edge]]:
    """Group edges by their tail node ``u`` into an adjacency mapping.

    The result is a ``defaultdict(list)``; edges keep their input order
    within each bucket.
    """
    graph: Dict[int, List[Edge]] = defaultdict(list)
    for edge in edge_graph:
        graph[edge.u].append(edge)
    return graph


def build_init_graph() -> List[Edge]:
    """Return a sample network on nodes 1..6 (7 edges, zero flow)."""
    return [
        Edge(1, 2, 4),
        Edge(1, 3, 3),
        Edge(2, 4, 4),
        Edge(4, 3, 3),
        Edge(3, 5, 6),
        Edge(4, 6, 2),
        Edge(5, 6, 6),
    ]


def build_init_graph_1() -> List[Edge]:
    """Return an empty network (no edges)."""
    return []


def build_init_graph_2() -> List[Edge]:
    """Return a sample network on nodes 1..8 (15 edges, zero flow)."""
    return [
        Edge(1, 2, 8),
        Edge(1, 3, 7),
        Edge(1, 4, 4),
        Edge(2, 3, 2),
        Edge(3, 4, 5),
        Edge(2, 5, 3),
        Edge(2, 6, 9),
        Edge(3, 6, 6),
        Edge(4, 6, 7),
        Edge(4, 7, 2),
        Edge(6, 5, 3),
        Edge(6, 7, 4),
        Edge(5, 8, 9),
        Edge(6, 8, 5),
        Edge(7, 8, 8),
    ]


def find_augmented_path(
    map_graph: Dict[int, List[Edge]], source_node: int, dest_node: int
) -> List[Edge]:
    """Find a shortest usable path from ``source_node`` to ``dest_node``.

    Runs a breadth-first search over ``map_graph`` and follows only
    edges whose ``flow`` attribute is positive (callers store the
    remaining capacity there).

    Args:
        map_graph: adjacency mapping ``node -> outgoing edges``.
        source_node: node the search starts from.
        dest_node: node the search tries to reach.

    Returns:
        The edges of the path ordered from ``dest_node`` back towards
        ``source_node``; an empty list when no path exists or when both
        nodes are the same.
    """
    if source_node == dest_node:
        return []

    # Predecessor bookkeeping is keyed by node id, so sparse or large
    # ids work (no list sized from the number of adjacency keys).
    came_from: Dict[int, Tuple[int, Edge]] = {}
    # Nodes are marked when enqueued so each one is queued once and its
    # predecessor is never overwritten -> the path is a true BFS path.
    seen = {source_node}
    frontier = deque([source_node])

    while frontier:
        current = frontier.popleft()
        # .get() keeps plain dicts usable and avoids growing defaultdicts.
        for edge in map_graph.get(current, ()):
            if edge.flow <= 0 or edge.v in seen:
                continue
            seen.add(edge.v)
            came_from[edge.v] = (current, edge)
            if edge.v == dest_node:
                path = []
                node = dest_node
                while node != source_node:
                    node, step = came_from[node]
                    path.append(step)
                return path
            frontier.append(edge.v)
    return []


def get_min_value_of_path(edges: List[Edge]) -> int:
    """Return the smallest ``flow`` value among ``edges``.

    Returns ``-1`` for an empty list (sentinel kept for existing callers).
    """
    if not edges:
        return -1
    return min(edge.flow for edge in edges)


def find_residual_graph(edges: List[Edge]) -> List[Edge]:
    """Return the reversed-edge view of the unused capacity of ``edges``.

    Every input edge ``u -> v`` yields a new edge ``v -> u`` with the
    same ``weight`` and with ``flow`` set to ``weight - flow``, i.e. the
    capacity still available on the input edge.  The input is not
    modified.
    """
    return [Edge(e.v, e.u, e.weight, e.weight - e.flow) for e in edges]


def alter_flow(edges: List[Edge], paths: List[Edge], residual_value: int) -> None:
    """Add ``residual_value`` to the edges of ``edges`` matched by ``paths``.

    ``paths`` is expected in the reversed orientation produced by
    ``find_residual_graph``: an edge of ``edges`` is updated when its
    ``(u, v)`` equals the ``(v, u)`` of some edge in ``paths``.  Matching
    is by endpoints only, so parallel edges are all updated.
    """
    reversed_endpoints = {(step.v, step.u) for step in paths}
    for edge in edges:
        if (edge.u, edge.v) in reversed_endpoints:
            edge.add_flow(residual_value)


class _ResidualEdge(Edge):
    """Residual edge that remembers the network edge it was derived from.

    ``flow`` holds the residual capacity.  ``sign`` is ``+1`` when pushing
    along this edge adds flow to ``origin`` and ``-1`` when it cancels
    flow already on ``origin``.
    """

    def __init__(self, u: int, v: int, capacity: int, origin: Edge, sign: int):
        super().__init__(u, v, origin.weight, capacity)
        self.origin = origin
        self.sign = sign


def _build_residual_network(edges: List[Edge]) -> List[Edge]:
    """Build forward and backward residual edges for every edge in ``edges``."""
    residual: List[Edge] = []
    for edge in edges:
        # Forward edge: capacity that is still unused.
        residual.append(
            _ResidualEdge(edge.u, edge.v, edge.weight - edge.flow, edge, 1)
        )
        # Backward edge: flow that may be undone and rerouted.
        residual.append(_ResidualEdge(edge.v, edge.u, edge.flow, edge, -1))
    return residual


# Ford-Fulkerson with BFS augmenting paths (Edmonds-Karp), O(V * E^2)
def min_max_flow(edges: List[Edge], source: int, target: int) -> int:
    """Compute the maximum flow from ``source`` to ``target``.

    Repeatedly finds a shortest augmenting path in the residual network
    (including backward edges, so earlier routing decisions can be
    undone) and pushes the bottleneck capacity along it.

    Args:
        edges: the network; each edge's ``flow`` is updated in place.
        source: node the flow originates from.
        target: node the flow is sent to.

    Returns:
        The amount of flow pushed by this call.  For edges that start
        with zero flow this is the maximum flow value; it is ``0`` when
        ``source == target`` or ``target`` is unreachable.
    """
    pushed = 0
    while True:
        residual_map = convert_edge_to_map(_build_residual_network(edges))
        path = find_augmented_path(residual_map, source, target)
        if not path:
            return pushed
        bottleneck = get_min_value_of_path(path)
        for step in path:
            step.origin.add_flow(step.sign * bottleneck)
        pushed += bottleneck


if __name__ == "__main__":
    edge_graph = build_init_graph()
    print(min_max_flow(edge_graph, 1, 6))
    print(min_max_flow(build_init_graph_1(), 1, 6))
    print(min_max_flow(build_init_graph_2(), 1, 8))
Editor is loading...
Leave a Comment