Skip to content

CMA

sgptools.methods.CMA

Bases: Method

Informative sensor placement / path optimization using CMA-ES (Covariance Matrix Adaptation Evolution Strategy).

CMA-ES is a derivative-free, population-based genetic optimizer well-suited for non-convex, non-smooth objectives. Here, it searches over the flattened vector of sensing locations / waypoints.

Reference

  • Hitz et al., 2017. Adaptive Continuous-Space Informative Path Planning for Online Environmental Monitoring.

Attributes

objective: Objective object to evaluate information gain. transform: Optional transform applied to candidate solutions (e.g., for IPP / FoV). X_init: Flattened initial guess of the sensing locations. pbounds: Convex hull of the X_objective points, used as an implicit geometric bound (not enforced directly by CMA-ES).

Source code in sgptools/methods.py
class CMA(Method):
    """
    Informative sensor placement / path optimization using CMA-ES
    (Covariance Matrix Adaptation Evolution Strategy).

    CMA-ES is a derivative-free, population-based genetic optimizer well-suited for
    non-convex, non-smooth objectives. Here, it searches over the flattened
    vector of sensing locations / waypoints.

    Reference
    ---------
    - Hitz et al., 2017. *Adaptive Continuous-Space Informative Path Planning
      for Online Environmental Monitoring.*

    Attributes
    ----------
    objective:
        Objective object to evaluate information gain.
    transform:
        Optional transform applied to candidate solutions (e.g., for IPP / FoV).
    X_init:
        Flattened initial guess of the sensing locations.
    pbounds:
        Convex hull of the `X_objective` points, used as an implicit geometric
        bound (not enforced directly by CMA-ES).
    """

    def __init__(self,
                 num_sensing: int,
                 X_objective: np.ndarray,
                 kernel: gpflow.kernels.Kernel,
                 noise_variance: float,
                 transform: Optional[Transform] = None,
                 num_robots: int = 1,
                 X_candidates: Optional[np.ndarray] = None,
                 num_dim: Optional[int] = None,
                 objective: Union[str, Objective] = 'SLogMI',
                 X_init: Optional[np.ndarray] = None,
                 **kwargs: Any):
        """
        Initialize a CMA-ES-based optimization method.

        Parameters
        ----------
        num_sensing:
            Number of sensing locations per robot.
        X_objective:
            Array of shape `(n, d)` used to define the GP objective and
            to build the convex hull bounds.
        kernel:
            GPflow kernel used inside the objective.
        noise_variance:
            Observation noise variance used inside the objective.
        transform:
            Optional transform applied to candidate solutions before objective
            evaluation and constraints.
        num_robots:
            Number of robots / agents. Defaults to 1.
        X_candidates:
            Optional discrete candidate set of locations with shape `(c, d)`.
            If provided, continuous solutions are snapped to candidates.
        num_dim:
            Dimensionality of sensing locations. If `None`, defaults to
            `X_objective.shape[-1]`, or to `X_init.shape[-1]` if `X_init`
            is provided.
        objective:
            Objective specification, either a string key for
            `get_objective` or a pre-instantiated `Objective`.
        X_init:
            Initial guess for the sensing locations, with shape
            `(num_sensing * num_robots, num_dim)`. If `None`, an initial set
            is selected via `get_inducing_pts`.
        **kwargs:
            Extra keyword arguments forwarded to the objective constructor
            when `objective` is a string.
        """
        super().__init__(num_sensing, X_objective, kernel, noise_variance,
                         transform, num_robots, X_candidates, num_dim)
        self.transform = transform
        if X_init is None:
            X_init = get_inducing_pts(X_objective,
                                      num_sensing * self.num_robots)
        else:
            # Override num_dim with the dimensionality of the initial solution
            self.num_dim = X_init.shape[-1]

        self.X_init: np.ndarray = X_init.reshape(-1)  # Flattened initial guess

        if isinstance(objective, str):
            self.objective = get_objective(objective)(X_objective, kernel,
                                                      noise_variance, **kwargs)
        else:
            self.objective = objective

        # Use the convex hull of the objective inputs as a geometric bound
        self.pbounds = geometry.MultiPoint([[p[0], p[1]]
                                            for p in X_objective]).convex_hull

    def update(self, kernel: gpflow.kernels.Kernel,
               noise_variance: float) -> None:
        """
        Update the kernel and noise variance used by the objective.

        Parameters
        ----------
        kernel:
            New GPflow kernel instance.
        noise_variance:
            New observation noise variance.
        """
        self.objective.update(kernel, noise_variance)

    def get_hyperparameters(self) -> Tuple[gpflow.kernels.Kernel, float]:
        """
        Return the current kernel and noise variance used by the objective.

        Returns
        -------
        (gpflow.kernels.Kernel, float)
            A deep copy of the kernel and the current noise variance.
        """
        return deepcopy(self.objective.kernel), \
               self.objective.noise_variance

    def optimize(self,
                 max_steps: int = 500,
                 tol: float = 1e-6,
                 verbose: bool = False,
                 seed: Optional[int] = None,
                 restarts: int = 5,
                 **kwargs: Any) -> np.ndarray:
        """
        Run CMA-ES to obtain informative sensing locations.

        Parameters
        ----------
        max_steps:
            Maximum number of function evaluations (CMA-ES iterations). Defaults
            to 500.
        tol:
            Function-value tolerance for termination (stopping criterion
            passed to CMA). Defaults to `1e-6`.
        verbose:
            If `True`, CMA-ES prints progress messages.
        seed:
            Optional random seed for reproducibility.
        restarts:
            Number of CMA-ES restarts allowed. Defaults to 5.
        **kwargs:
            Additional keyword arguments forwarded to `cma.fmin2` (currently
            unused in this wrapper but accepted for flexibility).

        Returns
        -------
        np.ndarray
            Array of shape `(num_robots, num_sensing, num_dim)` containing the
            optimized sensing locations.
        """
        sigma0 = 1.0
        verbose = 1 if verbose else 0
        sol, _ = cma.fmin2(self._objective,
                           self.X_init,
                           sigma0,
                           options={
                               'maxfevals': max_steps,
                               'verb_disp': verbose,
                               'tolfun': tol,
                               'seed': seed
                           },
                           restarts=restarts)

        sol_np = np.array(sol).reshape(-1, self.num_dim)
        if self.transform is not None:
            sol_np = self.transform.expand(sol_np,
                                           expand_sensor_model=False)
            if not isinstance(sol_np, np.ndarray):
                sol_np = sol_np.numpy()

        # Snap to candidate set if provided
        if self.X_candidates is not None:
            sol_np = cont2disc(sol_np, self.X_candidates)

        sol_np = sol_np.reshape(self.num_robots, -1, self.num_dim)
        return sol_np

    def _objective(self, X: np.ndarray) -> float:
        """
        Objective function passed to CMA-ES (to be *minimized*).

        The internal objective (e.g., mutual information) is naturally
        maximized. CMA-ES, however, minimizes. To reconcile this, the method
        returns `-reward` (plus any constraint penalty), where `reward`
        is the value returned by the underlying objective.

        Steps:
        1. Reshape the flattened input `X` to `(num_points, num_dim)`.
        2. Optionally apply the transform (including constraints).
        3. Evaluate the GP-based objective (reward).
        4. Add the constraint penalty.
        5. Return the negative of this value as a Python float.

        Parameters
        ----------
        X:
            Flattened array of length `num_sensing * num_robots * num_dim`
            containing the current candidate solution.

        Returns
        -------
        float
            Negative objective value to be minimized by CMA-ES. Large positive
            returns correspond to poor solutions; large negative returns
            correspond to good solutions.
        """
        X_reshaped = np.array(X).reshape(-1, self.num_dim)
        constraint_penality: float = 0.0
        if self.transform is not None:
            X_expanded = self.transform.expand(X_reshaped)
            constraint_penality = self.transform.constraints(X_reshaped)
            reward = self.objective(X_expanded)  # maximize
        else:
            reward = self.objective(X_reshaped)  # maximize
        if not np.isfinite(reward):
            reward = -1e6  # CMA does not handle inf values

        # Transform constraints are typically <= 0; adding them penalizes violations.
        reward += constraint_penality
        return -reward.numpy()  # CMA-ES minimizes

    def update_transform(self, transform: Transform) -> None:
        """
        Replace the transform used by the CMA-ES method.

        Parameters
        ----------
        transform:
            New `Transform` instance to use for expansion and constraints.
        """
        self.transform = transform

    def get_transform(self) -> Transform:
        """
        Return a deep copy of the transform used by this method.

        Returns
        -------
        Transform
            Deep copy of the current transform.
        """
        return deepcopy(self.transform)

__init__(num_sensing, X_objective, kernel, noise_variance, transform=None, num_robots=1, X_candidates=None, num_dim=None, objective='SLogMI', X_init=None, **kwargs)

Initialize a CMA-ES-based optimization method.

Parameters

num_sensing: Number of sensing locations per robot. X_objective: Array of shape (n, d) used to define the GP objective and to build the convex hull bounds. kernel: GPflow kernel used inside the objective. noise_variance: Observation noise variance used inside the objective. transform: Optional transform applied to candidate solutions before objective evaluation and constraints. num_robots: Number of robots / agents. Defaults to 1. X_candidates: Optional discrete candidate set of locations with shape (c, d). If provided, continuous solutions are snapped to candidates. num_dim: Dimensionality of sensing locations. If None, defaults to X_objective.shape[-1], or to X_init.shape[-1] if X_init is provided. objective: Objective specification, either a string key for get_objective or a pre-instantiated Objective. X_init: Initial guess for the sensing locations, with shape (num_sensing * num_robots, num_dim). If None, an initial set is selected via get_inducing_pts. **kwargs: Extra keyword arguments forwarded to the objective constructor when objective is a string.

Source code in sgptools/methods.py
def __init__(self,
             num_sensing: int,
             X_objective: np.ndarray,
             kernel: gpflow.kernels.Kernel,
             noise_variance: float,
             transform: Optional[Transform] = None,
             num_robots: int = 1,
             X_candidates: Optional[np.ndarray] = None,
             num_dim: Optional[int] = None,
             objective: Union[str, Objective] = 'SLogMI',
             X_init: Optional[np.ndarray] = None,
             **kwargs: Any):
    """
    Initialize a CMA-ES-based optimization method.

    Parameters
    ----------
    num_sensing:
        Number of sensing locations per robot.
    X_objective:
        Array of shape `(n, d)` used to define the GP objective and
        to build the convex hull bounds.
    kernel:
        GPflow kernel used inside the objective.
    noise_variance:
        Observation noise variance used inside the objective.
    transform:
        Optional transform applied to candidate solutions before objective
        evaluation and constraints.
    num_robots:
        Number of robots / agents. Defaults to 1.
    X_candidates:
        Optional discrete candidate set of locations with shape `(c, d)`.
        If provided, continuous solutions are snapped to candidates.
    num_dim:
        Dimensionality of sensing locations. If `None`, defaults to
        `X_objective.shape[-1]`, or to `X_init.shape[-1]` if `X_init`
        is provided.
    objective:
        Objective specification, either a string key for
        `get_objective` or a pre-instantiated `Objective`.
    X_init:
        Initial guess for the sensing locations, with shape
        `(num_sensing * num_robots, num_dim)`. If `None`, an initial set
        is selected via `get_inducing_pts`.
    **kwargs:
        Extra keyword arguments forwarded to the objective constructor
        when `objective` is a string.
    """
    super().__init__(num_sensing, X_objective, kernel, noise_variance,
                     transform, num_robots, X_candidates, num_dim)
    self.transform = transform
    if X_init is None:
        X_init = get_inducing_pts(X_objective,
                                  num_sensing * self.num_robots)
    else:
        # Override num_dim with the dimensionality of the initial solution
        self.num_dim = X_init.shape[-1]

    self.X_init: np.ndarray = X_init.reshape(-1)  # Flattened initial guess

    if isinstance(objective, str):
        self.objective = get_objective(objective)(X_objective, kernel,
                                                  noise_variance, **kwargs)
    else:
        self.objective = objective

    # Use the convex hull of the objective inputs as a geometric bound
    self.pbounds = geometry.MultiPoint([[p[0], p[1]]
                                        for p in X_objective]).convex_hull

get_hyperparameters()

Return the current kernel and noise variance used by the objective.

Returns

(gpflow.kernels.Kernel, float) A deep copy of the kernel and the current noise variance.

Source code in sgptools/methods.py
def get_hyperparameters(self) -> Tuple[gpflow.kernels.Kernel, float]:
    """
    Return the current kernel and noise variance used by the objective.

    Returns
    -------
    (gpflow.kernels.Kernel, float)
        A deep copy of the kernel and the current noise variance.
    """
    return deepcopy(self.objective.kernel), \
           self.objective.noise_variance

get_transform()

Return a deep copy of the transform used by this method.

Returns

Transform Deep copy of the current transform.

Source code in sgptools/methods.py
def get_transform(self) -> Transform:
    """
    Return a deep copy of the transform used by this method.

    Returns
    -------
    Transform
        Deep copy of the current transform.
    """
    return deepcopy(self.transform)

optimize(max_steps=500, tol=1e-06, verbose=False, seed=None, restarts=5, **kwargs)

Run CMA-ES to obtain informative sensing locations.

Parameters

max_steps: Maximum number of function evaluations (CMA-ES iterations). Defaults to 500. tol: Function-value tolerance for termination (stopping criterion passed to CMA). Defaults to 1e-6. verbose: If True, CMA-ES prints progress messages. seed: Optional random seed for reproducibility. restarts: Number of CMA-ES restarts allowed. Defaults to 5. **kwargs: Additional keyword arguments forwarded to cma.fmin2 (currently unused in this wrapper but accepted for flexibility).

Returns

np.ndarray Array of shape (num_robots, num_sensing, num_dim) containing the optimized sensing locations.

Source code in sgptools/methods.py
def optimize(self,
             max_steps: int = 500,
             tol: float = 1e-6,
             verbose: bool = False,
             seed: Optional[int] = None,
             restarts: int = 5,
             **kwargs: Any) -> np.ndarray:
    """
    Run CMA-ES to obtain informative sensing locations.

    Parameters
    ----------
    max_steps:
        Maximum number of function evaluations (CMA-ES iterations). Defaults
        to 500.
    tol:
        Function-value tolerance for termination (stopping criterion
        passed to CMA). Defaults to `1e-6`.
    verbose:
        If `True`, CMA-ES prints progress messages.
    seed:
        Optional random seed for reproducibility.
    restarts:
        Number of CMA-ES restarts allowed. Defaults to 5.
    **kwargs:
        Additional keyword arguments forwarded to `cma.fmin2` (currently
        unused in this wrapper but accepted for flexibility).

    Returns
    -------
    np.ndarray
        Array of shape `(num_robots, num_sensing, num_dim)` containing the
        optimized sensing locations.
    """
    sigma0 = 1.0
    verbose = 1 if verbose else 0
    sol, _ = cma.fmin2(self._objective,
                       self.X_init,
                       sigma0,
                       options={
                           'maxfevals': max_steps,
                           'verb_disp': verbose,
                           'tolfun': tol,
                           'seed': seed
                       },
                       restarts=restarts)

    sol_np = np.array(sol).reshape(-1, self.num_dim)
    if self.transform is not None:
        sol_np = self.transform.expand(sol_np,
                                       expand_sensor_model=False)
        if not isinstance(sol_np, np.ndarray):
            sol_np = sol_np.numpy()

    # Snap to candidate set if provided
    if self.X_candidates is not None:
        sol_np = cont2disc(sol_np, self.X_candidates)

    sol_np = sol_np.reshape(self.num_robots, -1, self.num_dim)
    return sol_np

update(kernel, noise_variance)

Update the kernel and noise variance used by the objective.

Parameters

kernel: New GPflow kernel instance. noise_variance: New observation noise variance.

Source code in sgptools/methods.py
def update(self, kernel: gpflow.kernels.Kernel,
           noise_variance: float) -> None:
    """
    Update the kernel and noise variance used by the objective.

    Parameters
    ----------
    kernel:
        New GPflow kernel instance.
    noise_variance:
        New observation noise variance.
    """
    self.objective.update(kernel, noise_variance)

update_transform(transform)

Replace the transform used by the CMA-ES method.

Parameters

transform: New Transform instance to use for expansion and constraints.

Source code in sgptools/methods.py
def update_transform(self, transform: Transform) -> None:
    """
    Replace the transform used by the CMA-ES method.

    Parameters
    ----------
    transform:
        New `Transform` instance to use for expansion and constraints.
    """
    self.transform = transform