Untitled
unknown
python
2 years ago
4.0 kB
15
Indexable
from collections import defaultdict
from typing import Dict, Tuple, List
# edge of directed graph
class Edge:
def __init__(
self,
u: int,
v: int,
weight: int,
flow: int = 0,
):
self.u = u
self.v = v
self.weight = weight
self.flow = flow
def add_flow(self, flow: int):
self.flow += flow
def convert_edge_to_map(edge_graph: List[Edge]) -> Dict[int, Edge]:
graph = defaultdict(list)
for edge in edge_graph:
graph[edge.u].append(edge)
return graph
def build_init_graph() -> List[Edge]:
return [
Edge(1, 2, 4),
Edge(1, 3, 3),
Edge(2, 4, 4),
Edge(4, 3, 3),
Edge(3, 5, 6),
Edge(4, 6, 2),
Edge(5, 6, 6),
]
def build_init_graph_1() -> List[Edge]:
return []
def build_init_graph_2() -> List[Edge]:
return [
Edge(1, 2, 8),
Edge(1, 3, 7),
Edge(1, 4, 4),
Edge(2, 3, 2),
Edge(3, 4, 5),
Edge(2, 5, 3),
Edge(2, 6, 9),
Edge(3, 6, 6),
Edge(4, 6, 7),
Edge(4, 7, 2),
Edge(6, 5, 3),
Edge(6, 7, 4),
Edge(5, 8, 9),
Edge(6, 8, 5),
Edge(7, 8, 8),
]
# find augmented path using BFS
def find_augmented_path(
map_graph: Dict[int, Edge], source_node: int, dest_node: int
) -> List[Edge]:
def trace_path(
path_mapper: Dict[Tuple[int, int], Edge],
visited_parent: List[int],
target: int,
source: int,
):
path = []
while target != source:
parent = visited_parent[target]
path.append(path_mapper[(parent, target)])
target = parent
return path
queue = []
queue.append(source_node)
visited = set()
path_mapper = dict()
parent_visited = [-1] * (len(map_graph) + 1)
while queue:
current_node = queue.pop(0)
if current_node == dest_node:
return trace_path(path_mapper, parent_visited, dest_node, source_node)
visited.add(current_node)
neighbors = map_graph[current_node]
for neighbor in neighbors:
if neighbor.flow > 0 and neighbor.v not in visited:
queue.append(neighbor.v)
path_mapper[(current_node, neighbor.v)] = neighbor
parent_visited[neighbor.v] = current_node
return []
def get_min_value_of_path(edges: List[Edge]) -> int:
if not edges:
return -1
return min(edges, key=lambda x: x.flow).flow
def find_residual_graph(edges: List[Edge]) -> List[Edge]:
return [Edge(e.v, e.u, e.weight, e.weight - e.flow) for e in edges]
def alter_flow(edges: List[Edge], paths: List[Edge], residual_value: int) -> None:
edge_mapper = set()
for path in paths:
edge_mapper.add((path.v, path.u))
for edge in edges:
if (edge.u, edge.v) in edge_mapper:
edge.add_flow(residual_value)
# ford-folkerson algorithm, O(V^2 * E)
def min_max_flow(edges: List[Edge], source: int, target: int) -> int:
def find_residual_aug_path(residual_graph: List[Edge]):
return find_augmented_path(convert_edge_to_map(residual_graph), target, source)
residual_graph = find_residual_graph(edges)
path = find_residual_aug_path(residual_graph)
flow = 0
while path:
residual_capacity = get_min_value_of_path(path)
flow += residual_capacity
alter_flow(edges, path, residual_capacity)
path = find_residual_aug_path(find_residual_graph(edges))
return flow
# BFS to find the augmented path
if __name__ == "__main__":
edge_graph = build_init_graph()
print(min_max_flow(edge_graph, 1, 6))
print(min_max_flow(build_init_graph_1(), 1, 6))
print(min_max_flow(build_init_graph_2(), 1, 8))
Editor is loading...
Leave a Comment