diff --git a/pycti/api/opencti_api_client.py b/pycti/api/opencti_api_client.py index 5729f6d0e..69278be60 100644 --- a/pycti/api/opencti_api_client.py +++ b/pycti/api/opencti_api_client.py @@ -1,8 +1,14 @@ # coding: utf-8 +import atexit import base64 import datetime import io import json +import os +import shutil +import signal +import tempfile +import threading from typing import Dict, Tuple, Union import magic @@ -78,6 +84,12 @@ from pycti.utils.opencti_stix2 import OpenCTIStix2 from pycti.utils.opencti_stix2_utils import OpenCTIStix2Utils +# Global singleton variables for proxy certificate management +_PROXY_CERT_BUNDLE = None +_PROXY_CERT_DIR = None +_PROXY_CERT_LOCK = threading.Lock() +_PROXY_SIGNAL_HANDLERS_REGISTERED = False + def build_request_headers(token: str, custom_headers: str, app_logger): headers_dict = { @@ -166,6 +178,9 @@ def __init__( self.app_logger = self.logger_class("api") self.admin_logger = self.logger_class("admin") + # Setup proxy certificates if provided + self._setup_proxy_certificates() + # Define API self.api_token = token self.api_url = url + "/graphql" @@ -249,6 +264,147 @@ def __init__( "OpenCTI API is not reachable. Waiting for OpenCTI API to start or check your configuration..." ) + def _setup_proxy_certificates(self): + """Setup HTTPS proxy certificates from environment variable. + + Detects HTTPS_CA_CERTIFICATES environment variable and combines + proxy certificates with system certificates for SSL verification. + Supports both inline certificate content and file paths. + + Uses a singleton pattern to ensure only one certificate bundle is created + across all instances, avoiding resource leaks and conflicts. + """ + global _PROXY_CERT_BUNDLE, _PROXY_CERT_DIR, _PROXY_SIGNAL_HANDLERS_REGISTERED + + https_ca_certificates = os.getenv("HTTPS_CA_CERTIFICATES") + if not https_ca_certificates: + return + + # Thread-safe check and setup + with _PROXY_CERT_LOCK: + # If already configured, reuse existing bundle + if _PROXY_CERT_BUNDLE is not None: + self.ssl_verify = _PROXY_CERT_BUNDLE + self.app_logger.debug( + "Reusing existing proxy certificate bundle", + {"cert_bundle": _PROXY_CERT_BUNDLE}, + ) + return + + # First initialization - create the certificate bundle + try: + # Create secure temporary directory + cert_dir = tempfile.mkdtemp(prefix="opencti_proxy_certs_") + + # Determine if HTTPS_CA_CERTIFICATES contains inline content or file path + cert_content = self._get_certificate_content(https_ca_certificates) + + # Write proxy certificate to temp file + proxy_cert_file = os.path.join(cert_dir, "proxy-ca.crt") + with open(proxy_cert_file, "w") as f: + f.write(cert_content) + + # Find system certificates + system_cert_paths = [ + "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu + "/etc/pki/tls/certs/ca-bundle.crt", # RHEL/CentOS + "/etc/ssl/cert.pem", # Alpine/BSD + ] + + # Create combined certificate bundle + combined_cert_file = os.path.join(cert_dir, "combined-ca-bundle.crt") + with open(combined_cert_file, "w") as combined: + # Add system certificates first + for system_path in system_cert_paths: + if os.path.exists(system_path): + with open(system_path, "r") as sys_certs: + combined.write(sys_certs.read()) + combined.write("\n") + break + + # Add proxy certificate + combined.write(cert_content) + + # Update global singleton variables + _PROXY_CERT_BUNDLE = combined_cert_file + _PROXY_CERT_DIR = cert_dir + self.ssl_verify = combined_cert_file + + # Set environment variables for urllib and other libraries + os.environ["REQUESTS_CA_BUNDLE"] = combined_cert_file + os.environ["SSL_CERT_FILE"] = combined_cert_file + + # Register cleanup handlers only once + atexit.register(_cleanup_proxy_certificates) + + # Register signal handlers only once + if not _PROXY_SIGNAL_HANDLERS_REGISTERED: + signal.signal(signal.SIGTERM, _signal_handler_proxy_cleanup) + signal.signal(signal.SIGINT, _signal_handler_proxy_cleanup) + _PROXY_SIGNAL_HANDLERS_REGISTERED = True + + self.app_logger.info( + "Proxy certificates configured", + {"cert_bundle": combined_cert_file}, + ) + + except Exception as e: + self.app_logger.error( + "Failed to setup proxy certificates", {"error": str(e)} + ) + raise + + def _get_certificate_content(self, https_ca_certificates): + """Extract certificate content from environment variable. + + Supports both inline certificate content (PEM format) and file paths. + + :param https_ca_certificates: Content from HTTPS_CA_CERTIFICATES env var + :type https_ca_certificates: str + :return: Certificate content in PEM format + :rtype: str + :raises ValueError: If the certificate content is invalid or cannot be read + """ + # Strip whitespace once at the beginning + stripped_https_ca_certificates = https_ca_certificates.strip() + + # Check if it's inline certificate content (starts with PEM header) + if stripped_https_ca_certificates.startswith("-----BEGIN CERTIFICATE-----"): + self.app_logger.debug( + "HTTPS_CA_CERTIFICATES contains inline certificate content" + ) + return https_ca_certificates + + # Check if it's a file path + if os.path.isfile(stripped_https_ca_certificates): + cert_file_path = stripped_https_ca_certificates + try: + with open(cert_file_path, "r") as f: + cert_content = f.read() + # Validate it's actually a certificate + if "-----BEGIN CERTIFICATE-----" in cert_content: + self.app_logger.debug( + "HTTPS_CA_CERTIFICATES contains valid certificate file path", + {"file_path": cert_file_path}, + ) + return cert_content + else: + raise ValueError( + f"File at HTTPS_CA_CERTIFICATES path does not contain valid certificate: {cert_file_path}" + ) + except ValueError: + # Re-raise ValueError from certificate validation + raise + except Exception as e: + raise ValueError( + f"Failed to read certificate file at {cert_file_path}: {str(e)}" + ) + + # Neither inline content nor valid file path + raise ValueError( + f"HTTPS_CA_CERTIFICATES is not a valid certificate or file path: {https_ca_certificates[:50]}..." + ) + def set_applicant_id_header(self, applicant_id): self.request_headers["opencti-applicant-id"] = applicant_id @@ -884,3 +1040,33 @@ def get_attribute_in_mitre_extension(key, object) -> any: "extension-definition--322b8f77-262a-4cb8-a915-1e441e00329b" ][key] return None + + +# Global cleanup functions for proxy certificates singleton +def _cleanup_proxy_certificates(): + """Clean up temporary certificate directory for proxy certificates. + + This function is called on normal program exit via atexit. + """ + global _PROXY_CERT_DIR + if _PROXY_CERT_DIR and os.path.exists(_PROXY_CERT_DIR): + try: + shutil.rmtree(_PROXY_CERT_DIR) + except Exception: + # Silently fail cleanup - best effort + pass + finally: + _PROXY_CERT_DIR = None + + +def _signal_handler_proxy_cleanup(signum, frame): + """Handle termination signals (SIGTERM/SIGINT) for proxy certificate cleanup. + + Performs cleanup and then raises SystemExit to allow + normal shutdown procedures to complete. + + :param signum: Signal number + :param frame: Current stack frame + """ + _cleanup_proxy_certificates() + raise SystemExit(0) diff --git a/tests/01-unit/api/__init__.py b/tests/01-unit/api/__init__.py new file mode 100644 index 000000000..78c08441c --- /dev/null +++ b/tests/01-unit/api/__init__.py @@ -0,0 +1 @@ +# Unit tests for API client functionality diff --git a/tests/01-unit/api/test_opencti_api_client-proxy.py b/tests/01-unit/api/test_opencti_api_client-proxy.py new file mode 100644 index 000000000..8996257d5 --- /dev/null +++ b/tests/01-unit/api/test_opencti_api_client-proxy.py @@ -0,0 +1,290 @@ +import os +import tempfile +from unittest.mock import MagicMock, mock_open, patch + +import pytest + +from pycti import OpenCTIApiClient + + +class TestOpenCTIApiClient: + """Test OpenCTIApiClient certificate handling functionality.""" + + SAMPLE_CERTIFICATE = """-----BEGIN CERTIFICATE----- +MIIDXTCCAkWgAwIBAgIJAKLdQVPy90WjMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMjQwMTAxMDAwMDAwWhcNMjUwMTAxMDAwMDAwWjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB +CgKCAQEA0Z3VS5JJcds3xfn/ygWyF0qJDr9oYRH/9dMfqHCOq45DqMVJLJBJnMzN +-----END CERTIFICATE-----""" + + INVALID_CONTENT = "This is not a certificate" + + @pytest.fixture + def api_client(self): + """Create an API client instance without performing health check.""" + with patch.object(OpenCTIApiClient, "_setup_proxy_certificates"): + client = OpenCTIApiClient( + url="http://localhost:4000", + token="test-token", + ssl_verify=False, + perform_health_check=False, + ) + # Mock the logger + client.app_logger = MagicMock() + return client + + def test_get_certificate_content_inline_pem(self, api_client): + """Test _get_certificate_content with inline PEM certificate.""" + result = api_client._get_certificate_content(self.SAMPLE_CERTIFICATE) + + assert result == self.SAMPLE_CERTIFICATE + api_client.app_logger.debug.assert_called_with( + "HTTPS_CA_CERTIFICATES contains inline certificate content" + ) + + def test_get_certificate_content_file_path(self, api_client): + """Test _get_certificate_content with a file path containing certificate.""" + # Create a temporary file with certificate content + with tempfile.NamedTemporaryFile( + mode="w", suffix=".crt", delete=False + ) as cert_file: + cert_file.write(self.SAMPLE_CERTIFICATE) + cert_file_path = cert_file.name + + try: + result = api_client._get_certificate_content(cert_file_path) + + assert result == self.SAMPLE_CERTIFICATE + api_client.app_logger.debug.assert_called_with( + "HTTPS_CA_CERTIFICATES contains valid certificate file path", + {"file_path": cert_file_path}, + ) + finally: + # Clean up + os.unlink(cert_file_path) + + def test_get_certificate_content_invalid_file_content(self, api_client): + """Test _get_certificate_content with a file containing invalid certificate.""" + # Create a temporary file with invalid content + with tempfile.NamedTemporaryFile( + mode="w", suffix=".txt", delete=False + ) as invalid_file: + invalid_file.write(self.INVALID_CONTENT) + invalid_file_path = invalid_file.name + + try: + with pytest.raises( + ValueError, + match="File at HTTPS_CA_CERTIFICATES path does not contain valid certificate", + ): + api_client._get_certificate_content(invalid_file_path) + finally: + # Clean up + os.unlink(invalid_file_path) + + def test_get_certificate_content_nonexistent_file(self, api_client): + """Test _get_certificate_content with a nonexistent file path.""" + nonexistent_path = "/tmp/nonexistent_certificate.crt" + + with pytest.raises( + ValueError, + match="HTTPS_CA_CERTIFICATES is not a valid certificate or file path", + ): + api_client._get_certificate_content(nonexistent_path) + + def test_get_certificate_content_invalid_content(self, api_client): + """Test _get_certificate_content with invalid content (not PEM, not file).""" + with pytest.raises( + ValueError, + match="HTTPS_CA_CERTIFICATES is not a valid certificate or file path", + ): + api_client._get_certificate_content(self.INVALID_CONTENT) + + def test_get_certificate_content_whitespace_handling(self, api_client): + """Test _get_certificate_content handles whitespace correctly.""" + # Test with certificate content with leading/trailing whitespace + cert_with_whitespace = f" \n{self.SAMPLE_CERTIFICATE} \n" + result = api_client._get_certificate_content(cert_with_whitespace) + + assert result == cert_with_whitespace # Should return as-is + api_client.app_logger.debug.assert_called_with( + "HTTPS_CA_CERTIFICATES contains inline certificate content" + ) + + @patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": ""}) + def test_setup_proxy_certificates_no_env(self, api_client): + """Test _setup_proxy_certificates when HTTPS_CA_CERTIFICATES is not set.""" + api_client._setup_proxy_certificates() + + # Should return early without setting ssl_verify + assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False + + @patch.dict(os.environ, {}) + def test_setup_proxy_certificates_env_not_present(self, api_client): + """Test _setup_proxy_certificates when HTTPS_CA_CERTIFICATES env var doesn't exist.""" + api_client._setup_proxy_certificates() + + # Should return early without setting ssl_verify + assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False + + @patch("tempfile.mkdtemp") + @patch("os.path.isfile") + @patch("builtins.open", new_callable=mock_open) + @patch.dict( + os.environ, + { + "HTTPS_CA_CERTIFICATES": "-----BEGIN CERTIFICATE-----\ntest\n-----END CERTIFICATE-----" + }, + ) + def test_setup_proxy_certificates_with_inline_cert( + self, mock_file, mock_isfile, mock_mkdtemp, api_client + ): + """Test _setup_proxy_certificates with inline certificate content.""" + # Setup mocks + mock_mkdtemp.return_value = "/tmp/test_certs" + mock_isfile.side_effect = ( + lambda path: path == "/etc/ssl/certs/ca-certificates.crt" + ) + + # Mock system certificate content + system_cert_content = ( + "-----BEGIN CERTIFICATE-----\nsystem\n-----END CERTIFICATE-----" + ) + + def open_side_effect(path, mode="r"): + if path == "/etc/ssl/certs/ca-certificates.crt" and mode == "r": + return mock_open(read_data=system_cert_content)() + return mock_file() + + with patch("builtins.open", side_effect=open_side_effect): + api_client._setup_proxy_certificates() + + # Verify proxy certificates were processed + api_client.app_logger.info.assert_called() + + @patch("tempfile.mkdtemp") + @patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": "/path/to/cert.crt"}) + def test_setup_proxy_certificates_with_invalid_path(self, mock_mkdtemp, api_client): + """Test _setup_proxy_certificates with invalid certificate file path.""" + from pycti.api import opencti_api_client + + # Reset global state to ensure clean test + opencti_api_client._PROXY_CERT_BUNDLE = None + opencti_api_client._PROXY_CERT_DIR = None + + mock_mkdtemp.return_value = "/tmp/test_certs" + + # Mock _get_certificate_content to raise ValueError (invalid) + with patch.object( + api_client, + "_get_certificate_content", + side_effect=ValueError("Invalid certificate"), + ): + with pytest.raises(ValueError, match="Invalid certificate"): + api_client._setup_proxy_certificates() + + # Should log error before raising + api_client.app_logger.error.assert_called_with( + "Failed to setup proxy certificates", {"error": "Invalid certificate"} + ) + + # Cleanup + opencti_api_client._PROXY_CERT_BUNDLE = None + opencti_api_client._PROXY_CERT_DIR = None + + def test_setup_proxy_certificates_exception_handling(self, api_client): + """Test _setup_proxy_certificates raises exception on error.""" + from pycti.api import opencti_api_client + + # Reset global state to ensure clean test + opencti_api_client._PROXY_CERT_BUNDLE = None + opencti_api_client._PROXY_CERT_DIR = None + + with patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": self.SAMPLE_CERTIFICATE}): + with patch("tempfile.mkdtemp", side_effect=Exception("Mock error")): + with pytest.raises(Exception, match="Mock error"): + api_client._setup_proxy_certificates() + + # Should log error before raising + api_client.app_logger.error.assert_called_with( + "Failed to setup proxy certificates", {"error": "Mock error"} + ) + + # Cleanup + opencti_api_client._PROXY_CERT_BUNDLE = None + opencti_api_client._PROXY_CERT_DIR = None + + def test_cleanup_proxy_certificates_successful(self): + """Test _cleanup_proxy_certificates successfully removes temporary directory.""" + from pycti.api import opencti_api_client + + # Create a real temporary directory + temp_dir = tempfile.mkdtemp(prefix="opencti_test_") + opencti_api_client._PROXY_CERT_DIR = temp_dir + + # Call cleanup + opencti_api_client._cleanup_proxy_certificates() + + # Verify directory was removed and _PROXY_CERT_DIR reset + assert not os.path.exists(temp_dir) + assert opencti_api_client._PROXY_CERT_DIR is None + + @patch("pycti.api.opencti_api_client.shutil.rmtree") + @patch("pycti.api.opencti_api_client.os.path.exists") + def test_cleanup_proxy_certificates_with_error(self, mock_exists, mock_rmtree): + """Test _cleanup_proxy_certificates handles errors during removal.""" + from pycti.api import opencti_api_client + + temp_dir = "/tmp/opencti_test_certs" + opencti_api_client._PROXY_CERT_DIR = temp_dir + mock_exists.return_value = True + mock_rmtree.side_effect = OSError("Permission denied") + + # Call cleanup - should not raise exception + opencti_api_client._cleanup_proxy_certificates() + + # Should reset _PROXY_CERT_DIR + assert opencti_api_client._PROXY_CERT_DIR is None + + def test_singleton_behavior_multiple_instances(self): + """Test that multiple instances reuse the same certificate bundle.""" + from pycti.api import opencti_api_client + + # Reset global state + opencti_api_client._PROXY_CERT_BUNDLE = None + opencti_api_client._PROXY_CERT_DIR = None + + with patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": self.SAMPLE_CERTIFICATE}): + with patch("tempfile.mkdtemp", return_value="/tmp/test_certs"): + with patch("builtins.open", mock_open()): + with patch("os.path.exists", return_value=True): + # Create first instance + client1 = OpenCTIApiClient( + url="http://localhost:4000", + token="test-token", + ssl_verify=False, + perform_health_check=False, + ) + + # Verify certificate bundle was created + assert opencti_api_client._PROXY_CERT_BUNDLE is not None + first_bundle = opencti_api_client._PROXY_CERT_BUNDLE + assert client1.ssl_verify == first_bundle + + # Create second instance + client2 = OpenCTIApiClient( + url="http://localhost:4000", + token="test-token2", + ssl_verify=False, + perform_health_check=False, + ) + + # Verify same bundle is reused + assert opencti_api_client._PROXY_CERT_BUNDLE == first_bundle + assert client2.ssl_verify == first_bundle + + # Cleanup + opencti_api_client._PROXY_CERT_BUNDLE = None + opencti_api_client._PROXY_CERT_DIR = None