Skip to content

TSP

sgptools.utils.tsp

resample_path(waypoints, num_inducing=10)

Resamples a given path (sequence of waypoints) to have a fixed number of num_inducing points. This is useful for standardizing path representations or for converting a path with an arbitrary number of waypoints into a fixed-size representation for models. The resampling maintains the path's shape and geometric integrity.

Parameters:

Name Type Description Default
waypoints ndarray

(num_waypoints, ndim); A NumPy array representing the waypoints of a path. num_waypoints is the original number of points, ndim is the dimensionality.

required
num_inducing int

The desired number of points in the resampled path. Defaults to 10.

10

Returns:

Type Description
ndarray

np.ndarray: (num_inducing, ndim); The resampled path with num_inducing points.

Raises:

Type Description
Exception

If the input ndim is not 2 or 3 (as shapely.geometry.LineString primarily supports 2D/3D geometries).

Usage
import numpy as np
from sgptools.utils.tsp import resample_path

# Example 2D path
original_path_2d = np.array([[0,0], [1,5], [3,0], [5,5]], dtype=np.float64)
resampled_path_2d = resample_path(original_path_2d, num_inducing=5)

# Example 3D path
original_path_3d = np.array([[0,0,0], [1,1,1], [2,0,2]], dtype=np.float64)
resampled_path_3d = resample_path(original_path_3d, num_inducing=7)
Source code in sgptools/utils/tsp.py
def resample_path(waypoints: np.ndarray, num_inducing: int = 10) -> np.ndarray:
    """Resample a path so that it is described by exactly `num_inducing` points.

    The waypoints are treated as the vertices of a polyline. New points are
    placed at `num_inducing` equally spaced distances between 0 and the
    polyline's length (as reported by `shapely`), so the first sample is the
    start of the path and the last sample is its end.

    Args:
        waypoints (np.ndarray): (num_waypoints, ndim); Vertices of the path in
                                visiting order. `ndim` must be 2 or 3.
        num_inducing (int): Number of points in the resampled path. Defaults to 10.

    Returns:
        np.ndarray: (num_inducing, ndim); The resampled path.

    Raises:
        Exception: If the last axis of `waypoints` does not have size 2 or 3.

    Usage:
        ```python
        import numpy as np
        from sgptools.utils.tsp import resample_path

        # Example 2D path
        original_path_2d = np.array([[0,0], [1,5], [3,0], [5,5]], dtype=np.float64)
        resampled_path_2d = resample_path(original_path_2d, num_inducing=5)

        # Example 3D path
        original_path_3d = np.array([[0,0,0], [1,1,1], [2,0,2]], dtype=np.float64)
        resampled_path_3d = resample_path(original_path_3d, num_inducing=7)
        ```
    """
    ndim = np.shape(waypoints)[-1]
    if ndim not in (2, 3):
        raise Exception(f"ndim={ndim} is not supported for path resampling!")

    polyline = LineString(waypoints)
    # Coordinate attributes to read back from each interpolated point.
    axes = ("x", "y", "z")[:ndim]

    # NOTE(review): shapely appears to measure `length` in the XY plane only;
    # confirm that the spacing of 3D paths is what callers expect.
    rows = []
    for offset in np.linspace(0, polyline.length, num_inducing):
        sample = polyline.interpolate(offset)
        rows.append([getattr(sample, axis) for axis in axes])
    return np.array(rows)

run_tsp(nodes, num_vehicles=1, max_dist=25.0, depth=1, resample=None, start_nodes=None, end_nodes=None, time_limit=10)

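Runs a TSP/VRP solver with optional, arbitrary start and end nodes. A per-path distance limit (max_dist) is applied only in the multi-vehicle case (num_vehicles > 1); a single-vehicle run has no distance constraint.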

Parameters:

Name Type Description Default
nodes ndarray

(# nodes, ndim); Nodes to visit.

required
num_vehicles int

Number of robots/vehicles.

1
max_dist float

Maximum distance allowed for each path when handling multi-robot case.

25.0
depth int

Internal parameter used to track re-try recursion depth.

1
resample Optional[int]

Each solution path will be resampled to have resample number of points.

None
start_nodes Optional[ndarray]

(# num_vehicles, ndim); Optional array of start nodes from which to start each vehicle's solution path.

None
end_nodes Optional[ndarray]

(# num_vehicles, ndim); Optional array of end nodes at which to end each vehicle's solution path.

None
time_limit int

TSP runtime time limit in seconds.

10

Returns:

Type Description
Tuple[Optional[ndarray], Optional[List[float]]]

Tuple[Optional[np.ndarray], Optional[List[float]]]: A pair (paths, distances). paths (np.ndarray): the solution paths if found, otherwise None. distances (List[float]): the list of path lengths if paths are found, otherwise None.

Source code in sgptools/utils/tsp.py
def run_tsp(
    nodes: np.ndarray,
    num_vehicles: int = 1,
    max_dist: float = 25.0,
    depth: int = 1,
    resample: Optional[int] = None,
    start_nodes: Optional[np.ndarray] = None,
    end_nodes: Optional[np.ndarray] = None,
    time_limit: int = 10,
) -> Tuple[Optional[np.ndarray], Optional[List[float]]]:
    """Solve a TSP/VRP over `nodes` with optional start and end nodes.

    When `start_nodes` and/or `end_nodes` are omitted, a dummy location with
    zero-cost edges to every node is inserted at index 0 so the solver is free
    to pick arbitrary start/end nodes. A distance limit (`max_dist`) is only
    applied in the multi-vehicle case (`num_vehicles > 1`); a single-vehicle
    run has no distance constraint.

    If no solution is found, or a solution contains an empty path, the solve
    is retried recursively with an adjusted `max_dist` (at most 5 attempts).

    Args:
        nodes (np.ndarray): (# nodes, ndim); Nodes to visit.
        num_vehicles (int): Number of robots/vehicles.
        max_dist (float): Maximum distance allowed for each path when handling multi-robot case.
        depth (int): Internal parameter used to track re-try recursion depth.
        resample (Optional[int]): Each solution path will be resampled to have
                                   `resample` number of points.
        start_nodes (Optional[np.ndarray]): (# num_vehicles, ndim); Optional array of start nodes from which
                                             to start each vehicle's solution path.
        end_nodes (Optional[np.ndarray]): (# num_vehicles, ndim); Optional array of end nodes at which
                                           to end each vehicle's solution path.
        time_limit (int): TSP runtime time limit in seconds.

    Returns:
        Tuple[Optional[np.ndarray], Optional[List[float]]]:
            - paths (np.ndarray): Solution paths if found, otherwise None.
            - distances (List[float]): List of path lengths if paths are found, otherwise None.
    """
    if depth > 5:
        print('Warning: Max depth reached')
        return None, None

    # The solver is given integer costs; distances are multiplied by this
    # factor before truncation so that 4 decimal places survive.
    scale = 1e4

    # Backup original nodes (retries must start from the un-augmented list)
    original_nodes = np.copy(nodes)

    # Add the start and end nodes to the node list.
    # Resulting order: [start_nodes, end_nodes, nodes]
    if end_nodes is not None:
        assert end_nodes.shape == (num_vehicles, nodes.shape[-1]), \
            "Incorrect end_nodes shape, should be (num_vehicles, ndim)!"
        nodes = np.concatenate([end_nodes, nodes])
    if start_nodes is not None:
        assert start_nodes.shape == (num_vehicles, nodes.shape[-1]), \
            "Incorrect start_nodes shape, should be (num_vehicles, ndim)!"
        nodes = np.concatenate([start_nodes, nodes])

    # Add dummy 0 location to get arbitrary start and end node sols
    if start_nodes is None or end_nodes is None:
        distance_mat = np.zeros((len(nodes) + 1, len(nodes) + 1))
        distance_mat[1:, 1:] = pairwise_distances(nodes, nodes) * scale
        trim_paths = True  # shift to account for dummy node
    else:
        distance_mat = pairwise_distances(nodes, nodes) * scale
        trim_paths = False
    distance_mat = distance_mat.astype(int)

    # Keep the caller-unit `max_dist` untouched: it is what gets passed on to
    # retries. Only the solver sees the scaled integer value. (Re-using one
    # variable for both would compound the scale factor on every retry.)
    max_dist_scaled = int(max_dist * scale)

    # Get start and end node indices for ortools
    if start_nodes is None:
        start_idx = np.zeros(num_vehicles, dtype=int)
        num_start_nodes = 0
    else:
        start_idx = np.arange(num_vehicles) + int(trim_paths)
        num_start_nodes = len(start_nodes)

    if end_nodes is None:
        end_idx = np.zeros(num_vehicles, dtype=int)
    else:
        end_idx = np.arange(num_vehicles) + num_start_nodes + int(trim_paths)

    # used by ortools
    def distance_callback(from_index, to_index):
        from_node = manager.IndexToNode(from_index)
        to_node = manager.IndexToNode(to_index)
        return distance_mat[from_node][to_node]

    # num_locations, num vehicles, start, end
    manager = pywrapcp.RoutingIndexManager(len(distance_mat), num_vehicles,
                                           start_idx.tolist(),
                                           end_idx.tolist())
    routing = pywrapcp.RoutingModel(manager)
    transit_callback_index = routing.RegisterTransitCallback(distance_callback)
    routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)

    if num_vehicles > 1:
        # Dummy distance constraint to ensure all paths have similar length
        dimension_name = "Distance"
        routing.AddDimension(
            transit_callback_index,
            0,  # no slack
            max_dist_scaled,  # vehicle maximum travel distance
            True,  # start cumul to zero
            dimension_name,
        )
        distance_dimension = routing.GetDimensionOrDie(dimension_name)
        distance_dimension.SetGlobalSpanCostCoefficient(100)

    search_parameters = pywrapcp.DefaultRoutingSearchParameters()
    search_parameters.first_solution_strategy = (
        routing_enums_pb2.FirstSolutionStrategy.PATH_CHEAPEST_ARC)
    search_parameters.local_search_metaheuristic = (
        routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH)
    search_parameters.time_limit.seconds = time_limit
    solution = routing.SolveWithParameters(search_parameters)

    if solution is None:
        print(
            "TSP Warning: No solution found, retrying with increased max_dist."
        )
        return run_tsp(
            original_nodes,
            num_vehicles,
            max_dist * 1.5,  # caller units, so the limit grows by exactly 1.5x
            depth + 1,
            resample,
            start_nodes,
            end_nodes,
            time_limit,
        )

    # presumably returns per-vehicle node-index sequences into `nodes` and
    # the matching scaled path lengths; verify against _get_routes
    paths_indices, distances_raw = _get_routes(manager, routing, solution,
                                               num_vehicles, start_idx,
                                               end_idx, trim_paths)

    # Check for empty paths and retry with an adjusted max_dist if necessary
    if any(len(path) < 2 for path in paths_indices):
        print(
            "TSP Warning: Empty path detected, retrying with increased max_dist."
        )
        # Recalculate max_dist based on the current average distance,
        # converted back from solver units to caller units.
        if len(distances_raw) > 0:
            mean_dist = np.mean(distances_raw) / scale
        else:
            mean_dist = max_dist
        return run_tsp(
            original_nodes,
            num_vehicles,
            mean_dist * (1.5 / depth),
            depth + 1,
            resample,
            start_nodes,
            end_nodes,
            time_limit,
        )

    paths = [nodes[path] for path in paths_indices]
    distances = [d / scale for d in distances_raw]

    # Resample each solution path to have resample number of points
    if resample is not None:
        paths = np.array([resample_path(path, resample) for path in paths])

    return paths, distances