Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 174 additions & 0 deletions 186. Gaussian_process_for_regression
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import numpy as np
import math

# --- Kernels -------------------------------------------------------------

def linear_kernel(x, x_prime, sigma_b=0.0, sigma_v=1.0):
    """Linear (dot-product) kernel: k(x, x') = sigma_b^2 + sigma_v^2 * <x, x'>."""
    bias = sigma_b ** 2
    scale = sigma_v ** 2
    return bias + scale * (x @ x_prime.T)

def rbf_kernel(x, x_prime, sigma=1.0, length_scale=1.0):
    """Squared-exponential (RBF) kernel between the rows of x and x_prime."""
    delta = x[:, None, :] - x_prime[None, :, :]
    sq_dist = (delta ** 2).sum(axis=-1)
    return (sigma ** 2) * np.exp(-sq_dist / (2.0 * length_scale ** 2))

def matern_kernel(x, x_prime, length_scale=1.0, nu=1.5):
    """Matern kernel for the closed-form smoothness values nu in {0.5, 1.5, 2.5}.

    Parameters
    ----------
    x, x_prime : arrays of shape (n, d) and (m, d) — input points as rows.
    length_scale : float — positive length scale of the kernel.
    nu : float — smoothness parameter; only 0.5, 1.5 and 2.5 are supported.

    Returns
    -------
    (n, m) kernel matrix.

    Raises
    ------
    NotImplementedError
        If ``nu`` is not one of the supported closed-form values.
    """
    # Pairwise Euclidean distances between the rows of x and x_prime.
    d = np.sqrt(np.sum((x[:, None] - x_prime[None, :]) ** 2, axis=2))
    r = d / length_scale  # scaled distance, shared by every branch
    if nu == 0.5:
        k = np.exp(-r)
    elif nu == 1.5:
        sqrt3 = math.sqrt(3.0)
        k = (1.0 + sqrt3 * r) * np.exp(-sqrt3 * r)
    elif nu == 2.5:
        sqrt5 = math.sqrt(5.0)
        k = (1.0 + sqrt5 * r + 5.0 * r ** 2 / 3.0) * np.exp(-sqrt5 * r)
    else:
        # Fix: the bare NotImplementedError gave the caller no hint at all;
        # name the offending value and the supported set.
        raise NotImplementedError(
            f"matern_kernel only supports nu in (0.5, 1.5, 2.5); got nu={nu}"
        )
    return k

def periodic_kernel(x, x_prime, sigma=1.0, length_scale=1.0, period=1.0):
    """Exp-sine-squared (periodic) kernel.

    k(x, x') = sigma^2 * exp(-2 * sin^2(pi * |x - x'| / period) / length_scale^2)

    Fix: for 2-D inputs of shape (n, d) the previous version returned an
    (n, m, d) array instead of an (n, m) kernel matrix (it never reduced over
    the feature axis like the other kernels do), which broke the downstream
    matrix products in the GP. Per-feature distances are now summed over the
    feature axis; behavior for 1-D inputs is unchanged.
    """
    d = np.abs(x[:, None] - x_prime[None, :])
    if d.ndim == 3:
        # (n, m, d) -> (n, m): aggregate per-dimension L1 distances.
        d = np.sum(d, axis=2)
    diff = np.pi * d / period
    return sigma**2 * np.exp(-2 * (np.sin(diff) ** 2) / length_scale**2)

def rational_quadratic_kernel(x, x_prime, sigma=1.0, length_scale=1.0, alpha=1.0):
    """Rational quadratic kernel (a scale mixture of RBF kernels)."""
    delta = x[:, None] - x_prime[None, :]
    sq_dist = np.sum(delta * delta, axis=2)
    base = 1.0 + sq_dist / (2.0 * alpha * length_scale ** 2)
    return sigma ** 2 * base ** (-alpha)

# --- Base Class -------------------------------------------------------------

class _GaussianProcessBase:
def __init__(self, kernel="rbf", noise=1e-5, kernel_params=None):
self.kernel_name = kernel
self.noise = noise
self.kernel_params = kernel_params or {}

def _select_kernel(self, x1, x2):
if self.kernel_name == "linear":
return linear_kernel(x1, x2, **self.kernel_params)
elif self.kernel_name == "rbf":
return rbf_kernel(x1, x2, **self.kernel_params)
elif self.kernel_name == "matern":
return matern_kernel(x1, x2, **self.kernel_params)
elif self.kernel_name == "periodic":
return periodic_kernel(x1, x2, **self.kernel_params)
elif self.kernel_name == "rational_quadratic":
return rational_quadratic_kernel(x1, x2, **self.kernel_params)
else:
raise ValueError("Unknown kernel")

def _compute_covariance(self, X1, X2):
return self._select_kernel(X1, X2)

# --- Gaussian Process Regression --------------------------------------------

class GaussianProcessRegression(_GaussianProcessBase):
    """Exact Gaussian-process regression with a zero prior mean.

    ``fit`` stores the training data and precomputes the noise-regularised
    training covariance ``self.K`` and its inverse ``self.K_inv``;
    ``predict`` returns the posterior mean and, optionally, pointwise
    standard deviations.
    """

    def fit(self, X, y):
        """Store training data and precompute the covariance inverse.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features) — training inputs.
        y : array-like of shape (n_samples,) — training targets.
        """
        self.X_train = np.array(X)
        self.y_train = np.array(y)
        K = self._compute_covariance(self.X_train, self.X_train)
        # The noise term on the diagonal doubles as jitter, keeping K
        # well-conditioned and invertible.
        self.K = K + self.noise * np.eye(len(self.X_train))
        self.K_inv = np.linalg.inv(self.K)

    def predict(self, X_test, return_std=False):
        """Posterior mean (and optionally pointwise std) at X_test.

        Returns ``mu``, or the tuple ``(mu, std)`` when ``return_std`` is True.
        """
        X_test = np.array(X_test)
        K_s = self._compute_covariance(X_test, self.X_train)
        # Small jitter keeps the test-test covariance numerically PSD.
        K_ss = self._compute_covariance(X_test, X_test) + 1e-8 * np.eye(len(X_test))
        mu = K_s @ self.K_inv @ self.y_train
        if return_std:
            cov = K_ss - K_s @ self.K_inv @ K_s.T
            # Fix: round-off can push diagonal entries slightly below zero,
            # which made np.sqrt emit NaNs -- clamp the variance at zero.
            var = np.clip(np.diag(cov), 0.0, None)
            std = np.sqrt(var)
            return mu, std
        return mu

    def log_marginal_likelihood(self):
        """Log p(y | X) of the training data under the current kernel."""
        n = len(self.y_train)
        # K is positive definite after the noise jitter, so slogdet's sign
        # is +1 and can safely be discarded.
        _, logdet = np.linalg.slogdet(self.K)
        return -0.5 * self.y_train.T @ self.K_inv @ self.y_train - 0.5 * logdet - 0.5 * n * np.log(2 * np.pi)

    def optimize_hyperparameters(self):
        """Placeholder: hyperparameter optimisation is intentionally not implemented."""
        pass  # Not needed for this task
import numpy as np
import math

# --- Kernels -------------------------------------------------------------

def linear_kernel(x, x_prime, sigma_b=0.0, sigma_v=1.0):
    """Linear (dot-product) kernel: k(x, x') = sigma_b^2 + sigma_v^2 * <x, x'>."""
    bias = sigma_b ** 2
    scale = sigma_v ** 2
    return bias + scale * (x @ x_prime.T)

def rbf_kernel(x, x_prime, sigma=1.0, length_scale=1.0):
    """Squared-exponential (RBF) kernel between the rows of x and x_prime."""
    delta = x[:, None, :] - x_prime[None, :, :]
    sq_dist = (delta ** 2).sum(axis=-1)
    return (sigma ** 2) * np.exp(-sq_dist / (2.0 * length_scale ** 2))

def matern_kernel(x, x_prime, length_scale=1.0, nu=1.5):
    """Matern kernel for the closed-form smoothness values nu in {0.5, 1.5, 2.5}.

    Parameters
    ----------
    x, x_prime : arrays of shape (n, d) and (m, d) — input points as rows.
    length_scale : float — positive length scale of the kernel.
    nu : float — smoothness parameter; only 0.5, 1.5 and 2.5 are supported.

    Returns
    -------
    (n, m) kernel matrix.

    Raises
    ------
    NotImplementedError
        If ``nu`` is not one of the supported closed-form values.
    """
    # Pairwise Euclidean distances between the rows of x and x_prime.
    d = np.sqrt(np.sum((x[:, None] - x_prime[None, :]) ** 2, axis=2))
    r = d / length_scale  # scaled distance, shared by every branch
    if nu == 0.5:
        k = np.exp(-r)
    elif nu == 1.5:
        sqrt3 = math.sqrt(3.0)
        k = (1.0 + sqrt3 * r) * np.exp(-sqrt3 * r)
    elif nu == 2.5:
        sqrt5 = math.sqrt(5.0)
        k = (1.0 + sqrt5 * r + 5.0 * r ** 2 / 3.0) * np.exp(-sqrt5 * r)
    else:
        # Fix: the bare NotImplementedError gave the caller no hint at all;
        # name the offending value and the supported set.
        raise NotImplementedError(
            f"matern_kernel only supports nu in (0.5, 1.5, 2.5); got nu={nu}"
        )
    return k

def periodic_kernel(x, x_prime, sigma=1.0, length_scale=1.0, period=1.0):
    """Exp-sine-squared (periodic) kernel.

    k(x, x') = sigma^2 * exp(-2 * sin^2(pi * |x - x'| / period) / length_scale^2)

    Fix: for 2-D inputs of shape (n, d) the previous version returned an
    (n, m, d) array instead of an (n, m) kernel matrix (it never reduced over
    the feature axis like the other kernels do), which broke the downstream
    matrix products in the GP. Per-feature distances are now summed over the
    feature axis; behavior for 1-D inputs is unchanged.
    """
    d = np.abs(x[:, None] - x_prime[None, :])
    if d.ndim == 3:
        # (n, m, d) -> (n, m): aggregate per-dimension L1 distances.
        d = np.sum(d, axis=2)
    diff = np.pi * d / period
    return sigma**2 * np.exp(-2 * (np.sin(diff) ** 2) / length_scale**2)

def rational_quadratic_kernel(x, x_prime, sigma=1.0, length_scale=1.0, alpha=1.0):
    """Rational quadratic kernel (a scale mixture of RBF kernels)."""
    delta = x[:, None] - x_prime[None, :]
    sq_dist = np.sum(delta * delta, axis=2)
    base = 1.0 + sq_dist / (2.0 * alpha * length_scale ** 2)
    return sigma ** 2 * base ** (-alpha)

# --- Base Class -------------------------------------------------------------

class _GaussianProcessBase:
def __init__(self, kernel="rbf", noise=1e-5, kernel_params=None):
self.kernel_name = kernel
self.noise = noise
self.kernel_params = kernel_params or {}

def _select_kernel(self, x1, x2):
if self.kernel_name == "linear":
return linear_kernel(x1, x2, **self.kernel_params)
elif self.kernel_name == "rbf":
return rbf_kernel(x1, x2, **self.kernel_params)
elif self.kernel_name == "matern":
return matern_kernel(x1, x2, **self.kernel_params)
elif self.kernel_name == "periodic":
return periodic_kernel(x1, x2, **self.kernel_params)
elif self.kernel_name == "rational_quadratic":
return rational_quadratic_kernel(x1, x2, **self.kernel_params)
else:
raise ValueError("Unknown kernel")

def _compute_covariance(self, X1, X2):
return self._select_kernel(X1, X2)

# --- Gaussian Process Regression --------------------------------------------

class GaussianProcessRegression(_GaussianProcessBase):
    """Exact Gaussian-process regression with a zero prior mean.

    NOTE(review): this file contains two byte-identical copies of every
    definition (likely a duplicated paste); this second copy is the one that
    takes effect at import time — the file should be deduplicated.

    ``fit`` stores the training data and precomputes the noise-regularised
    training covariance ``self.K`` and its inverse ``self.K_inv``;
    ``predict`` returns the posterior mean and, optionally, pointwise
    standard deviations.
    """

    def fit(self, X, y):
        """Store training data and precompute the covariance inverse.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features) — training inputs.
        y : array-like of shape (n_samples,) — training targets.
        """
        self.X_train = np.array(X)
        self.y_train = np.array(y)
        K = self._compute_covariance(self.X_train, self.X_train)
        # The noise term on the diagonal doubles as jitter, keeping K
        # well-conditioned and invertible.
        self.K = K + self.noise * np.eye(len(self.X_train))
        self.K_inv = np.linalg.inv(self.K)

    def predict(self, X_test, return_std=False):
        """Posterior mean (and optionally pointwise std) at X_test.

        Returns ``mu``, or the tuple ``(mu, std)`` when ``return_std`` is True.
        """
        X_test = np.array(X_test)
        K_s = self._compute_covariance(X_test, self.X_train)
        # Small jitter keeps the test-test covariance numerically PSD.
        K_ss = self._compute_covariance(X_test, X_test) + 1e-8 * np.eye(len(X_test))
        mu = K_s @ self.K_inv @ self.y_train
        if return_std:
            cov = K_ss - K_s @ self.K_inv @ K_s.T
            # Fix: round-off can push diagonal entries slightly below zero,
            # which made np.sqrt emit NaNs -- clamp the variance at zero.
            var = np.clip(np.diag(cov), 0.0, None)
            std = np.sqrt(var)
            return mu, std
        return mu

    def log_marginal_likelihood(self):
        """Log p(y | X) of the training data under the current kernel."""
        n = len(self.y_train)
        # K is positive definite after the noise jitter, so slogdet's sign
        # is +1 and can safely be discarded.
        _, logdet = np.linalg.slogdet(self.K)
        return -0.5 * self.y_train.T @ self.K_inv @ self.y_train - 0.5 * logdet - 0.5 * n * np.log(2 * np.pi)

    def optimize_hyperparameters(self):
        """Placeholder: hyperparameter optimisation is intentionally not implemented."""
        pass  # Not needed for this task