From 425e127bc0578071c801b2c3732c7703898de7b0 Mon Sep 17 00:00:00 2001 From: Mael Van den Plas Date: Wed, 29 Oct 2025 11:59:39 +0100 Subject: [PATCH 1/6] [client] Add inline proxy certificate support for HTTPS connections --- pycti/api/opencti_api_client.py | 62 +++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/pycti/api/opencti_api_client.py b/pycti/api/opencti_api_client.py index 5729f6d0e..f5afaef21 100644 --- a/pycti/api/opencti_api_client.py +++ b/pycti/api/opencti_api_client.py @@ -3,6 +3,8 @@ import datetime import io import json +import os +import tempfile from typing import Dict, Tuple, Union import magic @@ -166,6 +168,9 @@ def __init__( self.app_logger = self.logger_class("api") self.admin_logger = self.logger_class("admin") + # Setup proxy certificates if provided + self._setup_proxy_certificates() + # Define API self.api_token = token self.api_url = url + "/graphql" @@ -249,6 +254,63 @@ def __init__( "OpenCTI API is not reachable. Waiting for OpenCTI API to start or check your configuration..." ) + def _setup_proxy_certificates(self): + """Setup HTTPS proxy certificates from environment variable. + + Detects HTTPS_CA_CERTIFICATES environment variable and combines + proxy certificates with system certificates for SSL verification. + """ + https_ca_certificates = os.getenv("HTTPS_CA_CERTIFICATES") + if not https_ca_certificates: + return + + try: + # Create secure temporary directory + cert_dir = tempfile.mkdtemp(prefix="opencti_proxy_certs_") + + # Write proxy certificate to temp file + proxy_cert_file = os.path.join(cert_dir, "proxy-ca.crt") + with open(proxy_cert_file, "w") as f: + f.write(https_ca_certificates) + + # Find system certificates + system_cert_paths = [ + "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu + "/etc/pki/tls/certs/ca-bundle.crt", # RHEL/CentOS + "/etc/ssl/cert.pem", # Alpine/BSD + ] + + # Create combined certificate bundle + combined_cert_file = os.path.join(cert_dir, "combined-ca-bundle.crt") + with open(combined_cert_file, "w") as combined: + # Add system certificates first + for system_path in system_cert_paths: + if os.path.exists(system_path): + with open(system_path, "r") as sys_certs: + combined.write(sys_certs.read()) + combined.write("\n") + break + + # Add proxy certificate + combined.write(https_ca_certificates) + + # Update ssl_verify to use combined certificate bundle + self.ssl_verify = combined_cert_file + + # Set environment variables for urllib and other libraries + os.environ["REQUESTS_CA_BUNDLE"] = combined_cert_file + os.environ["SSL_CERT_FILE"] = combined_cert_file + + self.app_logger.info( + "Proxy certificates configured", + {"cert_bundle": combined_cert_file}, + ) + + except Exception as e: + self.app_logger.warning( + "Failed to setup proxy certificates", {"error": str(e)} + ) + def set_applicant_id_header(self, applicant_id): self.request_headers["opencti-applicant-id"] = applicant_id From e33b77c2f790d48bd1bd1167b5910fa5fbca29bd Mon Sep 17 00:00:00 2001 From: Mael Van den Plas Date: Thu, 30 Oct 2025 09:23:01 +0100 Subject: [PATCH 2/6] handle case when HTTPS_CA_CERTIFICATES is a filepath --- pycti/api/opencti_api_client.py | 66 ++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/pycti/api/opencti_api_client.py b/pycti/api/opencti_api_client.py index f5afaef21..4250ead00 100644 --- a/pycti/api/opencti_api_client.py +++ b/pycti/api/opencti_api_client.py @@ -259,6 +259,7 @@ def _setup_proxy_certificates(self): Detects HTTPS_CA_CERTIFICATES environment variable and combines proxy certificates with system certificates for SSL verification. + Supports both inline certificate content and file paths. """ https_ca_certificates = os.getenv("HTTPS_CA_CERTIFICATES") if not https_ca_certificates: @@ -268,10 +269,25 @@ def _setup_proxy_certificates(self): # Create secure temporary directory cert_dir = tempfile.mkdtemp(prefix="opencti_proxy_certs_") + # Determine if HTTPS_CA_CERTIFICATES contains inline content or file path + cert_content = self._get_certificate_content(https_ca_certificates) + if not cert_content: + self.app_logger.warning( + "Invalid HTTPS_CA_CERTIFICATES: not a valid certificate or file path", + { + "value": ( + https_ca_certificates[:50] + "..." + if len(https_ca_certificates) > 50 + else https_ca_certificates + ) + }, + ) + return + # Write proxy certificate to temp file proxy_cert_file = os.path.join(cert_dir, "proxy-ca.crt") with open(proxy_cert_file, "w") as f: - f.write(https_ca_certificates) + f.write(cert_content) # Find system certificates system_cert_paths = [ @@ -292,7 +308,7 @@ def _setup_proxy_certificates(self): break # Add proxy certificate - combined.write(https_ca_certificates) + combined.write(cert_content) # Update ssl_verify to use combined certificate bundle self.ssl_verify = combined_cert_file @@ -311,6 +327,52 @@ def _setup_proxy_certificates(self): "Failed to setup proxy certificates", {"error": str(e)} ) + def _get_certificate_content(self, https_ca_certificates): + """Extract certificate content from environment variable. + + Supports both inline certificate content (PEM format) and file paths. + + :param https_ca_certificates: Content from HTTPS_CA_CERTIFICATES env var + :type https_ca_certificates: str + :return: Certificate content in PEM format or None if invalid + :rtype: str or None + """ + # Check if it's inline certificate content (starts with PEM header) + if https_ca_certificates.strip().startswith("-----BEGIN CERTIFICATE-----"): + self.app_logger.debug( + "HTTPS_CA_CERTIFICATES contains inline certificate content" + ) + return https_ca_certificates + + # Check if it's a file path + if os.path.isfile(https_ca_certificates.strip()): + cert_file_path = https_ca_certificates.strip() + try: + with open(cert_file_path, "r") as f: + cert_content = f.read() + # Validate it's actually a certificate + if "-----BEGIN CERTIFICATE-----" in cert_content: + self.app_logger.debug( + "HTTPS_CA_CERTIFICATES contains valid certificate file path", + {"file_path": cert_file_path}, + ) + return cert_content + else: + self.app_logger.warning( + "File at HTTPS_CA_CERTIFICATES path does not contain valid certificate", + {"file_path": cert_file_path}, + ) + return None + except Exception as e: + self.app_logger.warning( + "Failed to read certificate file", + {"file_path": cert_file_path, "error": str(e)}, + ) + return None + + # Neither inline content nor valid file path + return None + def set_applicant_id_header(self, applicant_id): self.request_headers["opencti-applicant-id"] = applicant_id From 7f4923a14536a81fa6c6e6546bf7787a9226ebc2 Mon Sep 17 00:00:00 2001 From: Mael Van den Plas Date: Fri, 31 Oct 2025 16:32:11 +0100 Subject: [PATCH 3/6] add unit test for proxy cert validation --- tests/01-unit/api/__init__.py | 1 + .../api/test_opencti_api_client-proxy.py | 188 ++++++++++++++++++ 2 files changed, 189 insertions(+) create mode 100644 tests/01-unit/api/__init__.py create mode 100644 tests/01-unit/api/test_opencti_api_client-proxy.py diff --git a/tests/01-unit/api/__init__.py b/tests/01-unit/api/__init__.py new file mode 100644 index 000000000..78c08441c --- /dev/null +++ b/tests/01-unit/api/__init__.py @@ -0,0 +1 @@ +# Unit tests for API client functionality diff --git a/tests/01-unit/api/test_opencti_api_client-proxy.py b/tests/01-unit/api/test_opencti_api_client-proxy.py new file mode 100644 index 000000000..76a384531 --- /dev/null +++ b/tests/01-unit/api/test_opencti_api_client-proxy.py @@ -0,0 +1,188 @@ +import os +import tempfile +from unittest.mock import MagicMock, mock_open, patch + +import pytest + +from pycti import OpenCTIApiClient + + +class TestOpenCTIApiClient: + """Test OpenCTIApiClient certificate handling functionality.""" + + SAMPLE_CERTIFICATE = """-----BEGIN CERTIFICATE----- +MIIDXTCCAkWgAwIBAgIJAKLdQVPy90WjMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMjQwMTAxMDAwMDAwWhcNMjUwMTAxMDAwMDAwWjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB +CgKCAQEA0Z3VS5JJcds3xfn/ygWyF0qJDr9oYRH/9dMfqHCOq45DqMVJLJBJnMzN +-----END CERTIFICATE-----""" + + INVALID_CONTENT = "This is not a certificate" + + @pytest.fixture + def api_client(self): + """Create an API client instance without performing health check.""" + with patch.object(OpenCTIApiClient, "_setup_proxy_certificates"): + client = OpenCTIApiClient( + url="http://localhost:4000", + token="test-token", + ssl_verify=False, + perform_health_check=False, + ) + # Mock the logger + client.app_logger = MagicMock() + return client + + def test_get_certificate_content_inline_pem(self, api_client): + """Test _get_certificate_content with inline PEM certificate.""" + result = api_client._get_certificate_content(self.SAMPLE_CERTIFICATE) + + assert result == self.SAMPLE_CERTIFICATE + api_client.app_logger.debug.assert_called_with( + "HTTPS_CA_CERTIFICATES contains inline certificate content" + ) + + def test_get_certificate_content_file_path(self, api_client): + """Test _get_certificate_content with a file path containing certificate.""" + # Create a temporary file with certificate content + with tempfile.NamedTemporaryFile( + mode="w", suffix=".crt", delete=False + ) as cert_file: + cert_file.write(self.SAMPLE_CERTIFICATE) + cert_file_path = cert_file.name + + try: + result = api_client._get_certificate_content(cert_file_path) + + assert result == self.SAMPLE_CERTIFICATE + api_client.app_logger.debug.assert_called_with( + "HTTPS_CA_CERTIFICATES contains valid certificate file path", + {"file_path": cert_file_path}, + ) + finally: + # Clean up + os.unlink(cert_file_path) + + def test_get_certificate_content_invalid_file_content(self, api_client): + """Test _get_certificate_content with a file containing invalid certificate.""" + # Create a temporary file with invalid content + with tempfile.NamedTemporaryFile( + mode="w", suffix=".txt", delete=False + ) as invalid_file: + invalid_file.write(self.INVALID_CONTENT) + invalid_file_path = invalid_file.name + + try: + result = api_client._get_certificate_content(invalid_file_path) + + assert result is None + api_client.app_logger.warning.assert_called_with( + "File at HTTPS_CA_CERTIFICATES path does not contain valid certificate", + {"file_path": invalid_file_path}, + ) + finally: + # Clean up + os.unlink(invalid_file_path) + + def test_get_certificate_content_nonexistent_file(self, api_client): + """Test _get_certificate_content with a nonexistent file path.""" + nonexistent_path = "/tmp/nonexistent_certificate.crt" + + result = api_client._get_certificate_content(nonexistent_path) + + assert result is None + + def test_get_certificate_content_invalid_content(self, api_client): + """Test _get_certificate_content with invalid content (not PEM, not file).""" + result = api_client._get_certificate_content(self.INVALID_CONTENT) + + assert result is None + + def test_get_certificate_content_whitespace_handling(self, api_client): + """Test _get_certificate_content handles whitespace correctly.""" + # Test with certificate content with leading/trailing whitespace + cert_with_whitespace = f" \n{self.SAMPLE_CERTIFICATE} \n" + result = api_client._get_certificate_content(cert_with_whitespace) + + assert result == cert_with_whitespace # Should return as-is + api_client.app_logger.debug.assert_called_with( + "HTTPS_CA_CERTIFICATES contains inline certificate content" + ) + + @patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": ""}) + def test_setup_proxy_certificates_no_env(self, api_client): + """Test _setup_proxy_certificates when HTTPS_CA_CERTIFICATES is not set.""" + api_client._setup_proxy_certificates() + + # Should return early without setting ssl_verify + assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False + + @patch.dict(os.environ, {}) + def test_setup_proxy_certificates_env_not_present(self, api_client): + """Test _setup_proxy_certificates when HTTPS_CA_CERTIFICATES env var doesn't exist.""" + api_client._setup_proxy_certificates() + + # Should return early without setting ssl_verify + assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False + + @patch("tempfile.mkdtemp") + @patch("os.path.isfile") + @patch("builtins.open", new_callable=mock_open) + @patch.dict( + os.environ, + { + "HTTPS_CA_CERTIFICATES": "-----BEGIN CERTIFICATE-----\ntest\n-----END CERTIFICATE-----" + }, + ) + def test_setup_proxy_certificates_with_inline_cert( + self, mock_file, mock_isfile, mock_mkdtemp, api_client + ): + """Test _setup_proxy_certificates with inline certificate content.""" + # Setup mocks + mock_mkdtemp.return_value = "/tmp/test_certs" + mock_isfile.side_effect = ( + lambda path: path == "/etc/ssl/certs/ca-certificates.crt" + ) + + # Mock system certificate content + system_cert_content = ( + "-----BEGIN CERTIFICATE-----\nsystem\n-----END CERTIFICATE-----" + ) + + def open_side_effect(path, mode="r"): + if path == "/etc/ssl/certs/ca-certificates.crt" and mode == "r": + return mock_open(read_data=system_cert_content)() + return mock_file() + + with patch("builtins.open", side_effect=open_side_effect): + api_client._setup_proxy_certificates() + + # Verify proxy certificates were processed + api_client.app_logger.info.assert_called() + + @patch("tempfile.mkdtemp") + @patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": "/path/to/cert.crt"}) + def test_setup_proxy_certificates_with_invalid_path(self, mock_mkdtemp, api_client): + """Test _setup_proxy_certificates with invalid certificate file path.""" + mock_mkdtemp.return_value = "/tmp/test_certs" + + # Mock _get_certificate_content to return None (invalid) + with patch.object(api_client, "_get_certificate_content", return_value=None): + api_client._setup_proxy_certificates() + + # Should log warning and return early + api_client.app_logger.warning.assert_called() + assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False + + def test_setup_proxy_certificates_exception_handling(self, api_client): + """Test _setup_proxy_certificates handles exceptions gracefully.""" + with patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": self.SAMPLE_CERTIFICATE}): + with patch("tempfile.mkdtemp", side_effect=Exception("Mock error")): + api_client._setup_proxy_certificates() + + # Should log warning and continue + api_client.app_logger.warning.assert_called_with( + "Failed to setup proxy certificates", {"error": "Mock error"} + ) From 3843dd23801c55e9e37353e28c5d681bbc12bf14 Mon Sep 17 00:00:00 2001 From: Mael Van den Plas Date: Fri, 7 Nov 2025 16:18:30 +0100 Subject: [PATCH 4/6] address review feedback --- pycti/api/opencti_api_client.py | 62 +++++++++++++++++-- .../api/test_opencti_api_client-proxy.py | 43 +++++++++++-- 2 files changed, 97 insertions(+), 8 deletions(-) diff --git a/pycti/api/opencti_api_client.py b/pycti/api/opencti_api_client.py index 4250ead00..bd825d7e8 100644 --- a/pycti/api/opencti_api_client.py +++ b/pycti/api/opencti_api_client.py @@ -1,9 +1,12 @@ # coding: utf-8 +import atexit import base64 import datetime import io import json import os +import shutil +import signal import tempfile from typing import Dict, Tuple, Union @@ -168,9 +171,18 @@ def __init__( self.app_logger = self.logger_class("api") self.admin_logger = self.logger_class("admin") + # Initialize temp certificate directory tracker + self.temp_cert_dir = None + # Setup proxy certificates if provided self._setup_proxy_certificates() + # Register cleanup handlers for temp certificates + if self.temp_cert_dir: + atexit.register(self._cleanup_temp_certificates) + signal.signal(signal.SIGTERM, self._signal_handler) + signal.signal(signal.SIGINT, self._signal_handler) + # Define API self.api_token = token self.api_url = url + "/graphql" @@ -268,6 +280,7 @@ def _setup_proxy_certificates(self): try: # Create secure temporary directory cert_dir = tempfile.mkdtemp(prefix="opencti_proxy_certs_") + self.temp_cert_dir = cert_dir # Determine if HTTPS_CA_CERTIFICATES contains inline content or file path cert_content = self._get_certificate_content(https_ca_certificates) @@ -323,9 +336,10 @@ def _setup_proxy_certificates(self): ) except Exception as e: - self.app_logger.warning( + self.app_logger.error( "Failed to setup proxy certificates", {"error": str(e)} ) + raise def _get_certificate_content(self, https_ca_certificates): """Extract certificate content from environment variable. @@ -337,16 +351,19 @@ def _get_certificate_content(self, https_ca_certificates): :return: Certificate content in PEM format or None if invalid :rtype: str or None """ + # Strip whitespace once at the beginning + stripped_https_ca_certificates = https_ca_certificates.strip() + # Check if it's inline certificate content (starts with PEM header) - if https_ca_certificates.strip().startswith("-----BEGIN CERTIFICATE-----"): + if stripped_https_ca_certificates.startswith("-----BEGIN CERTIFICATE-----"): self.app_logger.debug( "HTTPS_CA_CERTIFICATES contains inline certificate content" ) return https_ca_certificates # Check if it's a file path - if os.path.isfile(https_ca_certificates.strip()): - cert_file_path = https_ca_certificates.strip() + if os.path.isfile(stripped_https_ca_certificates): + cert_file_path = stripped_https_ca_certificates try: with open(cert_file_path, "r") as f: cert_content = f.read() @@ -373,6 +390,43 @@ def _get_certificate_content(self, https_ca_certificates): # Neither inline content nor valid file path return None + def _cleanup_temp_certificates(self): + """Clean up temporary certificate directory. + + This method is called on normal program exit via atexit + or when receiving termination signals (SIGTERM/SIGINT). + """ + if self.temp_cert_dir and os.path.exists(self.temp_cert_dir): + try: + shutil.rmtree(self.temp_cert_dir) + self.app_logger.debug( + "Cleaned up temporary certificates", + {"cert_dir": self.temp_cert_dir} + ) + except Exception as e: + self.app_logger.warning( + "Failed to cleanup temporary certificates", + {"cert_dir": self.temp_cert_dir, "error": str(e)} + ) + finally: + self.temp_cert_dir = None + + def _signal_handler(self, signum, frame): + """Handle termination signals (SIGTERM/SIGINT). + + Performs cleanup and then raises SystemExit to allow + normal shutdown procedures to complete. + + :param signum: Signal number + :param frame: Current stack frame + """ + self.app_logger.info( + "Received termination signal, cleaning up", + {"signal": signum} + ) + self._cleanup_temp_certificates() + raise SystemExit(0) + def set_applicant_id_header(self, applicant_id): self.request_headers["opencti-applicant-id"] = applicant_id diff --git a/tests/01-unit/api/test_opencti_api_client-proxy.py b/tests/01-unit/api/test_opencti_api_client-proxy.py index 76a384531..31cb5d179 100644 --- a/tests/01-unit/api/test_opencti_api_client-proxy.py +++ b/tests/01-unit/api/test_opencti_api_client-proxy.py @@ -177,12 +177,47 @@ def test_setup_proxy_certificates_with_invalid_path(self, mock_mkdtemp, api_clie assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False def test_setup_proxy_certificates_exception_handling(self, api_client): - """Test _setup_proxy_certificates handles exceptions gracefully.""" + """Test _setup_proxy_certificates raises exception on error.""" with patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": self.SAMPLE_CERTIFICATE}): with patch("tempfile.mkdtemp", side_effect=Exception("Mock error")): - api_client._setup_proxy_certificates() + with pytest.raises(Exception, match="Mock error"): + api_client._setup_proxy_certificates() - # Should log warning and continue - api_client.app_logger.warning.assert_called_with( + # Should log error before raising + api_client.app_logger.error.assert_called_with( "Failed to setup proxy certificates", {"error": "Mock error"} ) + + def test_cleanup_temp_certificates_successful(self, api_client): + """Test _cleanup_temp_certificates successfully removes temporary directory.""" + # Create a real temporary directory + temp_dir = tempfile.mkdtemp(prefix="opencti_test_") + api_client.temp_cert_dir = temp_dir + + # Call cleanup + api_client._cleanup_temp_certificates() + + # Verify directory was removed and temp_cert_dir reset + assert not os.path.exists(temp_dir) + assert api_client.temp_cert_dir is None + + @patch("shutil.rmtree") + @patch("os.path.exists") + def test_cleanup_temp_certificates_with_error( + self, mock_exists, mock_rmtree, api_client + ): + """Test _cleanup_temp_certificates handles errors during removal.""" + temp_dir = "/tmp/opencti_test_certs" + api_client.temp_cert_dir = temp_dir + mock_exists.return_value = True + mock_rmtree.side_effect = OSError("Permission denied") + + # Call cleanup - should not raise exception + api_client._cleanup_temp_certificates() + + # Should log warning and reset temp_cert_dir + api_client.app_logger.warning.assert_called_with( + "Failed to cleanup temporary certificates", + {"cert_dir": temp_dir, "error": "Permission denied"}, + ) + assert api_client.temp_cert_dir is None From e84e7fa11f2d2162956afb6b0211aa9dcd491fc5 Mon Sep 17 00:00:00 2001 From: Mael Van den Plas Date: Fri, 7 Nov 2025 17:54:25 +0100 Subject: [PATCH 5/6] convert to singleton pattern --- pycti/api/opencti_api_client.py | 230 ++++++++++-------- .../api/test_opencti_api_client-proxy.py | 99 ++++++-- 2 files changed, 203 insertions(+), 126 deletions(-) diff --git a/pycti/api/opencti_api_client.py b/pycti/api/opencti_api_client.py index bd825d7e8..3df435648 100644 --- a/pycti/api/opencti_api_client.py +++ b/pycti/api/opencti_api_client.py @@ -8,6 +8,7 @@ import shutil import signal import tempfile +import threading from typing import Dict, Tuple, Union import magic @@ -83,6 +84,12 @@ from pycti.utils.opencti_stix2 import OpenCTIStix2 from pycti.utils.opencti_stix2_utils import OpenCTIStix2Utils +# Global singleton variables for proxy certificate management +_PROXY_CERT_BUNDLE = None +_PROXY_CERT_DIR = None +_PROXY_CERT_LOCK = threading.Lock() +_PROXY_SIGNAL_HANDLERS_REGISTERED = False + def build_request_headers(token: str, custom_headers: str, app_logger): headers_dict = { @@ -171,18 +178,9 @@ def __init__( self.app_logger = self.logger_class("api") self.admin_logger = self.logger_class("admin") - # Initialize temp certificate directory tracker - self.temp_cert_dir = None - # Setup proxy certificates if provided self._setup_proxy_certificates() - # Register cleanup handlers for temp certificates - if self.temp_cert_dir: - atexit.register(self._cleanup_temp_certificates) - signal.signal(signal.SIGTERM, self._signal_handler) - signal.signal(signal.SIGINT, self._signal_handler) - # Define API self.api_token = token self.api_url = url + "/graphql" @@ -272,74 +270,101 @@ def _setup_proxy_certificates(self): Detects HTTPS_CA_CERTIFICATES environment variable and combines proxy certificates with system certificates for SSL verification. Supports both inline certificate content and file paths. + + Uses a singleton pattern to ensure only one certificate bundle is created + across all instances, avoiding resource leaks and conflicts. """ + global _PROXY_CERT_BUNDLE, _PROXY_CERT_DIR, _PROXY_SIGNAL_HANDLERS_REGISTERED + https_ca_certificates = os.getenv("HTTPS_CA_CERTIFICATES") if not https_ca_certificates: return - try: - # Create secure temporary directory - cert_dir = tempfile.mkdtemp(prefix="opencti_proxy_certs_") - self.temp_cert_dir = cert_dir - - # Determine if HTTPS_CA_CERTIFICATES contains inline content or file path - cert_content = self._get_certificate_content(https_ca_certificates) - if not cert_content: - self.app_logger.warning( - "Invalid HTTPS_CA_CERTIFICATES: not a valid certificate or file path", - { - "value": ( - https_ca_certificates[:50] + "..." - if len(https_ca_certificates) > 50 - else https_ca_certificates - ) - }, + # Thread-safe check and setup + with _PROXY_CERT_LOCK: + # If already configured, reuse existing bundle + if _PROXY_CERT_BUNDLE is not None: + self.ssl_verify = _PROXY_CERT_BUNDLE + self.app_logger.debug( + "Reusing existing proxy certificate bundle", + {"cert_bundle": _PROXY_CERT_BUNDLE}, ) return - # Write proxy certificate to temp file - proxy_cert_file = os.path.join(cert_dir, "proxy-ca.crt") - with open(proxy_cert_file, "w") as f: - f.write(cert_content) - - # Find system certificates - system_cert_paths = [ - "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu - "/etc/pki/tls/certs/ca-bundle.crt", # RHEL/CentOS - "/etc/ssl/cert.pem", # Alpine/BSD - ] - - # Create combined certificate bundle - combined_cert_file = os.path.join(cert_dir, "combined-ca-bundle.crt") - with open(combined_cert_file, "w") as combined: - # Add system certificates first - for system_path in system_cert_paths: - if os.path.exists(system_path): - with open(system_path, "r") as sys_certs: - combined.write(sys_certs.read()) - combined.write("\n") - break - - # Add proxy certificate - combined.write(cert_content) - - # Update ssl_verify to use combined certificate bundle - self.ssl_verify = combined_cert_file - - # Set environment variables for urllib and other libraries - os.environ["REQUESTS_CA_BUNDLE"] = combined_cert_file - os.environ["SSL_CERT_FILE"] = combined_cert_file - - self.app_logger.info( - "Proxy certificates configured", - {"cert_bundle": combined_cert_file}, - ) + # First initialization - create the certificate bundle + try: + # Create secure temporary directory + cert_dir = tempfile.mkdtemp(prefix="opencti_proxy_certs_") + + # Determine if HTTPS_CA_CERTIFICATES contains inline content or file path + cert_content = self._get_certificate_content(https_ca_certificates) + if not cert_content: + self.app_logger.warning( + "Invalid HTTPS_CA_CERTIFICATES: not a valid certificate or file path", + { + "value": ( + https_ca_certificates[:50] + "..." + if len(https_ca_certificates) > 50 + else https_ca_certificates + ) + }, + ) + return + + # Write proxy certificate to temp file + proxy_cert_file = os.path.join(cert_dir, "proxy-ca.crt") + with open(proxy_cert_file, "w") as f: + f.write(cert_content) + + # Find system certificates + system_cert_paths = [ + "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu + "/etc/pki/tls/certs/ca-bundle.crt", # RHEL/CentOS + "/etc/ssl/cert.pem", # Alpine/BSD + ] + + # Create combined certificate bundle + combined_cert_file = os.path.join(cert_dir, "combined-ca-bundle.crt") + with open(combined_cert_file, "w") as combined: + # Add system certificates first + for system_path in system_cert_paths: + if os.path.exists(system_path): + with open(system_path, "r") as sys_certs: + combined.write(sys_certs.read()) + combined.write("\n") + break + + # Add proxy certificate + combined.write(cert_content) + + # Update global singleton variables + _PROXY_CERT_BUNDLE = combined_cert_file + _PROXY_CERT_DIR = cert_dir + self.ssl_verify = combined_cert_file + + # Set environment variables for urllib and other libraries + os.environ["REQUESTS_CA_BUNDLE"] = combined_cert_file + os.environ["SSL_CERT_FILE"] = combined_cert_file + + # Register cleanup handlers only once + atexit.register(_cleanup_proxy_certificates) + + # Register signal handlers only once + if not _PROXY_SIGNAL_HANDLERS_REGISTERED: + signal.signal(signal.SIGTERM, _signal_handler_proxy_cleanup) + signal.signal(signal.SIGINT, _signal_handler_proxy_cleanup) + _PROXY_SIGNAL_HANDLERS_REGISTERED = True + + self.app_logger.info( + "Proxy certificates configured", + {"cert_bundle": combined_cert_file}, + ) - except Exception as e: - self.app_logger.error( - "Failed to setup proxy certificates", {"error": str(e)} - ) - raise + except Exception as e: + self.app_logger.error( + "Failed to setup proxy certificates", {"error": str(e)} + ) + raise def _get_certificate_content(self, https_ca_certificates): """Extract certificate content from environment variable. @@ -353,7 +378,7 @@ def _get_certificate_content(self, https_ca_certificates): """ # Strip whitespace once at the beginning stripped_https_ca_certificates = https_ca_certificates.strip() - + # Check if it's inline certificate content (starts with PEM header) if stripped_https_ca_certificates.startswith("-----BEGIN CERTIFICATE-----"): self.app_logger.debug( @@ -390,43 +415,6 @@ def _get_certificate_content(self, https_ca_certificates): # Neither inline content nor valid file path return None - def _cleanup_temp_certificates(self): - """Clean up temporary certificate directory. - - This method is called on normal program exit via atexit - or when receiving termination signals (SIGTERM/SIGINT). - """ - if self.temp_cert_dir and os.path.exists(self.temp_cert_dir): - try: - shutil.rmtree(self.temp_cert_dir) - self.app_logger.debug( - "Cleaned up temporary certificates", - {"cert_dir": self.temp_cert_dir} - ) - except Exception as e: - self.app_logger.warning( - "Failed to cleanup temporary certificates", - {"cert_dir": self.temp_cert_dir, "error": str(e)} - ) - finally: - self.temp_cert_dir = None - - def _signal_handler(self, signum, frame): - """Handle termination signals (SIGTERM/SIGINT). - - Performs cleanup and then raises SystemExit to allow - normal shutdown procedures to complete. - - :param signum: Signal number - :param frame: Current stack frame - """ - self.app_logger.info( - "Received termination signal, cleaning up", - {"signal": signum} - ) - self._cleanup_temp_certificates() - raise SystemExit(0) - def set_applicant_id_header(self, applicant_id): self.request_headers["opencti-applicant-id"] = applicant_id @@ -1062,3 +1050,33 @@ def get_attribute_in_mitre_extension(key, object) -> any: "extension-definition--322b8f77-262a-4cb8-a915-1e441e00329b" ][key] return None + + +# Global cleanup functions for proxy certificates singleton +def _cleanup_proxy_certificates(): + """Clean up temporary certificate directory for proxy certificates. + + This function is called on normal program exit via atexit. + """ + global _PROXY_CERT_DIR + if _PROXY_CERT_DIR and os.path.exists(_PROXY_CERT_DIR): + try: + shutil.rmtree(_PROXY_CERT_DIR) + except Exception: + # Silently fail cleanup - best effort + pass + finally: + _PROXY_CERT_DIR = None + + +def _signal_handler_proxy_cleanup(signum, frame): + """Handle termination signals (SIGTERM/SIGINT) for proxy certificate cleanup. + + Performs cleanup and then raises SystemExit to allow + normal shutdown procedures to complete. + + :param signum: Signal number + :param frame: Current stack frame + """ + _cleanup_proxy_certificates() + raise SystemExit(0) diff --git a/tests/01-unit/api/test_opencti_api_client-proxy.py b/tests/01-unit/api/test_opencti_api_client-proxy.py index 31cb5d179..280d07454 100644 --- a/tests/01-unit/api/test_opencti_api_client-proxy.py +++ b/tests/01-unit/api/test_opencti_api_client-proxy.py @@ -166,6 +166,12 @@ def open_side_effect(path, mode="r"): @patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": "/path/to/cert.crt"}) def test_setup_proxy_certificates_with_invalid_path(self, mock_mkdtemp, api_client): """Test _setup_proxy_certificates with invalid certificate file path.""" + from pycti.api import opencti_api_client + + # Reset global state to ensure clean test + opencti_api_client._PROXY_CERT_BUNDLE = None + opencti_api_client._PROXY_CERT_DIR = None + mock_mkdtemp.return_value = "/tmp/test_certs" # Mock _get_certificate_content to return None (invalid) @@ -176,8 +182,18 @@ def test_setup_proxy_certificates_with_invalid_path(self, mock_mkdtemp, api_clie api_client.app_logger.warning.assert_called() assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False + # Cleanup + opencti_api_client._PROXY_CERT_BUNDLE = None + opencti_api_client._PROXY_CERT_DIR = None + def test_setup_proxy_certificates_exception_handling(self, api_client): """Test _setup_proxy_certificates raises exception on error.""" + from pycti.api import opencti_api_client + + # Reset global state to ensure clean test + opencti_api_client._PROXY_CERT_BUNDLE = None + opencti_api_client._PROXY_CERT_DIR = None + with patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": self.SAMPLE_CERTIFICATE}): with patch("tempfile.mkdtemp", side_effect=Exception("Mock error")): with pytest.raises(Exception, match="Mock error"): @@ -188,36 +204,79 @@ def test_setup_proxy_certificates_exception_handling(self, api_client): "Failed to setup proxy certificates", {"error": "Mock error"} ) - def test_cleanup_temp_certificates_successful(self, api_client): - """Test _cleanup_temp_certificates successfully removes temporary directory.""" + # Cleanup + opencti_api_client._PROXY_CERT_BUNDLE = None + opencti_api_client._PROXY_CERT_DIR = None + + def test_cleanup_proxy_certificates_successful(self): + """Test _cleanup_proxy_certificates successfully removes temporary directory.""" + from pycti.api import opencti_api_client + # Create a real temporary directory temp_dir = tempfile.mkdtemp(prefix="opencti_test_") - api_client.temp_cert_dir = temp_dir + opencti_api_client._PROXY_CERT_DIR = temp_dir # Call cleanup - api_client._cleanup_temp_certificates() + opencti_api_client._cleanup_proxy_certificates() - # Verify directory was removed and temp_cert_dir reset + # Verify directory was removed and _PROXY_CERT_DIR reset assert not os.path.exists(temp_dir) - assert api_client.temp_cert_dir is None + assert opencti_api_client._PROXY_CERT_DIR is None + + @patch("pycti.api.opencti_api_client.shutil.rmtree") + @patch("pycti.api.opencti_api_client.os.path.exists") + def test_cleanup_proxy_certificates_with_error(self, mock_exists, mock_rmtree): + """Test _cleanup_proxy_certificates handles errors during removal.""" + from pycti.api import opencti_api_client - @patch("shutil.rmtree") - @patch("os.path.exists") - def test_cleanup_temp_certificates_with_error( - self, mock_exists, mock_rmtree, api_client - ): - """Test _cleanup_temp_certificates handles errors during removal.""" temp_dir = "/tmp/opencti_test_certs" - api_client.temp_cert_dir = temp_dir + opencti_api_client._PROXY_CERT_DIR = temp_dir mock_exists.return_value = True mock_rmtree.side_effect = OSError("Permission denied") # Call cleanup - should not raise exception - api_client._cleanup_temp_certificates() + opencti_api_client._cleanup_proxy_certificates() - # Should log warning and reset temp_cert_dir - api_client.app_logger.warning.assert_called_with( - "Failed to cleanup temporary certificates", - {"cert_dir": temp_dir, "error": "Permission denied"}, - ) - assert api_client.temp_cert_dir is None + # Should reset _PROXY_CERT_DIR + assert opencti_api_client._PROXY_CERT_DIR is None + + def test_singleton_behavior_multiple_instances(self): + """Test that multiple instances reuse the same certificate bundle.""" + from pycti.api import opencti_api_client + + # Reset global state + opencti_api_client._PROXY_CERT_BUNDLE = None + opencti_api_client._PROXY_CERT_DIR = None + + with patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": self.SAMPLE_CERTIFICATE}): + with patch("tempfile.mkdtemp", return_value="/tmp/test_certs"): + with patch("builtins.open", mock_open()): + with patch("os.path.exists", return_value=True): + # Create first instance + client1 = OpenCTIApiClient( + url="http://localhost:4000", + token="test-token", + ssl_verify=False, + perform_health_check=False, + ) + + # Verify certificate bundle was created + assert opencti_api_client._PROXY_CERT_BUNDLE is not None + first_bundle = opencti_api_client._PROXY_CERT_BUNDLE + assert client1.ssl_verify == first_bundle + + # Create second instance + client2 = OpenCTIApiClient( + url="http://localhost:4000", + token="test-token2", + ssl_verify=False, + perform_health_check=False, + ) + + # Verify same bundle is reused + assert opencti_api_client._PROXY_CERT_BUNDLE == first_bundle + assert client2.ssl_verify == first_bundle + + # Cleanup + opencti_api_client._PROXY_CERT_BUNDLE = None + opencti_api_client._PROXY_CERT_DIR = None From fb60061ef8cc9bf880f77560d62f9c54bac7d68d Mon Sep 17 00:00:00 2001 From: Mael Van den Plas Date: Fri, 7 Nov 2025 17:58:57 +0100 Subject: [PATCH 6/6] Exceptions are raised instead of returning None --- pycti/api/opencti_api_client.py | 36 ++++++--------- .../api/test_opencti_api_client-proxy.py | 46 +++++++++++-------- 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/pycti/api/opencti_api_client.py b/pycti/api/opencti_api_client.py index 3df435648..69278be60 100644 --- a/pycti/api/opencti_api_client.py +++ b/pycti/api/opencti_api_client.py @@ -298,18 +298,6 @@ def _setup_proxy_certificates(self): # Determine if HTTPS_CA_CERTIFICATES contains inline content or file path cert_content = self._get_certificate_content(https_ca_certificates) - if not cert_content: - self.app_logger.warning( - "Invalid HTTPS_CA_CERTIFICATES: not a valid certificate or file path", - { - "value": ( - https_ca_certificates[:50] + "..." - if len(https_ca_certificates) > 50 - else https_ca_certificates - ) - }, - ) - return # Write proxy certificate to temp file proxy_cert_file = os.path.join(cert_dir, "proxy-ca.crt") @@ -373,8 +361,9 @@ def _get_certificate_content(self, https_ca_certificates): :param https_ca_certificates: Content from HTTPS_CA_CERTIFICATES env var :type https_ca_certificates: str - :return: Certificate content in PEM format or None if invalid - :rtype: str or None + :return: Certificate content in PEM format + :rtype: str + :raises ValueError: If the certificate content is invalid or cannot be read """ # Strip whitespace once at the beginning stripped_https_ca_certificates = https_ca_certificates.strip() @@ -400,20 +389,21 @@ def _get_certificate_content(self, https_ca_certificates): ) return cert_content else: - self.app_logger.warning( - "File at HTTPS_CA_CERTIFICATES path does not contain valid certificate", - {"file_path": cert_file_path}, + raise ValueError( + f"File at HTTPS_CA_CERTIFICATES path does not contain valid certificate: {cert_file_path}" ) - return None + except ValueError: + # Re-raise ValueError from certificate validation + raise except Exception as e: - self.app_logger.warning( - "Failed to read certificate file", - {"file_path": cert_file_path, "error": str(e)}, + raise ValueError( + f"Failed to read certificate file at {cert_file_path}: {str(e)}" ) - return None # Neither inline content nor valid file path - return None + raise ValueError( + f"HTTPS_CA_CERTIFICATES is not a valid certificate or file path: {https_ca_certificates[:50]}..." + ) def set_applicant_id_header(self, applicant_id): self.request_headers["opencti-applicant-id"] = applicant_id diff --git a/tests/01-unit/api/test_opencti_api_client-proxy.py b/tests/01-unit/api/test_opencti_api_client-proxy.py index 280d07454..8996257d5 100644 --- a/tests/01-unit/api/test_opencti_api_client-proxy.py +++ b/tests/01-unit/api/test_opencti_api_client-proxy.py @@ -75,13 +75,11 @@ def test_get_certificate_content_invalid_file_content(self, api_client): invalid_file_path = invalid_file.name try: - result = api_client._get_certificate_content(invalid_file_path) - - assert result is None - api_client.app_logger.warning.assert_called_with( - "File at HTTPS_CA_CERTIFICATES path does not contain valid certificate", - {"file_path": invalid_file_path}, - ) + with pytest.raises( + ValueError, + match="File at HTTPS_CA_CERTIFICATES path does not contain valid certificate", + ): + api_client._get_certificate_content(invalid_file_path) finally: # Clean up os.unlink(invalid_file_path) @@ -90,15 +88,19 @@ def test_get_certificate_content_nonexistent_file(self, api_client): """Test _get_certificate_content with a nonexistent file path.""" nonexistent_path = "/tmp/nonexistent_certificate.crt" - result = api_client._get_certificate_content(nonexistent_path) - - assert result is None + with pytest.raises( + ValueError, + match="HTTPS_CA_CERTIFICATES is not a valid certificate or file path", + ): + api_client._get_certificate_content(nonexistent_path) def test_get_certificate_content_invalid_content(self, api_client): """Test _get_certificate_content with invalid content (not PEM, not file).""" - result = api_client._get_certificate_content(self.INVALID_CONTENT) - - assert result is None + with pytest.raises( + ValueError, + match="HTTPS_CA_CERTIFICATES is not a valid certificate or file path", + ): + api_client._get_certificate_content(self.INVALID_CONTENT) def test_get_certificate_content_whitespace_handling(self, api_client): """Test _get_certificate_content handles whitespace correctly.""" @@ -174,13 +176,19 @@ def test_setup_proxy_certificates_with_invalid_path(self, mock_mkdtemp, api_clie mock_mkdtemp.return_value = "/tmp/test_certs" - # Mock _get_certificate_content to return None (invalid) - with patch.object(api_client, "_get_certificate_content", return_value=None): - api_client._setup_proxy_certificates() + # Mock _get_certificate_content to raise ValueError (invalid) + with patch.object( + api_client, + "_get_certificate_content", + side_effect=ValueError("Invalid certificate"), + ): + with pytest.raises(ValueError, match="Invalid certificate"): + api_client._setup_proxy_certificates() - # Should log warning and return early - api_client.app_logger.warning.assert_called() - assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False + # Should log error before raising + api_client.app_logger.error.assert_called_with( + "Failed to setup proxy certificates", {"error": "Invalid certificate"} + ) # Cleanup opencti_api_client._PROXY_CERT_BUNDLE = None