Skip to content

CMA

sgptools.methods.CMA

Bases: Method

Implements informative sensor placement/path optimization using CMA-ES (Covariance Matrix Adaptation Evolution Strategy).

CMA-ES is a powerful black-box optimization algorithm for non-convex problems.

Refer to the following paper for more details
  • Adaptive Continuous-Space Informative Path Planning for Online Environmental Monitoring [Hitz et al., 2017]

Attributes:

Name Type Description
objective object

The objective function to be minimized/maximized.

transform Optional[Transform]

Transform object applied to inducing points.

X_init ndarray

Initial solution guess for the optimization.

pbounds MultiPoint

The convex hull of the objective area, used implicitly for bounds.

Source code in sgptools/methods.py
class CMA(Method):
    """
    Implements informative sensor placement/path optimization using CMA-ES (Covariance Matrix Adaptation Evolution Strategy).

    CMA-ES is a powerful black-box optimization algorithm for non-convex problems.

    Refer to the following paper for more details:
        - Adaptive Continuous-Space Informative Path Planning for Online Environmental Monitoring [Hitz et al., 2017]

    Attributes:
        objective (object): The objective function to be minimized/maximized.
        transform (Optional[Transform]): Transform object applied to inducing points.
        X_init (np.ndarray): Initial solution guess for the optimization.
        pbounds (geometry.MultiPoint): The convex hull of the objective area, used implicitly for bounds.
    """

    def __init__(self,
                 num_sensing: int,
                 X_objective: np.ndarray,
                 kernel: gpflow.kernels.Kernel,
                 noise_variance: float,
                 transform: Optional[Transform] = None,
                 num_robots: int = 1,
                 X_candidates: Optional[np.ndarray] = None,
                 num_dim: Optional[int] = None,
                 objective: Union[str, Any] = 'SLogMI',
                 X_init: Optional[np.ndarray] = None,
                 **kwargs: Any):
        """
        Initializes the CMA-ES optimizer.

        Args:
            num_sensing (int): Number of sensing locations to optimize.
            X_objective (np.ndarray): (n, d); Data points used to define the objective function.
            kernel (gpflow.kernels.Kernel): GPflow kernel function.
            noise_variance (float): Data noise variance.
            transform (Optional[Transform]): Transform object to apply to inducing points. Defaults to None.
            num_robots (int): Number of robots/agents. Defaults to 1.
            X_candidates (Optional[np.ndarray]): (c, d); Discrete set of candidate locations for sensor placement.
                                                 Defaults to None.
            num_dim (Optional[int]): Dimensionality of the sensing locations. Defaults to dimensonality of X_objective.
            objective (Union[str, Any]): The objective function to use. Can be a string ('SLogMI', 'MI')
                                         or an instance of an objective class. Defaults to 'SLogMI'.
            X_init (Optional[np.ndarray]): (num_sensing * num_robots, num_dim); Initial guess for sensing locations.
                                            If None, initial points are randomly selected from X_objective.
            **kwargs: Additional keyword arguments passed to the objective function.
        """
        super().__init__(num_sensing, X_objective, kernel, noise_variance,
                         transform, num_robots, X_candidates, num_dim)
        self.transform = transform
        if X_init is None:
            X_init = get_inducing_pts(X_objective,
                                      num_sensing * self.num_robots)
        else:
            # override num_dim with initial inducing points dim, in case it differes from X_objective dim
            self.num_dim = X_init.shape[-1]

        self.X_init: np.ndarray = X_init.reshape(-1)  # Flattened initial guess

        if isinstance(objective, str):
            self.objective = get_objective(objective)(X_objective, kernel,
                                                      noise_variance, **kwargs)
        else:
            self.objective = objective

        # Use the boundaries of the X_objective area as the search space limits
        self.pbounds = geometry.MultiPoint([[p[0], p[1]]
                                            for p in X_objective]).convex_hull

    def update(self, kernel: gpflow.kernels.Kernel,
               noise_variance: float) -> None:
        """
        Updates the kernel and noise variance parameters of the objective function.

        Args:
            kernel (gpflow.kernels.Kernel): Updated GPflow kernel function.
            noise_variance (float): Updated data noise variance.
        """
        self.objective.update(kernel, noise_variance)

    def get_hyperparameters(self) -> Tuple[gpflow.kernels.Kernel, float]:
        """
        Retrieves the current kernel and noise variance hyperparameters from the objective.

        Returns:
            Tuple[gpflow.kernels.Kernel, float]: A tuple containing a deep copy of the kernel and the noise variance.
        """
        return deepcopy(self.objective.kernel), \
               self.objective.noise_variance

    def optimize(self,
                 max_steps: int = 500,
                 tol: float = 1e-6,
                 verbose: bool = False,
                 seed: Optional[int] = None,
                 restarts: int = 5,
                 **kwargs: Any) -> np.ndarray:
        """
        Optimizes the sensor placement/path using CMA-ES.

        Args:
            max_steps (int): Maximum number of optimization steps (function evaluations). Defaults to 500.
            tol (float): Tolerance for termination. Defaults to 1e-6.
            verbose (bool): Verbosity, if True additional details will by reported. Defaults to False.
            seed (Optional[int]): Random seed for reproducibility. Defaults to None.
            restarts (int): Number of restarts for CMA-ES. Defaults to 5.
            **kwargs: Additional keyword arguments for CMA-ES.

        Returns:
            np.ndarray: (num_robots, num_sensing, num_dim); Optimized sensing locations.

        Usage:
            ```python
            # Assuming X_train, candidates, kernel_opt, noise_variance_opt are defined
            cma_method = CMA(
                num_sensing=10,
                X_objective=X_train,
                kernel=kernel_opt,
                noise_variance=noise_variance_opt,
                transform=IPPTransform(num_robots=1), # Example transform
                X_candidates=candidates
            )
            optimized_solution = cma_method.optimize(max_steps=1000)
            ```
        """
        sigma0 = 1.0
        verbose = 1 if verbose else 0
        sol, _ = cma.fmin2(self._objective,
                           self.X_init,
                           sigma0,
                           options={
                               'maxfevals': max_steps,
                               'verb_disp': verbose,
                               'tolfun': tol,
                               'seed': seed
                           },
                           restarts=restarts)

        sol_np = np.array(sol).reshape(-1, self.num_dim)
        if self.transform is not None:
            try:
                sol_np = self.transform.expand(sol_np,
                                               expand_sensor_model=False)
            except TypeError:
                pass
            if not isinstance(sol_np, np.ndarray):
                sol_np = sol_np.numpy()

        # Map solution locations to candidates set locations if X_candidates is provided
        if self.X_candidates is not None:
            sol_np = cont2disc(sol_np, self.X_candidates)

        sol_np = sol_np.reshape(self.num_robots, -1, self.num_dim)
        return sol_np

    def _objective(self, X: np.ndarray) -> float:
        """
        Internal objective function to be minimized by CMA-ES.

        This function reshapes the input array, applies any specified transformations,
        calculates the objective value, and applies a penalty for constraint violations.
        Note: CMA-ES minimizes, so the reward (which is to be maximized) is returned as negative.

        Args:
            X (np.ndarray): (num_sensing * num_robots * num_dim); Flattened array of
                            current solution sensor placement locations.

        Returns:
            float: The negative objective value (-reward + constraint penalty) to be minimized.
        """
        X_reshaped = np.array(X).reshape(-1, self.num_dim)
        constraint_penality: float = 0.0
        if self.transform is not None:
            X_expanded = self.transform.expand(X_reshaped)
            constraint_penality = self.transform.constraints(X_reshaped)
            reward = self.objective(X_expanded)  # maximize
        else:
            reward = self.objective(X_reshaped)  # maximize

        reward += constraint_penality  # minimize (large negative value when constraint is unsatisfied)
        return -reward.numpy()  # Return negative as CMA-ES minimizes

    def update_transform(self, transform: Transform) -> None:
        """
        Updates the transform object used by the CMA-ES optimizer.

        Args:
            transform (Transform): The new transform object.
        """
        self.transform = transform

    def get_transform(self) -> Transform:
        """
        Retrieves a deep copy of the transform object.

        Returns:
            Transform: A deep copy of the transform object.
        """
        return deepcopy(self.transform)

__init__(num_sensing, X_objective, kernel, noise_variance, transform=None, num_robots=1, X_candidates=None, num_dim=None, objective='SLogMI', X_init=None, **kwargs)

Initializes the CMA-ES optimizer.

Parameters:

Name Type Description Default
num_sensing int

Number of sensing locations to optimize.

required
X_objective ndarray

(n, d); Data points used to define the objective function.

required
kernel Kernel

GPflow kernel function.

required
noise_variance float

Data noise variance.

required
transform Optional[Transform]

Transform object to apply to inducing points. Defaults to None.

None
num_robots int

Number of robots/agents. Defaults to 1.

1
X_candidates Optional[ndarray]

(c, d); Discrete set of candidate locations for sensor placement. Defaults to None.

None
num_dim Optional[int]

Dimensionality of the sensing locations. Defaults to dimensonality of X_objective.

None
objective Union[str, Any]

The objective function to use. Can be a string ('SLogMI', 'MI') or an instance of an objective class. Defaults to 'SLogMI'.

'SLogMI'
X_init Optional[ndarray]

(num_sensing * num_robots, num_dim); Initial guess for sensing locations. If None, initial points are randomly selected from X_objective.

None
**kwargs Any

Additional keyword arguments passed to the objective function.

{}
Source code in sgptools/methods.py
def __init__(self,
             num_sensing: int,
             X_objective: np.ndarray,
             kernel: gpflow.kernels.Kernel,
             noise_variance: float,
             transform: Optional[Transform] = None,
             num_robots: int = 1,
             X_candidates: Optional[np.ndarray] = None,
             num_dim: Optional[int] = None,
             objective: Union[str, Any] = 'SLogMI',
             X_init: Optional[np.ndarray] = None,
             **kwargs: Any):
    """
    Initializes the CMA-ES optimizer.

    Args:
        num_sensing (int): Number of sensing locations to optimize.
        X_objective (np.ndarray): (n, d); Data points used to define the objective function.
        kernel (gpflow.kernels.Kernel): GPflow kernel function.
        noise_variance (float): Data noise variance.
        transform (Optional[Transform]): Transform object to apply to inducing points. Defaults to None.
        num_robots (int): Number of robots/agents. Defaults to 1.
        X_candidates (Optional[np.ndarray]): (c, d); Discrete set of candidate locations for sensor placement.
                                             Defaults to None.
        num_dim (Optional[int]): Dimensionality of the sensing locations. Defaults to dimensonality of X_objective.
        objective (Union[str, Any]): The objective function to use. Can be a string ('SLogMI', 'MI')
                                     or an instance of an objective class. Defaults to 'SLogMI'.
        X_init (Optional[np.ndarray]): (num_sensing * num_robots, num_dim); Initial guess for sensing locations.
                                        If None, initial points are randomly selected from X_objective.
        **kwargs: Additional keyword arguments passed to the objective function.
    """
    super().__init__(num_sensing, X_objective, kernel, noise_variance,
                     transform, num_robots, X_candidates, num_dim)
    self.transform = transform
    if X_init is None:
        X_init = get_inducing_pts(X_objective,
                                  num_sensing * self.num_robots)
    else:
        # override num_dim with initial inducing points dim, in case it differes from X_objective dim
        self.num_dim = X_init.shape[-1]

    self.X_init: np.ndarray = X_init.reshape(-1)  # Flattened initial guess

    if isinstance(objective, str):
        self.objective = get_objective(objective)(X_objective, kernel,
                                                  noise_variance, **kwargs)
    else:
        self.objective = objective

    # Use the boundaries of the X_objective area as the search space limits
    self.pbounds = geometry.MultiPoint([[p[0], p[1]]
                                        for p in X_objective]).convex_hull

get_hyperparameters()

Retrieves the current kernel and noise variance hyperparameters from the objective.

Returns:

Type Description
Tuple[Kernel, float]

Tuple[gpflow.kernels.Kernel, float]: A tuple containing a deep copy of the kernel and the noise variance.

Source code in sgptools/methods.py
def get_hyperparameters(self) -> Tuple[gpflow.kernels.Kernel, float]:
    """
    Retrieves the current kernel and noise variance hyperparameters from the objective.

    Returns:
        Tuple[gpflow.kernels.Kernel, float]: A tuple containing a deep copy of the kernel and the noise variance.
    """
    return deepcopy(self.objective.kernel), \
           self.objective.noise_variance

get_transform()

Retrieves a deep copy of the transform object.

Returns:

Name Type Description
Transform Transform

A deep copy of the transform object.

Source code in sgptools/methods.py
def get_transform(self) -> Transform:
    """
    Retrieves a deep copy of the transform object.

    Returns:
        Transform: A deep copy of the transform object.
    """
    return deepcopy(self.transform)

optimize(max_steps=500, tol=1e-06, verbose=False, seed=None, restarts=5, **kwargs)

Optimizes the sensor placement/path using CMA-ES.

Parameters:

Name Type Description Default
max_steps int

Maximum number of optimization steps (function evaluations). Defaults to 500.

500
tol float

Tolerance for termination. Defaults to 1e-6.

1e-06
verbose bool

Verbosity, if True additional details will by reported. Defaults to False.

False
seed Optional[int]

Random seed for reproducibility. Defaults to None.

None
restarts int

Number of restarts for CMA-ES. Defaults to 5.

5
**kwargs Any

Additional keyword arguments for CMA-ES.

{}

Returns:

Type Description
ndarray

np.ndarray: (num_robots, num_sensing, num_dim); Optimized sensing locations.

Usage
# Assuming X_train, candidates, kernel_opt, noise_variance_opt are defined
cma_method = CMA(
    num_sensing=10,
    X_objective=X_train,
    kernel=kernel_opt,
    noise_variance=noise_variance_opt,
    transform=IPPTransform(num_robots=1), # Example transform
    X_candidates=candidates
)
optimized_solution = cma_method.optimize(max_steps=1000)
Source code in sgptools/methods.py
def optimize(self,
             max_steps: int = 500,
             tol: float = 1e-6,
             verbose: bool = False,
             seed: Optional[int] = None,
             restarts: int = 5,
             **kwargs: Any) -> np.ndarray:
    """
    Optimizes the sensor placement/path using CMA-ES.

    Args:
        max_steps (int): Maximum number of optimization steps (function evaluations). Defaults to 500.
        tol (float): Tolerance for termination. Defaults to 1e-6.
        verbose (bool): Verbosity, if True additional details will by reported. Defaults to False.
        seed (Optional[int]): Random seed for reproducibility. Defaults to None.
        restarts (int): Number of restarts for CMA-ES. Defaults to 5.
        **kwargs: Additional keyword arguments for CMA-ES.

    Returns:
        np.ndarray: (num_robots, num_sensing, num_dim); Optimized sensing locations.

    Usage:
        ```python
        # Assuming X_train, candidates, kernel_opt, noise_variance_opt are defined
        cma_method = CMA(
            num_sensing=10,
            X_objective=X_train,
            kernel=kernel_opt,
            noise_variance=noise_variance_opt,
            transform=IPPTransform(num_robots=1), # Example transform
            X_candidates=candidates
        )
        optimized_solution = cma_method.optimize(max_steps=1000)
        ```
    """
    sigma0 = 1.0
    verbose = 1 if verbose else 0
    sol, _ = cma.fmin2(self._objective,
                       self.X_init,
                       sigma0,
                       options={
                           'maxfevals': max_steps,
                           'verb_disp': verbose,
                           'tolfun': tol,
                           'seed': seed
                       },
                       restarts=restarts)

    sol_np = np.array(sol).reshape(-1, self.num_dim)
    if self.transform is not None:
        try:
            sol_np = self.transform.expand(sol_np,
                                           expand_sensor_model=False)
        except TypeError:
            pass
        if not isinstance(sol_np, np.ndarray):
            sol_np = sol_np.numpy()

    # Map solution locations to candidates set locations if X_candidates is provided
    if self.X_candidates is not None:
        sol_np = cont2disc(sol_np, self.X_candidates)

    sol_np = sol_np.reshape(self.num_robots, -1, self.num_dim)
    return sol_np

update(kernel, noise_variance)

Updates the kernel and noise variance parameters of the objective function.

Parameters:

Name Type Description Default
kernel Kernel

Updated GPflow kernel function.

required
noise_variance float

Updated data noise variance.

required
Source code in sgptools/methods.py
def update(self, kernel: gpflow.kernels.Kernel,
           noise_variance: float) -> None:
    """
    Updates the kernel and noise variance parameters of the objective function.

    Args:
        kernel (gpflow.kernels.Kernel): Updated GPflow kernel function.
        noise_variance (float): Updated data noise variance.
    """
    self.objective.update(kernel, noise_variance)

update_transform(transform)

Updates the transform object used by the CMA-ES optimizer.

Parameters:

Name Type Description Default
transform Transform

The new transform object.

required
Source code in sgptools/methods.py
def update_transform(self, transform: Transform) -> None:
    """
    Updates the transform object used by the CMA-ES optimizer.

    Args:
        transform (Transform): The new transform object.
    """
    self.transform = transform