Untitled

 avatar
unknown
plain_text
2 months ago
5.8 kB
5
Indexable
from typing import List, Union
from graphite.solvers.base_solver import BaseSolver
from graphite.protocol import GraphV1Problem, GraphV2Problem
from graphite.utils.graph_utils import timeout
import numpy as np
import time
import asyncio
import random

import bittensor as bt
import os
import subprocess
import tempfile
from io import StringIO
# from greedy_solver import NearestNeighbourSolver

class LKHSolver(BaseSolver):
    def __init__(self, problem_types:List[Union[GraphV1Problem, GraphV2Problem]]=[GraphV1Problem(n_nodes=2), GraphV1Problem(n_nodes=2, directed=True, problem_type='General TSP')]):
        super().__init__(problem_types=problem_types)
        self.lkh_path = "./LKH-3.0.11/LKH"
    
    def create_problem_file(self, distance_matrix):
        dimension = len(distance_matrix)
        problem_file_content = f"""NAME: TSP
        TYPE: TSP
        DIMENSION: {dimension}
        EDGE_WEIGHT_TYPE: EXPLICIT
        EDGE_WEIGHT_FORMAT: FULL_MATRIX
        EDGE_WEIGHT_SECTION
        """
        # Sử dụng StringIO và np.savetxt để tạo chuỗi
        buffer = StringIO()
        np.savetxt(buffer, distance_matrix, fmt='%d', delimiter=' ')
        matrix_string = buffer.getvalue().strip()
        problem_file_content += matrix_string + "\nEOF\n"
        return problem_file_content
    
    def create_parameter_file(self, problem_file_path, tour_file_path, nodes=5000):
        trial = int(500 * 5000 / nodes)
        parameter_file_content = f"""PROBLEM_FILE = {problem_file_path}
        TOUR_FILE = {tour_file_path}
        INITIAL_PERIOD = 100
        PRECISION = 1e-04
        RUNS = 1
        INITIAL_TOUR_ALGORITHM = GREEDY
        MAX_TRIALS = {trial}
        TIME_LIMIT = 23
        TOTAL_TIME_LIMIT = 23
        """
        return parameter_file_content
    
    async def solve(self, formatted_problem, future_id:int)->List[int]:
        with tempfile.NamedTemporaryFile('w+', prefix='problem_', suffix='.txt', delete=False) as problem_file, \
            tempfile.NamedTemporaryFile('w+', prefix='param_', suffix='.txt', delete=False) as parameter_file, \
            tempfile.NamedTemporaryFile('r+', prefix='tour_', suffix='.txt', delete=False) as tour_file:

            problem_file_content = self.create_problem_file(formatted_problem)
            problem_file.write(problem_file_content)
            problem_file.flush()

            parameter_file_content = self.create_parameter_file(problem_file.name, tour_file.name, len(formatted_problem))
            parameter_file.write(parameter_file_content)
            parameter_file.flush()

            # Run LKH
            subprocess.run([self.lkh_path, parameter_file.name], check=True)
            # Read the tour file
            tour_file.seek(0)
            tour = self.parse_tour_file(tour_file.name)

        # Clean up temporary files
        os.remove(problem_file.name)
        os.remove(parameter_file.name)
        os.remove(tour_file.name)

        # total_distance = self.calculate_total_distance(tour, formatted_problem)

        # return tour
        return tour
    
    def calculate_total_distance(self, tour, distance_matrix):
        total_distance = 0
        for i in range(len(tour)):
            total_distance += distance_matrix[tour[i]][tour[(i + 1) % len(tour)]]
        return total_distance
    
    def parse_tour_file(self, tour_file_path):
        tour = []
        with open(tour_file_path, 'r') as file:
            in_tour_section = False
            for line in file:
                if line.strip() == 'TOUR_SECTION':
                    in_tour_section = True
                elif line.strip() == '-1':
                    break
                elif in_tour_section:
                    tour.append(int(line.strip()) - 1)  # LKH uses 1-based indexing
        tour.append(tour[0])
        return tour

    def problem_transformations(self, problem: Union[GraphV1Problem, GraphV2Problem]):
        return problem.edges
    
if __name__ == "__main__":
    ## Test case for GraphV2Problem
    from graphite.data.distance import geom_edges, man_2d_edges, euc_2d_edges
    loaded_datasets = {}
    with np.load('dataset/Asia_MSB.npz') as f:
        loaded_datasets["Asia_MSB"] = np.array(f['data'])

    def recreate_edges(problem: GraphV2Problem):
        node_coords_np = loaded_datasets[problem.dataset_ref]
        node_coords = np.array([node_coords_np[i][1:] for i in problem.selected_ids])
        if problem.cost_function == "Geom":
            return geom_edges(node_coords)
        elif problem.cost_function == "Euclidean2D":
            return euc_2d_edges(node_coords)
        elif problem.cost_function == "Manhatten2D":
            return man_2d_edges(node_coords)
        else:
            return "Only Geom, Euclidean2D, and Manhatten2D supported for now."
      
    n_nodes = 3272
    # randomly select n_nodes indexes from the selected graph
    selected_node_idxs = random.sample(range(26000000), n_nodes)
    test_problem = GraphV2Problem(problem_type="Metric TSP", n_nodes=n_nodes, selected_ids=selected_node_idxs, cost_function="Geom", dataset_ref="Asia_MSB")
    
    if isinstance(test_problem, GraphV2Problem):
        test_problem.edges = recreate_edges(test_problem)
    
    print("edges", test_problem.edges)
    print("Problem", test_problem)

    lkh_solver = LKHSolver(problem_types=[test_problem])
    start_time = time.time()

    # Run the solver to get the tour
    route = asyncio.run(lkh_solver.solve_problem(test_problem))

    # Calculate total distance of the tour
    total_distance = lkh_solver.calculate_total_distance(route, test_problem.edges)

    print(f"{lkh_solver.__class__.__name__} Tour: {route}")
    print(f"Total distance of the tour: {total_distance}")
    print(f"{lkh_solver.__class__.__name__} Time Taken for {n_nodes} Nodes: {time.time()-start_time}")
Leave a Comment