Skip to content

Commit b349ee4

Browse files
authored
feat: add DiodeOTLPClient (#68)
1 parent fa2cc14 commit b349ee4

File tree

6 files changed

+410
-8
lines changed

6 files changed

+410
-8
lines changed

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,23 @@ diode-replay-dryrun \
149149
my_app_92722156890707.json
150150
```
151151

152+
### OTLP client
153+
154+
`DiodeOTLPClient` converts ingestion entities into OpenTelemetry log records and exports them to an OTLP endpoint (gRPC). This is useful when a collector ingests log data and forwards it to Diode.
155+
156+
```python
157+
from netboxlabs.diode.sdk import Entity, DiodeOTLPClient
158+
159+
with DiodeOTLPClient(
160+
target="grpc://localhost:4317",
161+
app_name="my-producer",
162+
app_version="0.0.1",
163+
) as client:
164+
client.ingest([Entity(site="Site1")])
165+
```
166+
167+
Each entity is serialised to JSON and sent as a log record with producer metadata so downstream collectors can enrich and forward the payload. The client raises `OTLPClientError` when the export fails. TLS behaviour honours the existing `DIODE_SKIP_TLS_VERIFY` and `DIODE_CERT_FILE` environment variables.
168+
152169
## Supported entities (object types)
153170

154171
* ASN

netboxlabs/diode/sdk/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
from netboxlabs.diode.sdk.client import (
66
DiodeClient,
77
DiodeDryRunClient,
8+
DiodeOTLPClient,
89
load_dryrun_entities,
910
)
1011

1112
assert DiodeClient
1213
assert DiodeDryRunClient
14+
assert DiodeOTLPClient
1315
assert load_dryrun_entities

netboxlabs/diode/sdk/client.py

Lines changed: 246 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,19 @@
2020
import grpc
2121
import sentry_sdk
2222
from google.protobuf.json_format import MessageToJson, ParseDict
23+
from opentelemetry.proto.collector.logs.v1 import (
24+
logs_service_pb2,
25+
logs_service_pb2_grpc,
26+
)
27+
from opentelemetry.proto.common.v1 import common_pb2
28+
from opentelemetry.proto.logs.v1 import logs_pb2
2329

2430
from netboxlabs.diode.sdk.diode.v1 import ingester_pb2, ingester_pb2_grpc
25-
from netboxlabs.diode.sdk.exceptions import DiodeClientError, DiodeConfigError
31+
from netboxlabs.diode.sdk.exceptions import (
32+
DiodeClientError,
33+
DiodeConfigError,
34+
OTLPClientError,
35+
)
2636
from netboxlabs.diode.sdk.ingester import Entity
2737
from netboxlabs.diode.sdk.version import version_semver
2838

@@ -38,6 +48,7 @@
3848
_LOGGER = logging.getLogger(__name__)
3949
_MAX_RETRIES_ENVVAR_NAME = "DIODE_MAX_AUTH_RETRIES"
4050

51+
4152
def load_dryrun_entities(file_path: str | Path) -> Iterable[Entity]:
4253
"""Yield entities from a file with concatenated JSON messages."""
4354
path = Path(file_path)
@@ -79,7 +90,9 @@ def parse_target(target: str) -> tuple[str, str, bool]:
7990
parsed_target = urlparse(target)
8091

8192
if parsed_target.scheme not in ["grpc", "grpcs", "http", "https"]:
82-
raise ValueError("target should start with grpc://, grpcs://, http:// or https://")
93+
raise ValueError(
94+
"target should start with grpc://, grpcs://, http:// or https://"
95+
)
8396

8497
# Determine if TLS verification should be enabled
8598
tls_verify = _should_verify_tls(parsed_target.scheme)
@@ -149,16 +162,21 @@ def __init__(
149162
log_level = os.getenv(_DIODE_SDK_LOG_LEVEL_ENVVAR_NAME, "INFO").upper()
150163
logging.basicConfig(level=log_level)
151164

152-
self._max_auth_retries = int(_get_optional_config_value(
153-
_MAX_RETRIES_ENVVAR_NAME, str(max_auth_retries)
154-
) or max_auth_retries)
165+
self._max_auth_retries = int(
166+
_get_optional_config_value(_MAX_RETRIES_ENVVAR_NAME, str(max_auth_retries))
167+
or max_auth_retries
168+
)
155169
self._cert_file = _get_optional_config_value(
156170
_DIODE_CERT_FILE_ENVVAR_NAME, cert_file
157171
)
158172
self._target, self._path, self._tls_verify = parse_target(target)
159173

160174
# Load certificates once if needed
161-
self._certificates = _load_certs(self._cert_file) if (self._tls_verify or self._cert_file) else None
175+
self._certificates = (
176+
_load_certs(self._cert_file)
177+
if (self._tls_verify or self._cert_file)
178+
else None
179+
)
162180
self._app_name = app_name
163181
self._app_version = app_version
164182
self._platform = platform.platform()
@@ -406,6 +424,227 @@ def ingest(
406424
return ingester_pb2.IngestResponse()
407425

408426

427+
class DiodeOTLPClient(DiodeClientInterface):
428+
"""Diode OTLP client that exports ingestion entities as OTLP logs."""
429+
430+
_name = "diode-sdk-python-otlp"
431+
_version = version_semver()
432+
433+
def __init__(
434+
self,
435+
target: str,
436+
app_name: str,
437+
app_version: str,
438+
*,
439+
timeout: float = 10.0,
440+
metadata: dict[str, str] | Iterable[tuple[str, str]] | None = None,
441+
cert_file: str | None = None,
442+
):
443+
"""Initiate a new Diode OTLP client."""
444+
log_level = os.getenv(_DIODE_SDK_LOG_LEVEL_ENVVAR_NAME, "INFO").upper()
445+
logging.basicConfig(level=log_level)
446+
447+
self._app_name = app_name
448+
self._app_version = app_version
449+
self._platform = platform.platform()
450+
self._python_version = platform.python_version()
451+
self._timeout = timeout
452+
453+
self._target, self._path, self._tls_verify = parse_target(target)
454+
self._cert_file = _get_optional_config_value(
455+
_DIODE_CERT_FILE_ENVVAR_NAME, cert_file
456+
)
457+
self._certificates = (
458+
_load_certs(self._cert_file)
459+
if (self._tls_verify or self._cert_file)
460+
else None
461+
)
462+
463+
channel_opts = (
464+
(
465+
"grpc.primary_user_agent",
466+
f"{self._name}/{self._version} {self._app_name}/{self._app_version}",
467+
),
468+
)
469+
470+
if self._tls_verify:
471+
credentials = (
472+
grpc.ssl_channel_credentials(root_certificates=self._certificates)
473+
if self._certificates
474+
else grpc.ssl_channel_credentials()
475+
)
476+
base_channel = grpc.secure_channel(
477+
self._target,
478+
credentials,
479+
options=channel_opts,
480+
)
481+
else:
482+
base_channel = grpc.insecure_channel(
483+
target=self._target,
484+
options=channel_opts,
485+
)
486+
487+
self._base_channel = base_channel
488+
channel = base_channel
489+
if self._path:
490+
interceptor = DiodeMethodClientInterceptor(subpath=self._path)
491+
channel = grpc.intercept_channel(base_channel, interceptor)
492+
493+
self._channel = channel
494+
self._stub = logs_service_pb2_grpc.LogsServiceStub(channel)
495+
self._metadata = self._prepare_metadata(metadata)
496+
497+
@staticmethod
498+
def _prepare_metadata(
499+
metadata: dict[str, str] | Iterable[tuple[str, str]] | None,
500+
) -> tuple[tuple[str, str], ...] | None:
501+
if metadata is None:
502+
return None
503+
if isinstance(metadata, dict):
504+
return tuple(metadata.items())
505+
return tuple(metadata)
506+
507+
@property
508+
def name(self) -> str:
509+
"""Retrieve the client name."""
510+
return self._name
511+
512+
@property
513+
def version(self) -> str:
514+
"""Retrieve the client version."""
515+
return self._version
516+
517+
@property
518+
def app_name(self) -> str:
519+
"""Retrieve the producer application name."""
520+
return self._app_name
521+
522+
@property
523+
def app_version(self) -> str:
524+
"""Retrieve the producer application version."""
525+
return self._app_version
526+
527+
@property
528+
def timeout(self) -> float:
529+
"""Retrieve the export timeout."""
530+
return self._timeout
531+
532+
@property
533+
def target(self) -> str:
534+
"""Retrieve the export target."""
535+
return self._target
536+
537+
def __enter__(self):
538+
"""Enter the runtime context."""
539+
return self
540+
541+
def __exit__(self, exc_type, exc_value, exc_traceback):
542+
"""Exit the runtime context."""
543+
self.close()
544+
545+
def close(self):
546+
"""Close the underlying channel."""
547+
if getattr(self, "_base_channel", None):
548+
self._base_channel.close()
549+
550+
def ingest(
551+
self,
552+
entities: Iterable[Entity | ingester_pb2.Entity | None],
553+
stream: str | None = _DEFAULT_STREAM,
554+
) -> ingester_pb2.IngestResponse:
555+
"""Export entities as OTLP logs."""
556+
stream = stream or _DEFAULT_STREAM
557+
log_records = [
558+
self._entity_to_log_record(entity)
559+
for entity in self._normalize_entities(entities)
560+
]
561+
562+
if not log_records:
563+
return ingester_pb2.IngestResponse()
564+
565+
request = self._build_export_request(log_records, stream)
566+
567+
try:
568+
self._stub.Export(
569+
request,
570+
timeout=self._timeout,
571+
metadata=self._metadata,
572+
)
573+
except grpc.RpcError as err:
574+
raise OTLPClientError(err) from err
575+
576+
return ingester_pb2.IngestResponse()
577+
578+
def _normalize_entities(
579+
self, entities: Iterable[Entity | ingester_pb2.Entity | None]
580+
) -> list[ingester_pb2.Entity]:
581+
normalized: list[ingester_pb2.Entity] = []
582+
for entity in entities:
583+
if entity is None:
584+
continue
585+
if not isinstance(entity, ingester_pb2.Entity):
586+
raise TypeError("DiodeOTLPClient expects ingester_pb2.Entity instances")
587+
normalized.append(entity)
588+
return normalized
589+
590+
def _build_export_request(
591+
self,
592+
log_records: list[logs_pb2.LogRecord],
593+
stream: str | None,
594+
) -> logs_service_pb2.ExportLogsServiceRequest:
595+
resource_logs = logs_pb2.ResourceLogs()
596+
resource_logs.resource.attributes.extend(self._resource_attributes())
597+
resource_logs.resource.attributes.append(
598+
self._string_kv("diode.stream", stream)
599+
)
600+
scope_logs = resource_logs.scope_logs.add()
601+
scope_logs.scope.CopyFrom(
602+
common_pb2.InstrumentationScope(
603+
name=self._name,
604+
version=self._version,
605+
)
606+
)
607+
scope_logs.log_records.extend(log_records)
608+
609+
request = logs_service_pb2.ExportLogsServiceRequest()
610+
request.resource_logs.append(resource_logs)
611+
return request
612+
613+
def _resource_attributes(self) -> list[common_pb2.KeyValue]:
614+
return [
615+
self._string_kv("service.name", self._app_name),
616+
self._string_kv("service.version", self._app_version),
617+
self._string_kv("os.description", self._platform),
618+
self._string_kv("process.runtime.version", self._python_version),
619+
]
620+
621+
def _entity_to_log_record(
622+
self,
623+
entity: ingester_pb2.Entity,
624+
) -> logs_pb2.LogRecord:
625+
body_json = MessageToJson(entity, preserving_proto_field_name=True)
626+
entity_type = entity.WhichOneof("entity") or "unknown"
627+
628+
log_record = logs_pb2.LogRecord(
629+
time_unix_nano=time.time_ns(),
630+
severity_number=logs_pb2.SeverityNumber.SEVERITY_NUMBER_INFO,
631+
severity_text="INFO",
632+
)
633+
log_record.body.CopyFrom(common_pb2.AnyValue(string_value=body_json))
634+
log_record.attributes.extend(
635+
[
636+
self._string_kv("diode.entity", entity_type),
637+
]
638+
)
639+
return log_record
640+
641+
@staticmethod
642+
def _string_kv(key: str, value: str) -> common_pb2.KeyValue:
643+
return common_pb2.KeyValue(
644+
key=key, value=common_pb2.AnyValue(string_value=value)
645+
)
646+
647+
409648
class _DiodeAuthentication:
410649
def __init__(
411650
self,
@@ -429,7 +668,7 @@ def authenticate(self) -> str:
429668
"""Request an OAuth2 token using client credentials and return it."""
430669
if self._tls_verify and self._certificates:
431670
context = ssl.create_default_context()
432-
context.load_verify_locations(cadata=self._certificates.decode('utf-8'))
671+
context.load_verify_locations(cadata=self._certificates.decode("utf-8"))
433672
conn = http.client.HTTPSConnection(
434673
self._target,
435674
context=context,

netboxlabs/diode/sdk/exceptions.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,42 @@ def details(self):
4545
def __repr__(self):
4646
"""Return string representation."""
4747
return f"<DiodeClientError status code: {self._status_code}, details: {self._details}>"
48+
49+
50+
class OTLPClientError(BaseError):
51+
"""Raised when the OTLP client fails to export log data."""
52+
53+
def __init__(self, error: Exception, message: str | None = None):
54+
"""Initialize OTLPClientError."""
55+
self._message = message or "OTLP export failed"
56+
self.status_code = None
57+
self.details = None
58+
59+
if isinstance(error, grpc.RpcError):
60+
try:
61+
self.status_code = error.code()
62+
except Exception: # pragma: no cover - defensive
63+
self.status_code = None
64+
try:
65+
self.details = error.details()
66+
except Exception: # pragma: no cover - defensive
67+
self.details = None
68+
else:
69+
self.details = str(error)
70+
71+
parts: list[str] = [self._message]
72+
if self.status_code is not None:
73+
status_name = getattr(self.status_code, "name", str(self.status_code))
74+
parts.append(f"status={status_name}")
75+
if self.details:
76+
parts.append(f"details={self.details}")
77+
78+
super().__init__(", ".join(parts))
79+
80+
def __repr__(self):
81+
"""Return string representation."""
82+
status = getattr(self.status_code, "name", self.status_code)
83+
return (
84+
f"<OTLPClientError message={self._message!r}, "
85+
f"status_code={status!r}, details={self.details!r}>"
86+
)

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ dependencies = [
2828
"grpcio>=1.68.1",
2929
"grpcio-status>=1.68.1",
3030
"sentry-sdk>=2.2.1",
31+
"opentelemetry-proto>=1.26.0",
3132
]
3233

3334
[project.optional-dependencies] # Optional

0 commit comments

Comments
 (0)