Skip to content

ContinuousSGP

sgptools.methods.ContinuousSGP

Bases: Method

Implements informative sensor placement/path optimization using a Sparse Gaussian Process (SGP).

This method optimizes the inducing points of an SGP model to maximize the ELBO or other SGP-related objectives.

Refer to the following papers for more details
  • Efficient Sensor Placement from Regression with Sparse Gaussian Processes in Continuous and Discrete Spaces [Jakkala and Akella, 2023]
  • Multi-Robot Informative Path Planning from Regression with Sparse Gaussian Processes [Jakkala and Akella, 2024]

Attributes:

Name Type Description
sgpr AugmentedSGPR

The Augmented Sparse Gaussian Process Regression model.

Source code in sgptools/methods.py
class ContinuousSGP(Method):
    """
    Implements informative sensor placement/path optimization using a Sparse Gaussian Process (SGP).

    This method optimizes the inducing points of an SGP model to maximize the ELBO or other SGP-related objectives.

    Refer to the following papers for more details:
        - Efficient Sensor Placement from Regression with Sparse Gaussian Processes in Continuous and Discrete Spaces [[Jakkala and Akella, 2023](https://www.itskalvik.com/publication/sgp-sp/)]
        - Multi-Robot Informative Path Planning from Regression with Sparse Gaussian Processes [[Jakkala and Akella, 2024](https://www.itskalvik.com/publication/sgp-ipp/)]

    Attributes:
        sgpr (AugmentedSGPR): The Augmented Sparse Gaussian Process Regression model.
    """

    def __init__(self,
                 num_sensing: int,
                 X_objective: np.ndarray,
                 kernel: gpflow.kernels.Kernel,
                 noise_variance: float,
                 transform: Optional[Transform] = None,
                 num_robots: int = 1,
                 X_candidates: Optional[np.ndarray] = None,
                 num_dim: Optional[int] = None,
                 X_init: Optional[np.ndarray] = None,
                 X_time: Optional[np.ndarray] = None,
                 orientation: bool = False,
                 **kwargs: Any):
        """
        Initializes the ContinuousSGP optimizer.

        Args:
            num_sensing (int): Number of sensing locations (inducing points) to optimize.
            X_objective (np.ndarray): (n, d); Data points used to approximate the bounds of the environment.
            kernel (gpflow.kernels.Kernel): GPflow kernel function.
            noise_variance (float): Data noise variance.
            transform (Optional[Transform]): Transform object to apply to inducing points. Defaults to None.
            num_robots (int): Number of robots/agents. Defaults to 1.
            X_candidates (Optional[np.ndarray]): (c, d); Discrete set of candidate locations for sensor placement.
                                                 Defaults to None.
            num_dim (Optional[int]): Dimensionality of the sensing locations. Defaults to dimensonality of X_objective.
            X_init (Optional[np.ndarray]): (num_sensing * num_robots, d); Initial inducing points.
                                            If None, initial points are randomly selected from X_objective.
            X_time (Optional[np.ndarray]): (m, d); Temporal dimensions of the inducing points, used when
                                            modeling spatio-temporal IPP. Defaults to None.
            orientation (bool): If True, adds an additional dimension to model sensor FoV rotation angle
                                when selecting initial inducing points. Defaults to False.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(num_sensing, X_objective, kernel, noise_variance,
                         transform, num_robots, X_candidates, num_dim)
        if X_init is None:
            X_init = get_inducing_pts(X_objective,
                                      num_sensing * self.num_robots,
                                      orientation=orientation)
        else:
            # override num_dim with initial inducing points dim, in case it differes from X_objective dim
            self.num_dim = X_init.shape[-1]

        # Fit the SGP
        dtype = X_objective.dtype
        train_set: Tuple[tf.Tensor, tf.Tensor] = (tf.constant(X_objective,
                                                              dtype=dtype),
                                                  tf.zeros(
                                                      (len(X_objective), 1),
                                                      dtype=dtype))
        self.sgpr = AugmentedSGPR(train_set,
                                  noise_variance=noise_variance,
                                  kernel=kernel,
                                  inducing_variable=X_init,
                                  inducing_variable_time=X_time,
                                  transform=transform)

    def update(self, kernel: gpflow.kernels.Kernel,
               noise_variance: float) -> None:
        """
        Updates the kernel and noise variance parameters of the SGP model.

        Args:
            kernel (gpflow.kernels.Kernel): Updated GPflow kernel function.
            noise_variance (float): Updated data noise variance.
        """
        self.sgpr.update(kernel, noise_variance)

    def get_hyperparameters(self) -> Tuple[gpflow.kernels.Kernel, float]:
        """
        Retrieves the current kernel and noise variance hyperparameters from the SGP model.

        Returns:
            Tuple[gpflow.kernels.Kernel, float]: A tuple containing a deep copy of the kernel and the noise variance.
        """
        return deepcopy(self.sgpr.kernel), \
               self.sgpr.likelihood.variance.numpy()

    def optimize(self,
                 max_steps: int = 500,
                 optimizer: str = 'scipy.L-BFGS-B',
                 verbose: bool = False,
                 **kwargs: Any) -> np.ndarray:
        """
        Optimizes the inducing points of the SGP model.

        Args:
            max_steps (int): Maximum number of optimization steps. Defaults to 500.
            optimizer (str): Optimizer "<backend>.<method>" to use for training (e.g., 'scipy.L-BFGS-B', 'tf.adam').
                             Defaults to 'scipy.L-BFGS-B'.
            verbose (bool): Verbosity, if True additional details will by reported. Defaults to False.
            **kwargs: Additional keyword arguments for the optimizer.

        Returns:
            np.ndarray: (num_robots, num_sensing, num_dim); Optimized inducing points (sensing locations).

        Usage:
            ```python
            # Assuming X_train, candidates, kernel_opt, noise_variance_opt are defined
            csgp_method = ContinuousSGP(
                num_sensing=10,
                X_objective=dataset.X_train,
                kernel=kernel_opt,
                noise_variance=noise_variance_opt,
                transform=IPPTransform(num_robots=1), # Example transform
                X_candidates=candidates # Only if the solution needs to be mapped to candidates
            )
            optimized_solution = csgp_method.optimize(max_steps=500, optimizer='scipy.L-BFGS-B')
            ```
        """
        _ = optimize_model(
            self.sgpr,
            max_steps=max_steps,
            optimize_hparams=
            False,  # Inducing points are optimized, not kernel hyperparameters
            optimizer=optimizer,
            verbose=verbose,
            **kwargs)

        sol: tf.Tensor = self.sgpr.inducing_variable.Z
        try:
            sol_expanded = self.transform.expand(sol,
                                                 expand_sensor_model=False)
        except TypeError:
            sol_expanded = sol
        if not isinstance(sol_expanded, np.ndarray):
            sol_np = sol_expanded.numpy()
        else:
            sol_np = sol_expanded

        # Map solution locations to candidates set locations if X_candidates is provided
        if self.X_candidates is not None:
            sol_np = cont2disc(sol_np, self.X_candidates)

        sol_np = sol_np.reshape(self.num_robots, -1, self.num_dim)
        return sol_np

    @property
    def transform(self) -> Transform:
        """
        Gets the transform object associated with the SGP model.

        Returns:
            Transform: The transform object.
        """
        return self.sgpr.transform

transform property

Gets the transform object associated with the SGP model.

Returns:

Name Type Description
Transform Transform

The transform object.

__init__(num_sensing, X_objective, kernel, noise_variance, transform=None, num_robots=1, X_candidates=None, num_dim=None, X_init=None, X_time=None, orientation=False, **kwargs)

Initializes the ContinuousSGP optimizer.

Parameters:

Name Type Description Default
num_sensing int

Number of sensing locations (inducing points) to optimize.

required
X_objective ndarray

(n, d); Data points used to approximate the bounds of the environment.

required
kernel Kernel

GPflow kernel function.

required
noise_variance float

Data noise variance.

required
transform Optional[Transform]

Transform object to apply to inducing points. Defaults to None.

None
num_robots int

Number of robots/agents. Defaults to 1.

1
X_candidates Optional[ndarray]

(c, d); Discrete set of candidate locations for sensor placement. Defaults to None.

None
num_dim Optional[int]

Dimensionality of the sensing locations. Defaults to dimensonality of X_objective.

None
X_init Optional[ndarray]

(num_sensing * num_robots, d); Initial inducing points. If None, initial points are randomly selected from X_objective.

None
X_time Optional[ndarray]

(m, d); Temporal dimensions of the inducing points, used when modeling spatio-temporal IPP. Defaults to None.

None
orientation bool

If True, adds an additional dimension to model sensor FoV rotation angle when selecting initial inducing points. Defaults to False.

False
**kwargs Any

Additional keyword arguments.

{}
Source code in sgptools/methods.py
def __init__(self,
             num_sensing: int,
             X_objective: np.ndarray,
             kernel: gpflow.kernels.Kernel,
             noise_variance: float,
             transform: Optional[Transform] = None,
             num_robots: int = 1,
             X_candidates: Optional[np.ndarray] = None,
             num_dim: Optional[int] = None,
             X_init: Optional[np.ndarray] = None,
             X_time: Optional[np.ndarray] = None,
             orientation: bool = False,
             **kwargs: Any):
    """
    Initializes the ContinuousSGP optimizer.

    Args:
        num_sensing (int): Number of sensing locations (inducing points) to optimize.
        X_objective (np.ndarray): (n, d); Data points used to approximate the bounds of the environment.
        kernel (gpflow.kernels.Kernel): GPflow kernel function.
        noise_variance (float): Data noise variance.
        transform (Optional[Transform]): Transform object to apply to inducing points. Defaults to None.
        num_robots (int): Number of robots/agents. Defaults to 1.
        X_candidates (Optional[np.ndarray]): (c, d); Discrete set of candidate locations for sensor placement.
                                             Defaults to None.
        num_dim (Optional[int]): Dimensionality of the sensing locations. Defaults to dimensonality of X_objective.
        X_init (Optional[np.ndarray]): (num_sensing * num_robots, d); Initial inducing points.
                                        If None, initial points are randomly selected from X_objective.
        X_time (Optional[np.ndarray]): (m, d); Temporal dimensions of the inducing points, used when
                                        modeling spatio-temporal IPP. Defaults to None.
        orientation (bool): If True, adds an additional dimension to model sensor FoV rotation angle
                            when selecting initial inducing points. Defaults to False.
        **kwargs: Additional keyword arguments.
    """
    super().__init__(num_sensing, X_objective, kernel, noise_variance,
                     transform, num_robots, X_candidates, num_dim)
    if X_init is None:
        X_init = get_inducing_pts(X_objective,
                                  num_sensing * self.num_robots,
                                  orientation=orientation)
    else:
        # override num_dim with initial inducing points dim, in case it differes from X_objective dim
        self.num_dim = X_init.shape[-1]

    # Fit the SGP
    dtype = X_objective.dtype
    train_set: Tuple[tf.Tensor, tf.Tensor] = (tf.constant(X_objective,
                                                          dtype=dtype),
                                              tf.zeros(
                                                  (len(X_objective), 1),
                                                  dtype=dtype))
    self.sgpr = AugmentedSGPR(train_set,
                              noise_variance=noise_variance,
                              kernel=kernel,
                              inducing_variable=X_init,
                              inducing_variable_time=X_time,
                              transform=transform)

get_hyperparameters()

Retrieves the current kernel and noise variance hyperparameters from the SGP model.

Returns:

Type Description
Tuple[Kernel, float]

Tuple[gpflow.kernels.Kernel, float]: A tuple containing a deep copy of the kernel and the noise variance.

Source code in sgptools/methods.py
def get_hyperparameters(self) -> Tuple[gpflow.kernels.Kernel, float]:
    """
    Retrieves the current kernel and noise variance hyperparameters from the SGP model.

    Returns:
        Tuple[gpflow.kernels.Kernel, float]: A tuple containing a deep copy of the kernel and the noise variance.
    """
    return deepcopy(self.sgpr.kernel), \
           self.sgpr.likelihood.variance.numpy()

optimize(max_steps=500, optimizer='scipy.L-BFGS-B', verbose=False, **kwargs)

Optimizes the inducing points of the SGP model.

Parameters:

Name Type Description Default
max_steps int

Maximum number of optimization steps. Defaults to 500.

500
optimizer str

Optimizer "." to use for training (e.g., 'scipy.L-BFGS-B', 'tf.adam'). Defaults to 'scipy.L-BFGS-B'.

'scipy.L-BFGS-B'
verbose bool

Verbosity, if True additional details will by reported. Defaults to False.

False
**kwargs Any

Additional keyword arguments for the optimizer.

{}

Returns:

Type Description
ndarray

np.ndarray: (num_robots, num_sensing, num_dim); Optimized inducing points (sensing locations).

Usage
# Assuming X_train, candidates, kernel_opt, noise_variance_opt are defined
csgp_method = ContinuousSGP(
    num_sensing=10,
    X_objective=dataset.X_train,
    kernel=kernel_opt,
    noise_variance=noise_variance_opt,
    transform=IPPTransform(num_robots=1), # Example transform
    X_candidates=candidates # Only if the solution needs to be mapped to candidates
)
optimized_solution = csgp_method.optimize(max_steps=500, optimizer='scipy.L-BFGS-B')
Source code in sgptools/methods.py
def optimize(self,
             max_steps: int = 500,
             optimizer: str = 'scipy.L-BFGS-B',
             verbose: bool = False,
             **kwargs: Any) -> np.ndarray:
    """
    Optimizes the inducing points of the SGP model.

    Args:
        max_steps (int): Maximum number of optimization steps. Defaults to 500.
        optimizer (str): Optimizer "<backend>.<method>" to use for training (e.g., 'scipy.L-BFGS-B', 'tf.adam').
                         Defaults to 'scipy.L-BFGS-B'.
        verbose (bool): Verbosity, if True additional details will by reported. Defaults to False.
        **kwargs: Additional keyword arguments for the optimizer.

    Returns:
        np.ndarray: (num_robots, num_sensing, num_dim); Optimized inducing points (sensing locations).

    Usage:
        ```python
        # Assuming X_train, candidates, kernel_opt, noise_variance_opt are defined
        csgp_method = ContinuousSGP(
            num_sensing=10,
            X_objective=dataset.X_train,
            kernel=kernel_opt,
            noise_variance=noise_variance_opt,
            transform=IPPTransform(num_robots=1), # Example transform
            X_candidates=candidates # Only if the solution needs to be mapped to candidates
        )
        optimized_solution = csgp_method.optimize(max_steps=500, optimizer='scipy.L-BFGS-B')
        ```
    """
    _ = optimize_model(
        self.sgpr,
        max_steps=max_steps,
        optimize_hparams=
        False,  # Inducing points are optimized, not kernel hyperparameters
        optimizer=optimizer,
        verbose=verbose,
        **kwargs)

    sol: tf.Tensor = self.sgpr.inducing_variable.Z
    try:
        sol_expanded = self.transform.expand(sol,
                                             expand_sensor_model=False)
    except TypeError:
        sol_expanded = sol
    if not isinstance(sol_expanded, np.ndarray):
        sol_np = sol_expanded.numpy()
    else:
        sol_np = sol_expanded

    # Map solution locations to candidates set locations if X_candidates is provided
    if self.X_candidates is not None:
        sol_np = cont2disc(sol_np, self.X_candidates)

    sol_np = sol_np.reshape(self.num_robots, -1, self.num_dim)
    return sol_np

update(kernel, noise_variance)

Updates the kernel and noise variance parameters of the SGP model.

Parameters:

Name Type Description Default
kernel Kernel

Updated GPflow kernel function.

required
noise_variance float

Updated data noise variance.

required
Source code in sgptools/methods.py
def update(self, kernel: gpflow.kernels.Kernel,
           noise_variance: float) -> None:
    """
    Updates the kernel and noise variance parameters of the SGP model.

    Args:
        kernel (gpflow.kernels.Kernel): Updated GPflow kernel function.
        noise_variance (float): Updated data noise variance.
    """
    self.sgpr.update(kernel, noise_variance)