diff --git a/hypertrace-ingester/build.gradle.kts b/hypertrace-ingester/build.gradle.kts index 87d61180c..97929bcf2 100644 --- a/hypertrace-ingester/build.gradle.kts +++ b/hypertrace-ingester/build.gradle.kts @@ -37,7 +37,6 @@ dependencies { implementation(project(":span-normalizer:span-normalizer-constants")) implementation(project(":span-normalizer:raw-span-constants")) implementation(project(":semantic-convention-utils")) - implementation(project(":span-normalizer:span-normalizer")) implementation(project(":raw-spans-grouper:raw-spans-grouper")) implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher")) implementation(project(":hypertrace-view-generator:hypertrace-view-generator")) @@ -65,12 +64,6 @@ tasks.processResources { tasks.register("copyServiceConfigs") { with( - createCopySpec( - "span-normalizer", - "span-normalizer", - "main", - "common" - ), createCopySpec( "raw-spans-grouper", "raw-spans-grouper", @@ -143,12 +136,6 @@ tasks.test { tasks.register("copyServiceConfigsTest") { with( - createCopySpec( - "span-normalizer", - "span-normalizer", - "test", - "span-normalizer" - ), createCopySpec( "raw-spans-grouper", "raw-spans-grouper", diff --git a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java index ede314b3f..9042dfb6e 100644 --- a/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java +++ b/hypertrace-ingester/src/main/java/org/hypertrace/ingester/HypertraceIngester.java @@ -15,7 +15,6 @@ import org.hypertrace.core.serviceframework.config.ConfigClient; import org.hypertrace.core.serviceframework.config.ConfigClientFactory; import org.hypertrace.core.serviceframework.config.ConfigUtils; -import org.hypertrace.core.spannormalizer.SpanNormalizer; import org.hypertrace.core.viewgenerator.service.MultiViewGeneratorLauncher; import org.hypertrace.metrics.exporter.MetricsExporterService; import org.hypertrace.metrics.generator.MetricsGenerator; @@ -52,9 +51,6 @@ public HypertraceIngester(ConfigClient configClient) { private KafkaStreamsApp getSubTopologyInstance(String name) { KafkaStreamsApp kafkaStreamsApp; switch (name) { - case "span-normalizer": - kafkaStreamsApp = new SpanNormalizer(ConfigClientFactory.getClient()); - break; case "raw-spans-grouper": kafkaStreamsApp = new RawSpansGrouper(ConfigClientFactory.getClient()); break; diff --git a/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf b/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf index a874fe408..9e433bde9 100644 --- a/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf +++ b/hypertrace-ingester/src/main/resources/configs/hypertrace-ingester/application.conf @@ -4,7 +4,6 @@ service.name = hypertrace-ingester service.admin.port = 8099 sub.topology.names = [ - "span-normalizer", "raw-spans-grouper", "hypertrace-trace-enricher", "all-views" diff --git a/hypertrace-ingester/src/test/java/org/hypertrace/ingester/HypertraceIngesterTest.java b/hypertrace-ingester/src/test/java/org/hypertrace/ingester/HypertraceIngesterTest.java index c2b17280c..7123998af 100644 --- a/hypertrace-ingester/src/test/java/org/hypertrace/ingester/HypertraceIngesterTest.java +++ b/hypertrace-ingester/src/test/java/org/hypertrace/ingester/HypertraceIngesterTest.java @@ -6,7 +6,7 @@ import com.google.protobuf.ByteString; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span; +import io.jaegertracing.api_v2.JaegerSpanInternalModel; import java.io.File; import java.nio.file.Path; import java.time.Duration; @@ -36,7 +36,6 @@ public class HypertraceIngesterTest { private static final String CONFIG_PATH = "configs/%s/application.conf"; private HypertraceIngester underTest; private Config underTestConfig; - private Config spanNormalizerConfig; private Config rawSpansGrouperConfig; private Config traceEnricherConfig; private Config spanEventViewGeneratorConfig; @@ -45,7 +44,6 @@ public class HypertraceIngesterTest { public void setUp() { underTest = new HypertraceIngester(ConfigClientFactory.getClient()); underTestConfig = getConfig("hypertrace-ingester"); - spanNormalizerConfig = getConfig("span-normalizer"); rawSpansGrouperConfig = getConfig("raw-spans-grouper"); traceEnricherConfig = getConfig("hypertrace-trace-enricher"); spanEventViewGeneratorConfig = getConfig("view-gen-span-event"); @@ -71,30 +69,26 @@ public void testIngestionPacketFlow(@TempDir Path tempDir) { // create topology test driver for ingester TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), props); - Span span = - Span.newBuilder() + JaegerSpanInternalModel.Span span = + JaegerSpanInternalModel.Span.newBuilder() .setSpanId(ByteString.copyFrom("1".getBytes())) .setTraceId(ByteString.copyFrom("trace-1".getBytes())) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("tenant-id") + .setVStr("tenant-1") + .build()) .build(); - TestInputTopic spanNormalizerInputTopic = + TestInputTopic spanGrouperInputTopic = topologyTestDriver.createInputTopic( - spanNormalizerConfig.getString( + rawSpansGrouperConfig.getString( org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants .INPUT_TOPIC_CONFIG_KEY), Serdes.ByteArray().serializer(), new JaegerSpanSerde().serializer()); - spanNormalizerInputTopic.pipeInput(span); - - // create output topic for span-normalizer topology - TestOutputTopic spanNormalizerOutputTopic = - topologyTestDriver.createOutputTopic( - spanNormalizerConfig.getString( - StructuredTraceEnricherConstants.OUTPUT_TOPIC_CONFIG_KEY), - Serdes.String().deserializer(), - new AvroSerde<>().deserializer()); - assertNotNull(spanNormalizerOutputTopic.readKeyValue()); + spanGrouperInputTopic.pipeInput(span); topologyTestDriver.advanceWallClockTime(Duration.ofSeconds(32)); @@ -107,14 +101,6 @@ public void testIngestionPacketFlow(@TempDir Path tempDir) { new AvroSerde<>().deserializer()); assertNotNull(spanGrouperOutputTopic.readKeyValue()); - // create output topic for trace-enricher topology - TestOutputTopic traceEnricherOutputTopic = - topologyTestDriver.createOutputTopic( - traceEnricherConfig.getString(StructuredTraceEnricherConstants.OUTPUT_TOPIC_CONFIG_KEY), - Serdes.String().deserializer(), - new AvroSerde<>().deserializer()); - assertNotNull(traceEnricherOutputTopic.readKeyValue()); - // create output topic for topology TestOutputTopic spanEventViewOutputTopic = topologyTestDriver.createOutputTopic( diff --git a/hypertrace-ingester/src/test/resources/configs/hypertrace-ingester/application.conf b/hypertrace-ingester/src/test/resources/configs/hypertrace-ingester/application.conf index 4cc0a6bac..24f9a0708 100644 --- a/hypertrace-ingester/src/test/resources/configs/hypertrace-ingester/application.conf +++ b/hypertrace-ingester/src/test/resources/configs/hypertrace-ingester/application.conf @@ -3,7 +3,7 @@ main.class = org.hypertrace.ingester.HypertraceIngester service.name = hypertrace-ingester service.admin.port = 8099 -sub.topology.names = ["span-normalizer", "raw-spans-grouper", "hypertrace-trace-enricher", "all-views"] +sub.topology.names = ["raw-spans-grouper", "hypertrace-trace-enricher", "all-views"] precreate.topics = false diff --git a/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml b/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml index e77ebee00..9edabde9d 100644 --- a/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml +++ b/raw-spans-grouper/helm/templates/raw-spans-grouper-config.yaml @@ -81,3 +81,83 @@ data: } } {{- end }} + + # span normalizer config + {{- if hasKey .Values.spanNormalizerConfig "processor" }} + processor { + {{- if hasKey .Values.spanNormalizerConfig.processor "tenantIdTagKey" }} + tenantIdTagKey = "{{ .Values.spanNormalizerConfig.processor.tenantIdTagKey }}" + {{- end }} + + {{- if hasKey .Values.spanNormalizerConfig.processor "excludeTenantIds" }} + excludeTenantIds = {{ .Values.spanNormalizerConfig.processor.excludeTenantIds | toJson }} + {{- end }} + + {{- if hasKey .Values.spanNormalizerConfig.processor "defaultTenantId" }} + defaultTenantId = "{{ .Values.spanNormalizerConfig.processor.defaultTenantId }}" + {{- end }} + + {{- if hasKey .Values.spanNormalizerConfig.processor "spanDropCriterion" }} + spanDropCriterion = {{ .Values.spanNormalizerConfig.processor.spanDropCriterion | toJson }} + {{- end }} + + {{- if hasKey .Values.spanNormalizerConfig.processor "spanDropFilters" }} + spanDropFilters = {{ .Values.spanNormalizerConfig.processor.spanDropFilters | toJson }} + {{- end }} + + {{- if hasKey .Values.spanNormalizerConfig.processor "bypassKey" }} + bypass.key = "{{ .Values.spanNormalizerConfig.processor.bypassKey }}" + {{- end }} + + {{- if hasKey .Values.spanNormalizerConfig.processor "lateArrivalThresholdDuration" }} + late.arrival.threshold.duration = "{{ .Values.spanNormalizerConfig.processor.lateArrivalThresholdDuration }}" + {{- end }} + + {{- if hasKey .Values.spanNormalizerConfig.processor "allowedAttributesPrefixes" }} + allowed.attributes.prefixes = {{ .Values.spanNormalizerConfig.processor.allowedAttributesPrefixes | toJson }} + {{- end }} + + {{- if hasKey .Values.spanNormalizerConfig.processor "prefixedMatchedAllowedAttributes" }} + prefixed.matched.allowed.attributes = {{ .Values.spanNormalizerConfig.processor.prefixedMatchedAllowedAttributes | toJson }} + {{- end }} + + {{- if hasKey .Values.spanNormalizerConfig.processor "rootExitSpanDropCriterion" }} + rootExitSpanDropCriterion = {{ .Values.spanNormalizerConfig.processor.rootExitSpanDropCriterion | toJson }} + {{- end }} + + {{- if hasKey .Values.spanNormalizerConfig.processor "excludeLogsTenantIds" }} + excludeLogsTenantIds = {{ .Values.spanNormalizerConfig.processor.excludeLogsTenantIds | toJson }} + {{- end }} + } + {{- end }} + + {{- if hasKey .Values.spanNormalizerConfig "metrics" }} + metrics { + reporter { + names = {{- toJson .Values.spanNormalizerConfig.metrics.reporter.names | trim | nindent 12 }} + } + } + {{- end }} + clients { + config.service.config = { + host = {{ .Values.spanNormalizerConfig.configServiceHost }} + port = {{ .Values.spanNormalizerConfig.configServicePort }} + } + } + span.rules.exclude { + cache = { + refreshAfterWriteDuration = {{ .Values.spanNormalizerConfig.excludeSpanRulesConfig.cache.refreshAfterWriteDuration }} + expireAfterWriteDuration = {{ .Values.spanNormalizerConfig.excludeSpanRulesConfig.cache.expireAfterWriteDuration }} + } + } + {{- if hasKey .Values.spanNormalizerConfig "rateLimitConfig" }} + rate.limit.config = [ + {{- range $k,$v := $.Values.spanNormalizerConfig.rateLimitConfig }} + { + tenantId = {{ $v.tenantId }} + groupingKey = {{ $v.groupingKey }} + maxSpansPerMinute = {{ $v.maxSpansPerMinute }} + }, + {{- end }} + ] + {{- end }} diff --git a/raw-spans-grouper/helm/values.yaml b/raw-spans-grouper/helm/values.yaml index 52ab84b31..ede2775f8 100644 --- a/raw-spans-grouper/helm/values.yaml +++ b/raw-spans-grouper/helm/values.yaml @@ -134,6 +134,15 @@ rawSpansGrouperConfig: groupby: internal: 30 +spanNormalizerConfig: + name: span-normalizer-config + excludeSpanRulesConfig: + cache: + refreshAfterWriteDuration: 3m + expireAfterWriteDuration: 5m + rateLimitConfig: [] + + logConfig: name: raw-spans-grouper-log-config monitorInterval: 30 @@ -166,6 +175,13 @@ kafka-topic-creator: partitions: 8 configs: cleanup.policy: "[compact, delete]" + raw-logs: + replicationFactor: 3 + partitions: 8 + configs: + retention.bytes: 8589934592 # default = -1 + retention.ms: 86400000 # default = 604800000 (7 days) + max.message.bytes: 1048588 # default = 1048588 zookeeper: address: zookeeper:2181 diff --git a/raw-spans-grouper/raw-spans-grouper/build.gradle.kts b/raw-spans-grouper/raw-spans-grouper/build.gradle.kts index 94c68f0cc..88daf7677 100644 --- a/raw-spans-grouper/raw-spans-grouper/build.gradle.kts +++ b/raw-spans-grouper/raw-spans-grouper/build.gradle.kts @@ -33,6 +33,9 @@ dependencies { because("https://snyk.io/vuln/SNYK-JAVA-ORGGLASSFISHJERSEYCORE-1255637") } implementation(project(":span-normalizer:span-normalizer-api")) + implementation(project(":span-normalizer:raw-span-constants")) + implementation(project(":span-normalizer:span-normalizer-constants")) + implementation(project(":semantic-convention-utils")) implementation("org.hypertrace.core.datamodel:data-model:0.1.27") implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.49") implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.49") @@ -41,6 +44,18 @@ dependencies { implementation("org.hypertrace.core.kafkastreams.framework:weighted-group-partitioner:0.2.6") implementation("de.javakaffee:kryo-serializers:0.45") implementation("com.google.guava:guava:31.1-jre") + implementation("org.apache.commons:commons-lang3:3.12.0") + + // Required for the GRPC clients. + runtimeOnly("io.grpc:grpc-netty:1.50.0") + annotationProcessor("org.projectlombok:lombok:1.18.18") + compileOnly("org.projectlombok:lombok:1.18.18") + + implementation("org.hypertrace.config.service:span-processing-config-service-api:0.1.47") + implementation("org.hypertrace.config.service:span-processing-utils:0.1.47") + + implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.11.2") + implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.11.2") // Logging implementation("org.slf4j:slf4j-api:1.7.30") diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java index d70d8ba9f..33ade78c9 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouper.java @@ -6,13 +6,18 @@ import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_STATE_STORE_NAME; import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_STATE_STORE; +import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.BYPASS_OUTPUT_TOPIC_CONFIG_KEY; +import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.OUTPUT_TOPIC_RAW_LOGS_CONFIG_KEY; import com.typesafe.config.Config; +import io.jaegertracing.api_v2.JaegerSpanInternalModel; import java.util.List; import java.util.Map; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Produced; @@ -29,6 +34,13 @@ import org.hypertrace.core.spannormalizer.SpanIdentity; import org.hypertrace.core.spannormalizer.TraceIdentity; import org.hypertrace.core.spannormalizer.TraceState; +import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanPreProcessor; +import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanSerde; +import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanToAvroRawSpanTransformer; +import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanToLogRecordsTransformer; +import org.hypertrace.core.spannormalizer.jaeger.PreProcessedSpan; +import org.hypertrace.core.spannormalizer.rawspan.ByPassPredicate; +import org.hypertrace.core.spannormalizer.rawspan.RawSpanToStructuredTraceTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,17 +56,19 @@ public StreamsBuilder buildTopology( Map properties, StreamsBuilder streamsBuilder, Map> inputStreams) { + Config jobConfig = getJobConfig(properties); String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY); String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY); + String bypassOutputTopic = jobConfig.getString(BYPASS_OUTPUT_TOPIC_CONFIG_KEY); + String outputTopicRawLogs = jobConfig.getString(OUTPUT_TOPIC_RAW_LOGS_CONFIG_KEY); - KStream inputStream = - (KStream) inputStreams.get(inputTopic); + KStream inputStream = + (KStream) inputStreams.get(inputTopic); if (inputStream == null) { inputStream = - streamsBuilder - // read the input topic - .stream(inputTopic); + streamsBuilder.stream( + inputTopic, Consumed.with(Serdes.ByteArray(), new JaegerSpanSerde())); inputStreams.put(inputTopic, inputStream); } @@ -75,7 +89,34 @@ public StreamsBuilder buildTopology( streamsBuilder.addStateStore(spanStoreBuilder); streamsBuilder.addStateStore(traceStateStoreBuilder); - StreamPartitioner groupPartitioner = + KStream preProcessedStream = + inputStream.transform(() -> new JaegerSpanPreProcessor(getGrpcChannelRegistry())); + + // logs output + preProcessedStream.transform(JaegerSpanToLogRecordsTransformer::new).to(outputTopicRawLogs); + + KStream[] branches = + preProcessedStream + .transform(JaegerSpanToAvroRawSpanTransformer::new) + .branch(new ByPassPredicate(jobConfig), (key, value) -> true); + + KStream bypassTopicBranch = branches[0]; + KStream outputTopicBranch = branches[1]; + + StreamPartitioner tenantIsolationPartitionerForBypassTopic = + new GroupPartitionerBuilder() + .buildPartitioner( + "spans", + jobConfig, + (traceid, span) -> traceid, + new KeyHashPartitioner<>(), + getGrpcChannelRegistry()); + + bypassTopicBranch + .transform(RawSpanToStructuredTraceTransformer::new) + .to(bypassOutputTopic, Produced.with(null, null, tenantIsolationPartitionerForBypassTopic)); + + StreamPartitioner tenantIsolationPartitionerForOutputTopic = new GroupPartitionerBuilder() .buildPartitioner( "spans", @@ -85,10 +126,10 @@ public StreamsBuilder buildTopology( getGrpcChannelRegistry()); Produced outputTopicProducer = - Produced.with(null, null, groupPartitioner); + Produced.with(null, null, tenantIsolationPartitionerForOutputTopic); outputTopicProducer = outputTopicProducer.withName(OUTPUT_TOPIC_PRODUCER); - inputStream + outputTopicBranch .transform( RawSpansProcessor::new, Named.as(RawSpansProcessor.class.getSimpleName()), diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/span/normalizer/constants/SpanNormalizerConstants.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/span/normalizer/constants/SpanNormalizerConstants.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/span/normalizer/constants/SpanNormalizerConstants.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/span/normalizer/constants/SpanNormalizerConstants.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/client/ConfigServiceClient.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/client/ConfigServiceClient.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/client/ConfigServiceClient.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/client/ConfigServiceClient.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/config/ConfigServiceConfig.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/config/ConfigServiceConfig.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/config/ConfigServiceConfig.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/config/ConfigServiceConfig.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/FieldGenerator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/FieldGenerator.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/FieldGenerator.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/FieldGenerator.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/FieldsGenerator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/FieldsGenerator.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/FieldsGenerator.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/FieldsGenerator.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/FirstMatchingKeyFinder.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/FirstMatchingKeyFinder.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/FirstMatchingKeyFinder.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/FirstMatchingKeyFinder.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/GrpcFieldsGenerator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/GrpcFieldsGenerator.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/GrpcFieldsGenerator.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/GrpcFieldsGenerator.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/HttpFieldsGenerator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/HttpFieldsGenerator.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/HttpFieldsGenerator.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/HttpFieldsGenerator.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/ProtocolFieldsGenerator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/ProtocolFieldsGenerator.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/ProtocolFieldsGenerator.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/ProtocolFieldsGenerator.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/RpcFieldsGenerator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/RpcFieldsGenerator.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/RpcFieldsGenerator.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/RpcFieldsGenerator.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/SqlFieldsGenerator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/SqlFieldsGenerator.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/SqlFieldsGenerator.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/SqlFieldsGenerator.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/ValueConverter.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/ValueConverter.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/ValueConverter.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/fieldgenerators/ValueConverter.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/ExcludeSpanRuleEvaluator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/ExcludeSpanRuleEvaluator.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/ExcludeSpanRuleEvaluator.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/ExcludeSpanRuleEvaluator.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/ExcludeSpanRulesCache.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/ExcludeSpanRulesCache.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/ExcludeSpanRulesCache.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/ExcludeSpanRulesCache.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerResourceNormalizer.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerResourceNormalizer.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerResourceNormalizer.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerResourceNormalizer.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizer.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizer.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizer.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizer.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessor.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessor.java similarity index 95% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessor.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessor.java index 9eca29960..c0e3f9e9f 100644 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessor.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessor.java @@ -1,6 +1,6 @@ package org.hypertrace.core.spannormalizer.jaeger; -import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG; import static org.hypertrace.core.spannormalizer.util.EventBuilder.buildEvent; import com.google.common.annotations.VisibleForTesting; @@ -53,7 +53,7 @@ public JaegerSpanPreProcessor(GrpcChannelRegistry grpcChannelRegistry) { @Override public void init(ProcessorContext context) { - Config jobConfig = (Config) context.appConfigs().get(SPAN_NORMALIZER_JOB_CONFIG); + Config jobConfig = (Config) context.appConfigs().get(RAW_SPANS_GROUPER_JOB_CONFIG); tenantIdHandler = new TenantIdHandler(jobConfig); spanDropManager = new SpanDropManager(jobConfig, grpcChannelRegistry); tagsFilter = new TagsFilter(jobConfig); diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanSerde.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanSerde.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanSerde.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanSerde.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToAvroRawSpanTransformer.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToAvroRawSpanTransformer.java similarity index 94% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToAvroRawSpanTransformer.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToAvroRawSpanTransformer.java index 37a584eed..9725369fa 100644 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToAvroRawSpanTransformer.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToAvroRawSpanTransformer.java @@ -1,7 +1,7 @@ package org.hypertrace.core.spannormalizer.jaeger; import static org.hypertrace.core.datamodel.shared.AvroBuilderCache.fastNewBuilder; -import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG; import static org.hypertrace.core.spannormalizer.jaeger.JaegerSpanPreProcessor.SPANS_COUNTER; import com.typesafe.config.Config; @@ -36,7 +36,7 @@ public class JaegerSpanToAvroRawSpanTransformer @Override public void init(ProcessorContext context) { - Config jobConfig = (Config) context.appConfigs().get(SPAN_NORMALIZER_JOB_CONFIG); + Config jobConfig = (Config) context.appConfigs().get(RAW_SPANS_GROUPER_JOB_CONFIG); converter = JaegerSpanNormalizer.get(jobConfig); } diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformer.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformer.java similarity index 95% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformer.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformer.java index f69899fad..c9dc2dd02 100644 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformer.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformer.java @@ -1,7 +1,7 @@ package org.hypertrace.core.spannormalizer.jaeger; import static org.hypertrace.core.datamodel.shared.AvroBuilderCache.fastNewBuilder; -import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG; +import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.util.Timestamps; @@ -44,7 +44,7 @@ public class JaegerSpanToLogRecordsTransformer @Override public void init(ProcessorContext context) { - Config jobConfig = (Config) context.appConfigs().get(SPAN_NORMALIZER_JOB_CONFIG); + Config jobConfig = (Config) context.appConfigs().get(RAW_SPANS_GROUPER_JOB_CONFIG); this.tenantIdsToExclude = jobConfig.hasPath(TENANT_IDS_TO_EXCLUDE_LOGS_CONFIG) ? jobConfig.getStringList(TENANT_IDS_TO_EXCLUDE_LOGS_CONFIG) diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/PreProcessedSpan.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/PreProcessedSpan.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/PreProcessedSpan.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/PreProcessedSpan.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/RateLimitingSpanFilter.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/RateLimitingSpanFilter.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/RateLimitingSpanFilter.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/RateLimitingSpanFilter.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanDropFilter.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanDropFilter.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanDropFilter.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanDropFilter.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanDropManager.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanDropManager.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanDropManager.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanDropManager.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanFilter.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanFilter.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanFilter.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanFilter.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/TagsFilter.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/TagsFilter.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/TagsFilter.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/TagsFilter.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/TenantIdHandler.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/TenantIdHandler.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/TenantIdHandler.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/TenantIdHandler.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/tenant/DefaultTenantIdProvider.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/tenant/DefaultTenantIdProvider.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/tenant/DefaultTenantIdProvider.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/tenant/DefaultTenantIdProvider.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/tenant/JaegerKeyBasedTenantIdProvider.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/tenant/JaegerKeyBasedTenantIdProvider.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/tenant/JaegerKeyBasedTenantIdProvider.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/tenant/JaegerKeyBasedTenantIdProvider.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/tenant/TenantIdProvider.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/tenant/TenantIdProvider.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/tenant/TenantIdProvider.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/jaeger/tenant/TenantIdProvider.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/processor/SpanNormalizer.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/processor/SpanNormalizer.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/processor/SpanNormalizer.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/processor/SpanNormalizer.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/rawspan/ByPassPredicate.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/rawspan/ByPassPredicate.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/rawspan/ByPassPredicate.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/rawspan/ByPassPredicate.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/rawspan/RawSpanToStructuredTraceTransformer.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/rawspan/RawSpanToStructuredTraceTransformer.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/rawspan/RawSpanToStructuredTraceTransformer.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/rawspan/RawSpanToStructuredTraceTransformer.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/util/EventBuilder.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/util/EventBuilder.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/util/EventBuilder.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/util/EventBuilder.java diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/util/JaegerHTTagsConverter.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/util/JaegerHTTagsConverter.java similarity index 100% rename from span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/util/JaegerHTTagsConverter.java rename to raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/spannormalizer/util/JaegerHTTagsConverter.java diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf b/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf index c70747fba..b80b02d0a 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf +++ b/raw-spans-grouper/raw-spans-grouper/src/main/resources/configs/common/application.conf @@ -4,8 +4,10 @@ service.admin.port = 8051 main.class = org.hypertrace.core.rawspansgrouper.RawSpansGrouper span.type = rawSpan -input.topic = "raw-spans-from-jaeger-spans" +input.topic = "jaeger-spans" output.topic = "structured-traces-from-raw-spans" +bypass.output.topic = "structured-traces-from-raw-spans" +raw.logs.output.topic = "raw-logs" precreate.topics = false precreate.topics = ${?PRE_CREATE_TOPICS} @@ -61,3 +63,37 @@ metrics.reporter { } dataflow.metriccollection.sampling.percent = 10.0 + + +clients = { + config.service.config = { + host = localhost + port = 50101 + } +} + +processor { + defaultTenantId = ${?DEFAULT_TENANT_ID} + late.arrival.threshold.duration = 365d + + # Configuration for dropping certain attributes that are captured by agent, but doesn't require in + # the processing pipeline. + # + # allowed.attributes.prefixes : the list of prefixes that should match for which allowed keys + # prefixed.matched.allowed.attributes : allowed keys from the subset of keys where prefix matched + # + # If either of config is empty allowed.attributes.prefixes or prefixed.matched.allowed.attributes, + # it will not drop any attributes. + # The above configuration doesn't impact if the key doesn't start with prefix. + allowed.attributes.prefixes = [] + prefixed.matched.allowed.attributes = [] +} + +span.rules.exclude { + cache = { + refreshAfterWriteDuration = 3m + expireAfterWriteDuration = 5m + } +} + +rate.limit.config = [] diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java index 3d96e9b53..068171882 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java +++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/rawspansgrouper/RawSpansGrouperTest.java @@ -1,21 +1,29 @@ package org.hypertrace.core.rawspansgrouper; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import io.jaegertracing.api_v2.JaegerSpanInternalModel; import java.io.File; import java.nio.ByteBuffer; import java.nio.file.Path; import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TestInputTopic; @@ -23,19 +31,26 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.test.TestRecord; import org.hypertrace.core.datamodel.Event; -import org.hypertrace.core.datamodel.RawSpan; +import org.hypertrace.core.datamodel.LogEvents; import org.hypertrace.core.datamodel.StructuredTrace; +import org.hypertrace.core.datamodel.shared.HexUtils; +import org.hypertrace.core.kafkastreams.framework.serdes.AvroSerde; import org.hypertrace.core.serviceframework.config.ConfigClientFactory; import org.hypertrace.core.spannormalizer.TraceIdentity; +import org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants; +import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanSerde; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junitpioneer.jupiter.SetEnvironmentVariable; public class RawSpansGrouperTest { + private static final String SERVICE_NAME = "servicename"; + @Test @SetEnvironmentVariable(key = "SERVICE_NAME", value = "raw-spans-grouper") - public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir Path tempDir) { + public void whenSpansMatchDropFilterThenExpectThemToBeDropped(@TempDir Path tempDir) { File file = tempDir.resolve("state").toFile(); RawSpansGrouper underTest = new RawSpansGrouper(ConfigClientFactory.getClient()); @@ -63,11 +78,14 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir Serde traceIdentitySerde = new StreamsConfig(mergedProps).defaultKeySerde(); TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); - TestInputTopic inputTopic = + + TestInputTopic inputTopic = td.createInputTopic( - config.getString(RawSpanGrouperConstants.INPUT_TOPIC_CONFIG_KEY), - traceIdentitySerde.serializer(), - defaultValueSerde.serializer()); + config.getString( + org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants + .INPUT_TOPIC_CONFIG_KEY), + Serdes.ByteArray().serializer(), + new JaegerSpanSerde().serializer()); TestOutputTopic outputTopic = td.createOutputTopic( @@ -77,133 +95,606 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir String tenantId = "tenant1"; - // create spans for trace-1 of tenant1 - RawSpan span1 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) - .setCustomerId("tenant1") - .setEvent(createEvent("event-1", "tenant1")) - .build(); - RawSpan span2 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) - .setCustomerId("tenant1") - .setEvent(createEvent("event-2", "tenant1")) - .build(); - RawSpan span3 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-1".getBytes())) - .setCustomerId("tenant1") - .setEvent(createEvent("event-3", "tenant1")) + JaegerSpanInternalModel.Span span = + JaegerSpanInternalModel.Span.newBuilder() + .setSpanId(ByteString.copyFrom("1".getBytes())) + .setTraceId(ByteString.copyFrom("trace-1".getBytes())) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("tenant-id") + .setVStr(tenantId) + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("jaeger.servicename") + .setVStr(SERVICE_NAME) + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("http.method") + .setVStr("GET") + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("http.url") + .setVStr("http://xyz.com/health/check") + .build()) .build(); - // create spans for trace-2 of tenant1 - RawSpan span4 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-2".getBytes())) - .setCustomerId("tenant1") - .setEvent(createEvent("event-4", "tenant1")) - .build(); - RawSpan span5 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-2".getBytes())) - .setCustomerId("tenant1") - .setEvent(createEvent("event-5", "tenant1")) - .build(); + inputTopic.pipeInput(span); + assertTrue(outputTopic.isEmpty()); - // create spans for trace-3 of tenant1 - RawSpan span6 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-3".getBytes())) - .setCustomerId("tenant1") - .setEvent(createEvent("event-6", "tenant1")) - .build(); - RawSpan span7 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-3".getBytes())) - .setCustomerId("tenant1") - .setEvent(createEvent("event-7", "tenant1")) - .build(); - RawSpan span8 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-3".getBytes())) - .setCustomerId("tenant1") - .setEvent(createEvent("event-8", "tenant1")) - .build(); - RawSpan span9 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-3".getBytes())) - .setCustomerId("tenant1") - .setEvent(createEvent("event-9", "tenant1")) - .build(); - RawSpan span10 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-3".getBytes())) - .setCustomerId("tenant1") - .setEvent(createEvent("event-10", "tenant1")) - .build(); - RawSpan span11 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-3".getBytes())) - .setCustomerId("tenant1") - .setEvent(createEvent("event-11", "tenant1")) + // pipe in one more span which match one of spanDropFilters (grpc.url) + JaegerSpanInternalModel.Span span2 = + JaegerSpanInternalModel.Span.newBuilder() + .setSpanId(ByteString.copyFrom("3".getBytes())) + .setTraceId(ByteString.copyFrom("trace-3".getBytes())) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("tenant-id") + .setVStr(tenantId) + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("jaeger.servicename") + .setVStr(SERVICE_NAME) + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("grpc.url") + .setVStr("doesn't match with input filter set") + .build()) .build(); - // create 8 spans for tenant-2 for trace-4 - String tenant2 = "tenant2"; - RawSpan span12 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) - .setCustomerId(tenant2) - .setEvent(createEvent("event-12", tenant2)) - .build(); - RawSpan span13 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) - .setCustomerId(tenant2) - .setEvent(createEvent("event-13", tenant2)) + inputTopic.pipeInput(span2); + assertTrue(outputTopic.isEmpty()); + + // pipe in one more span which match one of spanDropFilters (operation_name, and span.kind + // not + // exists) + JaegerSpanInternalModel.Span span5 = + JaegerSpanInternalModel.Span.newBuilder() + .setSpanId(ByteString.copyFrom("4".getBytes())) + .setTraceId(ByteString.copyFrom("trace-4".getBytes())) + .setOperationName("/api/") + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("jaeger.servicename") + .setVStr(SERVICE_NAME) + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("grpc1.url1") + .setVStr("xyz") + .build()) .build(); - RawSpan span14 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) - .setCustomerId(tenant2) - .setEvent(createEvent("event-14", tenant2)) + + inputTopic.pipeInput(span5); + assertTrue(outputTopic.isEmpty()); + } + + @Test + @SetEnvironmentVariable(key = "SERVICE_NAME", value = "raw-spans-grouper") + public void whenBypassedExpectStruturedTraceToBeOutput(@TempDir Path tempDir) { + File file = tempDir.resolve("state").toFile(); + + RawSpansGrouper underTest = new RawSpansGrouper(ConfigClientFactory.getClient()); + Config config = + ConfigFactory.parseURL( + getClass().getClassLoader().getResource("configs/raw-spans-grouper/application.conf")); + + Map baseProps = underTest.getBaseStreamsConfig(); + Map streamsProps = underTest.getStreamsConfig(config); + baseProps.forEach(streamsProps::put); + Map mergedProps = streamsProps; + + mergedProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); + mergedProps.put(RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG, config); + mergedProps.put(StreamsConfig.STATE_DIR_CONFIG, file.getAbsolutePath()); + + StreamsBuilder streamsBuilder = + underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); + + Properties props = new Properties(); + mergedProps.forEach(props::put); + + TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); + + TestInputTopic inputTopic = + td.createInputTopic( + config.getString( + org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants + .INPUT_TOPIC_CONFIG_KEY), + Serdes.ByteArray().serializer(), + new JaegerSpanSerde().serializer()); + + Serde defaultValueSerde = new StreamsConfig(mergedProps).defaultValueSerde(); + Serde traceIdentitySerde = new StreamsConfig(mergedProps).defaultKeySerde(); + + Serde spanIdentitySerde = new AvroSerde<>(); + spanIdentitySerde.configure(Map.of(), true); + + TestOutputTopic outputTopic = + td.createOutputTopic( + config.getString(RawSpanGrouperConstants.OUTPUT_TOPIC_CONFIG_KEY), + traceIdentitySerde.deserializer(), + defaultValueSerde.deserializer()); + + TestOutputTopic bypassOutputTopic = + td.createOutputTopic( + config.getString(SpanNormalizerConstants.BYPASS_OUTPUT_TOPIC_CONFIG_KEY), + Serdes.String().deserializer(), + defaultValueSerde.deserializer()); + + TestOutputTopic rawLogOutputTopic = + td.createOutputTopic( + config.getString(SpanNormalizerConstants.OUTPUT_TOPIC_RAW_LOGS_CONFIG_KEY), + spanIdentitySerde.deserializer(), + defaultValueSerde.deserializer()); + + // with logs event, with bypass key + // expects no output to raw-span-grouper + // expects output to trace-enricher + // expects log output + JaegerSpanInternalModel.Span span1 = + JaegerSpanInternalModel.Span.newBuilder() + .setSpanId(ByteString.copyFrom("1".getBytes())) + .setTraceId(ByteString.copyFrom("trace-1".getBytes())) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("tenant-id") + .setVStr("tenant-1") + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("jaeger.servicename") + .setVStr(SERVICE_NAME) + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("test.bypass") + .setVStr("true") + .build()) + .addLogs( + JaegerSpanInternalModel.Log.newBuilder() + .setTimestamp(Timestamp.newBuilder().setSeconds(10).build()) + .addFields( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("z2") + .setVStr("some event detail") + .build())) .build(); - RawSpan span15 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) - .setCustomerId(tenant2) - .setEvent(createEvent("event-15", tenant2)) + inputTopic.pipeInput(span1); + + // validate output for trace-enricher + assertFalse(bypassOutputTopic.isEmpty()); + KeyValue kv1 = bypassOutputTopic.readKeyValue(); + assertEquals("tenant-1", kv1.value.getCustomerId()); + assertEquals( + HexUtils.getHex(ByteString.copyFrom("trace-1".getBytes()).toByteArray()), + HexUtils.getHex(kv1.value.getTraceId().array())); + + // validate no output for raw-spans-grouper + assertTrue(outputTopic.isEmpty()); + + // validate that no change in log traffic + assertFalse(rawLogOutputTopic.isEmpty()); + LogEvents logEvents = (LogEvents) rawLogOutputTopic.readKeyValue().value; + Assertions.assertEquals(1, logEvents.getLogEvents().size()); + + // with logs event, without bypass key + // expects output to trace-enricher (after grouping) + // expects no output to bypass output topic + // expects log output + JaegerSpanInternalModel.Span span2 = + JaegerSpanInternalModel.Span.newBuilder() + .setSpanId(ByteString.copyFrom("2".getBytes())) + .setTraceId(ByteString.copyFrom("trace-2".getBytes())) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("tenant-id") + .setVStr("tenant-1") + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("jaeger.servicename") + .setVStr(SERVICE_NAME) + .build()) + .addLogs( + JaegerSpanInternalModel.Log.newBuilder() + .setTimestamp(Timestamp.newBuilder().setSeconds(10).build()) + .addFields( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("z2") + .setVStr("some event detail") + .build())) .build(); - RawSpan span16 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) - .setCustomerId(tenant2) - .setEvent(createEvent("event-16", tenant2)) + + inputTopic.pipeInput(span2); + td.advanceWallClockTime(Duration.ofSeconds(35)); + + // validate that no output to bypass topic + assertTrue(bypassOutputTopic.isEmpty()); + + // validate that output to trace-enricher exists + assertFalse(outputTopic.isEmpty()); + KeyValue kv2 = outputTopic.readKeyValue(); + assertEquals("tenant-1", kv2.key.getTenantId()); + assertEquals( + HexUtils.getHex(ByteString.copyFrom("trace-2".getBytes()).toByteArray()), + HexUtils.getHex(kv2.key.getTraceId().array())); + + // validate that no change in log traffic + assertFalse(rawLogOutputTopic.isEmpty()); + logEvents = (LogEvents) rawLogOutputTopic.readKeyValue().value; + Assertions.assertEquals(1, logEvents.getLogEvents().size()); + + // with logs event, with bypass key but false value + // expects output to raw-span-grouper + // expects no output to trace-enricher + // expects log output + JaegerSpanInternalModel.Span span3 = + JaegerSpanInternalModel.Span.newBuilder() + .setSpanId(ByteString.copyFrom("3".getBytes())) + .setTraceId(ByteString.copyFrom("trace-3".getBytes())) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("tenant-id") + .setVStr("tenant-1") + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("jaeger.servicename") + .setVStr(SERVICE_NAME) + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("http.method") + .setVStr("GET") + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("test.bypass") + .setVStr("false") + .build()) + .addLogs( + JaegerSpanInternalModel.Log.newBuilder() + .setTimestamp(Timestamp.newBuilder().setSeconds(10).build()) + .addFields( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("z2") + .setVStr("some event detail") + .build())) .build(); - RawSpan span17 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) - .setCustomerId(tenant2) - .setEvent(createEvent("event-17", tenant2)) + + inputTopic.pipeInput(span3); + td.advanceWallClockTime(Duration.ofSeconds(35)); + + // validate that no output to bypass topic + assertTrue(bypassOutputTopic.isEmpty()); + + // validate that output to trace-enricher + assertFalse(outputTopic.isEmpty()); + KeyValue kv3 = outputTopic.readKeyValue(); + assertEquals("tenant-1", kv3.key.getTenantId()); + assertEquals( + HexUtils.getHex(ByteString.copyFrom("trace-3".getBytes()).toByteArray()), + HexUtils.getHex(kv3.key.getTraceId().array())); + + // validate that no change in log traffic + assertFalse(rawLogOutputTopic.isEmpty()); + logEvents = (LogEvents) rawLogOutputTopic.readKeyValue().value; + Assertions.assertEquals(1, logEvents.getLogEvents().size()); + } + + @Test + @SetEnvironmentVariable(key = "SERVICE_NAME", value = "raw-spans-grouper") + public void testLaterArrivalJaegerSpans(@TempDir Path tempDir) { + File file = tempDir.resolve("state").toFile(); + + RawSpansGrouper underTest = new RawSpansGrouper(ConfigClientFactory.getClient()); + Config config = + ConfigFactory.parseURL( + getClass().getClassLoader().getResource("configs/raw-spans-grouper/application.conf")); + + Map baseProps = underTest.getBaseStreamsConfig(); + Map streamsProps = underTest.getStreamsConfig(config); + baseProps.forEach(streamsProps::put); + Map mergedProps = streamsProps; + + mergedProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); + mergedProps.put(RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG, config); + mergedProps.put(StreamsConfig.STATE_DIR_CONFIG, file.getAbsolutePath()); + + StreamsBuilder streamsBuilder = + underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); + + Properties props = new Properties(); + mergedProps.forEach(props::put); + + TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); + + TestInputTopic inputTopic = + td.createInputTopic( + config.getString( + org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants + .INPUT_TOPIC_CONFIG_KEY), + Serdes.ByteArray().serializer(), + new JaegerSpanSerde().serializer()); + + Serde defaultValueSerde = new StreamsConfig(mergedProps).defaultValueSerde(); + Serde traceIdentitySerde = new StreamsConfig(mergedProps).defaultKeySerde(); + + Serde spanIdentitySerde = new AvroSerde<>(); + spanIdentitySerde.configure(Map.of(), true); + + TestOutputTopic outputTopic = + td.createOutputTopic( + config.getString(RawSpanGrouperConstants.OUTPUT_TOPIC_CONFIG_KEY), + traceIdentitySerde.deserializer(), + defaultValueSerde.deserializer()); + + // case 1: within threshold, expect output + Instant instant = Instant.now(); + JaegerSpanInternalModel.Span span = + JaegerSpanInternalModel.Span.newBuilder() + .setSpanId(ByteString.copyFrom("1".getBytes())) + .setTraceId(ByteString.copyFrom("trace-1".getBytes())) + .setStartTime(Timestamp.newBuilder().setSeconds(instant.getEpochSecond()).build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("tenant-id") + .setVStr("tenant-1") + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("jaeger.servicename") + .setVStr(SERVICE_NAME) + .build()) .build(); - RawSpan span18 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) - .setCustomerId(tenant2) - .setEvent(createEvent("event-18", tenant2)) + inputTopic.pipeInput(span); + td.advanceWallClockTime(Duration.ofSeconds(35)); + + KeyValue kv = outputTopic.readKeyValue(); + assertEquals("tenant-1", kv.key.getTenantId()); + assertEquals( + HexUtils.getHex(ByteString.copyFrom("trace-1".getBytes()).toByteArray()), + HexUtils.getHex(kv.key.getTraceId().array())); + + // outside threshold, expect no output + Instant instant1 = Instant.now().minus(25, ChronoUnit.HOURS); + JaegerSpanInternalModel.Span span2 = + JaegerSpanInternalModel.Span.newBuilder() + .setSpanId(ByteString.copyFrom("2".getBytes())) + .setTraceId(ByteString.copyFrom("trace-2".getBytes())) + .setStartTime(Timestamp.newBuilder().setSeconds(instant1.getEpochSecond()).build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("jaeger.servicename") + .setVStr(SERVICE_NAME) + .build()) .build(); - RawSpan span19 = - RawSpan.newBuilder() - .setTraceId(ByteBuffer.wrap("trace-4".getBytes())) - .setCustomerId(tenant2) - .setEvent(createEvent("event-19", tenant2)) + + inputTopic.pipeInput(span2); + td.advanceWallClockTime(Duration.ofSeconds(35)); + Assertions.assertTrue(outputTopic.isEmpty()); + } + + @Test + @SetEnvironmentVariable(key = "SERVICE_NAME", value = "raw-spans-grouper") + public void testTagsFilteringForJaegerSpans(@TempDir Path tempDir) { + File file = tempDir.resolve("state").toFile(); + + RawSpansGrouper underTest = new RawSpansGrouper(ConfigClientFactory.getClient()); + Config config = + ConfigFactory.parseURL( + getClass().getClassLoader().getResource("configs/raw-spans-grouper/application.conf")); + + Map baseProps = underTest.getBaseStreamsConfig(); + Map streamsProps = underTest.getStreamsConfig(config); + baseProps.forEach(streamsProps::put); + Map mergedProps = streamsProps; + + mergedProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); + mergedProps.put(RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG, config); + mergedProps.put(StreamsConfig.STATE_DIR_CONFIG, file.getAbsolutePath()); + + StreamsBuilder streamsBuilder = + underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); + + Properties props = new Properties(); + mergedProps.forEach(props::put); + + TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); + + TestInputTopic inputTopic = + td.createInputTopic( + config.getString( + org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants + .INPUT_TOPIC_CONFIG_KEY), + Serdes.ByteArray().serializer(), + new JaegerSpanSerde().serializer()); + + Serde defaultValueSerde = new StreamsConfig(mergedProps).defaultValueSerde(); + Serde traceIdentitySerde = new StreamsConfig(mergedProps).defaultKeySerde(); + + Serde spanIdentitySerde = new AvroSerde<>(); + spanIdentitySerde.configure(Map.of(), true); + + TestOutputTopic outputTopic = + td.createOutputTopic( + config.getString(RawSpanGrouperConstants.OUTPUT_TOPIC_CONFIG_KEY), + traceIdentitySerde.deserializer(), + defaultValueSerde.deserializer()); + + // makes sure that e2e works, so it tests basic scenario, rest of the + // scenarios are covered in unit test of tagFilter + // so configure for http extension attributes + Instant instant = Instant.now(); + JaegerSpanInternalModel.Span span = + JaegerSpanInternalModel.Span.newBuilder() + .setSpanId(ByteString.copyFrom("1".getBytes())) + .setTraceId(ByteString.copyFrom("trace-1".getBytes())) + .setStartTime(Timestamp.newBuilder().setSeconds(instant.getEpochSecond()).build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("tenant-id") + .setVStr("tenant-1") + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("jaeger.servicename") + .setVStr(SERVICE_NAME) + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("http.request.header.x-allowed-1") + .setVStr(SERVICE_NAME) + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("http.response.header.x-allowed-2") + .setVStr(SERVICE_NAME) + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("http.request.header.x-not-allowed-1") + .setVStr(SERVICE_NAME) + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("http.response.header.x-not-allowed-2") + .setVStr(SERVICE_NAME) + .build()) .build(); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span1); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span4); + inputTopic.pipeInput(span); + td.advanceWallClockTime(Duration.ofSeconds(35)); + + KeyValue kv = outputTopic.readKeyValue(); + assertEquals("tenant-1", kv.key.getTenantId()); + assertEquals( + HexUtils.getHex(ByteString.copyFrom("trace-1".getBytes()).toByteArray()), + HexUtils.getHex(kv.key.getTraceId().array())); + StructuredTrace value = kv.value; + + assertEquals( + HexUtils.getHex("1".getBytes()), HexUtils.getHex(value.getEventList().get(0).getEventId())); + assertEquals(SERVICE_NAME, value.getEventList().get(0).getServiceName()); + + // test of attributes + Assertions.assertEquals( + 3, value.getEventList().get(0).getAttributes().getAttributeMap().size()); + + Assertions.assertTrue( + value + .getEventList() + .get(0) + .getAttributes() + .getAttributeMap() + .containsKey("http.request.header.x-allowed-1")); + Assertions.assertTrue( + value + .getEventList() + .get(0) + .getAttributes() + .getAttributeMap() + .containsKey("http.response.header.x-allowed-2")); + + Assertions.assertFalse( + value + .getEventList() + .get(0) + .getAttributes() + .getAttributeMap() + .containsKey("http.request.header.x-not-allowed-1")); + Assertions.assertFalse( + value + .getEventList() + .get(0) + .getAttributes() + .getAttributeMap() + .containsKey("http.response.header.x-not-allowed-2")); + } + + @Test + @SetEnvironmentVariable(key = "SERVICE_NAME", value = "raw-spans-grouper") + public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir Path tempDir) { + File file = tempDir.resolve("state").toFile(); + + RawSpansGrouper underTest = new RawSpansGrouper(ConfigClientFactory.getClient()); + Config config = + ConfigFactory.parseURL( + getClass().getClassLoader().getResource("configs/raw-spans-grouper/application.conf")); + + Map baseProps = underTest.getBaseStreamsConfig(); + Map streamsProps = underTest.getStreamsConfig(config); + baseProps.forEach(streamsProps::put); + Map mergedProps = streamsProps; + + mergedProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); + mergedProps.put(RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG, config); + mergedProps.put(StreamsConfig.STATE_DIR_CONFIG, file.getAbsolutePath()); + + StreamsBuilder streamsBuilder = + underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); + + Properties props = new Properties(); + mergedProps.forEach(props::put); + + Serde defaultValueSerde = new StreamsConfig(mergedProps).defaultValueSerde(); + + Serde traceIdentitySerde = new StreamsConfig(mergedProps).defaultKeySerde(); + + TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); + + TestInputTopic inputTopic = + td.createInputTopic( + config.getString( + org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants + .INPUT_TOPIC_CONFIG_KEY), + Serdes.ByteArray().serializer(), + new JaegerSpanSerde().serializer()); + + TestOutputTopic outputTopic = + td.createOutputTopic( + config.getString(RawSpanGrouperConstants.OUTPUT_TOPIC_CONFIG_KEY), + traceIdentitySerde.deserializer(), + defaultValueSerde.deserializer()); + + String tenantId = "tenant1"; + + // create spans for trace-1 of tenant1 + JaegerSpanInternalModel.Span span1 = createSpan("tenant1", "trace-1", "event-1"); + JaegerSpanInternalModel.Span span2 = createSpan("tenant1", "trace-1", "event-2"); + JaegerSpanInternalModel.Span span3 = createSpan("tenant1", "trace-1", "event-3"); + + // create spans for trace-2 of tenant1 + JaegerSpanInternalModel.Span span4 = createSpan("tenant1", "trace-2", "event-4"); + JaegerSpanInternalModel.Span span5 = createSpan("tenant1", "trace-2", "event-5"); + + // create spans for trace-3 of tenant1 + JaegerSpanInternalModel.Span span6 = createSpan("tenant1", "trace-3", "event-6"); + JaegerSpanInternalModel.Span span7 = createSpan("tenant1", "trace-3", "event-7"); + JaegerSpanInternalModel.Span span8 = createSpan("tenant1", "trace-3", "event-8"); + JaegerSpanInternalModel.Span span9 = createSpan("tenant1", "trace-3", "event-9"); + JaegerSpanInternalModel.Span span10 = createSpan("tenant1", "trace-3", "event-10"); + JaegerSpanInternalModel.Span span11 = createSpan("tenant1", "trace-3", "event-11"); + + // create 8 spans for tenant-2 for trace-4 + JaegerSpanInternalModel.Span span12 = createSpan("tenant2", "trace-4", "event-12"); + JaegerSpanInternalModel.Span span13 = createSpan("tenant2", "trace-4", "event-13"); + JaegerSpanInternalModel.Span span14 = createSpan("tenant2", "trace-4", "event-14"); + JaegerSpanInternalModel.Span span15 = createSpan("tenant2", "trace-4", "event-15"); + JaegerSpanInternalModel.Span span16 = createSpan("tenant2", "trace-4", "event-16"); + JaegerSpanInternalModel.Span span17 = createSpan("tenant2", "trace-4", "event-17"); + JaegerSpanInternalModel.Span span18 = createSpan("tenant2", "trace-4", "event-18"); + JaegerSpanInternalModel.Span span19 = createSpan("tenant2", "trace-4", "event-19"); + + inputTopic.pipeInput(tenantId.getBytes(), span1); + inputTopic.pipeInput(tenantId.getBytes(), span4); td.advanceWallClockTime(Duration.ofSeconds(1)); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span2); + inputTopic.pipeInput(tenantId.getBytes(), span2); // select a value < 30s (groupingWindowTimeoutInMs) // this shouldn't trigger a punctuate call @@ -228,9 +719,10 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir assertEquals(1, trace.getEventList().size()); assertEquals("event-4", new String(trace.getEventList().get(0).getEventId().array())); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-1"), span3); + inputTopic.pipeInput(tenantId.getBytes(), span3); td.advanceWallClockTime(Duration.ofSeconds(45)); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-2"), span5); + inputTopic.pipeInput(tenantId.getBytes(), span5); + // the next advance should trigger a punctuate call and emit a trace with 2 spans td.advanceWallClockTime(Duration.ofSeconds(35)); @@ -244,12 +736,13 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir assertEquals(1, trace.getEventList().size()); assertEquals("event-5", new String(trace.getEventList().get(0).getEventId().array())); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span6); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span7); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span8); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span9); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span10); - inputTopic.pipeInput(createTraceIdentity(tenantId, "trace-3"), span11); + inputTopic.pipeInput(tenantId.getBytes(), span6); + inputTopic.pipeInput(tenantId.getBytes(), span7); + inputTopic.pipeInput(tenantId.getBytes(), span8); + inputTopic.pipeInput(tenantId.getBytes(), span9); + inputTopic.pipeInput(tenantId.getBytes(), span10); + inputTopic.pipeInput(tenantId.getBytes(), span11); + td.advanceWallClockTime(Duration.ofSeconds(35)); // trace should be truncated with 5 spans @@ -258,22 +751,63 @@ public void whenRawSpansAreReceivedWithInactivityExpectTraceToBeOutput(@TempDir // input 8 spans of trace-4 for tenant2, as there is global upper limit apply, it will emit only // 6 - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span12); - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span13); - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span14); - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span15); - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span16); - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span17); - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span18); - inputTopic.pipeInput(createTraceIdentity(tenant2, "trace-4"), span19); + + String tenantId2 = "tenant2"; + inputTopic.pipeInput(tenantId2.getBytes(), span12); + inputTopic.pipeInput(tenantId2.getBytes(), span13); + inputTopic.pipeInput(tenantId2.getBytes(), span14); + inputTopic.pipeInput(tenantId2.getBytes(), span15); + inputTopic.pipeInput(tenantId2.getBytes(), span16); + inputTopic.pipeInput(tenantId2.getBytes(), span17); + inputTopic.pipeInput(tenantId2.getBytes(), span18); + inputTopic.pipeInput(tenantId2.getBytes(), span19); + td.advanceWallClockTime(Duration.ofSeconds(35)); TestRecord testRecord = outputTopic.readRecord(); - assertEquals(tenant2, testRecord.getKey().getTenantId()); + assertEquals(tenantId2, testRecord.getKey().getTenantId()); assertEquals(6, testRecord.getValue().getEventList().size()); } + private JaegerSpanInternalModel.Span createSpan(String tenantId, String traceId, String eventId) { + return JaegerSpanInternalModel.Span.newBuilder() + .setSpanId(ByteString.copyFrom(eventId.getBytes())) + .setTraceId(ByteString.copyFrom(traceId.getBytes())) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("jaeger.servicename") + .setVStr(SERVICE_NAME) + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("tenant-id") + .setVStr(tenantId) + .build()) + .addLogs( + JaegerSpanInternalModel.Log.newBuilder() + .setTimestamp(Timestamp.newBuilder().setSeconds(5).build()) + .addFields( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("e1") + .setVStr("some event detail") + .build()) + .addFields( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("e2") + .setVStr("some event detail") + .build())) + .addLogs( + JaegerSpanInternalModel.Log.newBuilder() + .setTimestamp(Timestamp.newBuilder().setSeconds(10).build()) + .addFields( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("z2") + .setVStr("some event detail") + .build())) + .build(); + } + private Event createEvent(String eventId, String tenantId) { return Event.newBuilder() .setCustomerId(tenantId) diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/FieldsGeneratorTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/FieldsGeneratorTest.java similarity index 100% rename from span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/FieldsGeneratorTest.java rename to raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/FieldsGeneratorTest.java diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/FirstMatchingKeyFinderTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/FirstMatchingKeyFinderTest.java similarity index 100% rename from span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/FirstMatchingKeyFinderTest.java rename to raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/FirstMatchingKeyFinderTest.java diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/GrpcFieldsGeneratorTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/GrpcFieldsGeneratorTest.java similarity index 100% rename from span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/GrpcFieldsGeneratorTest.java rename to raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/GrpcFieldsGeneratorTest.java diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/HttpFieldsGeneratorTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/HttpFieldsGeneratorTest.java similarity index 100% rename from span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/HttpFieldsGeneratorTest.java rename to raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/HttpFieldsGeneratorTest.java diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/RpcFieldsGeneratorTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/RpcFieldsGeneratorTest.java similarity index 100% rename from span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/RpcFieldsGeneratorTest.java rename to raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/RpcFieldsGeneratorTest.java diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/SqlFieldsGeneratorTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/SqlFieldsGeneratorTest.java similarity index 100% rename from span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/SqlFieldsGeneratorTest.java rename to raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/SqlFieldsGeneratorTest.java diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/ValueConverterTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/ValueConverterTest.java similarity index 100% rename from span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/ValueConverterTest.java rename to raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/fieldgenerators/ValueConverterTest.java diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerResourceNormalizerTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerResourceNormalizerTest.java similarity index 100% rename from span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerResourceNormalizerTest.java rename to raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerResourceNormalizerTest.java diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizerTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizerTest.java similarity index 96% rename from span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizerTest.java rename to raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizerTest.java index 4a4bb79d4..45eafda1f 100644 --- a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizerTest.java +++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizerTest.java @@ -18,11 +18,11 @@ import java.util.Random; import org.hypertrace.core.datamodel.AttributeValue; import org.hypertrace.core.datamodel.RawSpan; +import org.hypertrace.core.rawspansgrouper.RawSpansGrouper; import org.hypertrace.core.serviceframework.config.ConfigClientFactory; import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry; import org.hypertrace.core.span.constants.RawSpanConstants; import org.hypertrace.core.span.constants.v1.JaegerAttribute; -import org.hypertrace.core.spannormalizer.SpanNormalizer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -52,10 +52,10 @@ public void resetSingleton() } @Test - @SetEnvironmentVariable(key = "SERVICE_NAME", value = "span-normalizer") + @SetEnvironmentVariable(key = "SERVICE_NAME", value = "raw-spans-grouper") public void defaultConfigParseTest() { try { - new SpanNormalizer(ConfigClientFactory.getClient()); + new RawSpansGrouper(ConfigClientFactory.getClient()); } catch (Exception e) { // We don't expect any exceptions in parsing the configuration. e.printStackTrace(); @@ -82,10 +82,10 @@ private Map getCommonConfig() { } @Test - @SetEnvironmentVariable(key = "SERVICE_NAME", value = "span-normalizer") + @SetEnvironmentVariable(key = "SERVICE_NAME", value = "raw-spans-grouper") public void testTenantIdKeyConfiguration() { try { - new SpanNormalizer(ConfigClientFactory.getClient()); + new RawSpansGrouper(ConfigClientFactory.getClient()); } catch (Exception e) { // We don't expect any exceptions in parsing the configuration. e.printStackTrace(); diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessorTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessorTest.java similarity index 100% rename from span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessorTest.java rename to raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessorTest.java diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformerTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformerTest.java similarity index 97% rename from span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformerTest.java rename to raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformerTest.java index cadf0059d..56bb6b4c4 100644 --- a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformerTest.java +++ b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanToLogRecordsTransformerTest.java @@ -42,7 +42,7 @@ void testDropLogEventRecords() { ProcessorContext processorContext = Mockito.mock(ProcessorContext.class); Mockito.when(processorContext.appConfigs()) - .thenReturn(Map.of("span-normalizer-job-config", ConfigFactory.parseMap(configs))); + .thenReturn(Map.of("raw-spans-grouper-job-config", ConfigFactory.parseMap(configs))); JaegerSpanToLogRecordsTransformer jaegerSpanToLogRecordsTransformer = new JaegerSpanToLogRecordsTransformer(); jaegerSpanToLogRecordsTransformer.init(processorContext); diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/rawspan/ByPassPredicateTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/rawspan/ByPassPredicateTest.java similarity index 100% rename from span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/rawspan/ByPassPredicateTest.java rename to raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/rawspan/ByPassPredicateTest.java diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/util/JaegerHTTagsConverterTest.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/util/JaegerHTTagsConverterTest.java similarity index 100% rename from span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/util/JaegerHTTagsConverterTest.java rename to raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/util/JaegerHTTagsConverterTest.java diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/utils/TestUtils.java b/raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/utils/TestUtils.java similarity index 100% rename from span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/utils/TestUtils.java rename to raw-spans-grouper/raw-spans-grouper/src/test/java/org/hypertrace/core/spannormalizer/utils/TestUtils.java diff --git a/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf b/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf index 5284b3714..d8e8479c9 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf +++ b/raw-spans-grouper/raw-spans-grouper/src/test/resources/configs/raw-spans-grouper/application.conf @@ -4,8 +4,11 @@ service.admin.port = 8051 main.class = org.hypertrace.core.rawspansgrouper.RawSpansGrouper span.type = rawSpan -input.topic = "raw-spans-from-jaeger-spans" +input.topic = "jaeger-spans" output.topic = "structured-traces-from-raw-spans" +bypass.output.topic = "structured-traces-from-raw-spans-bypass" +raw.logs.output.topic = "raw-logs" + precreate.topics = false kafka.streams.config = { @@ -29,6 +32,23 @@ group.partitioner = { service.port = 50104 } +processor { + #defaultTenantId = ${?DEFAULT_TENANT_ID} + late.arrival.threshold.duration = 365d + + # Configuration for dropping certain attributes that are captured by agent, but doesn't require in + # the processing pipeline. + # + # allowed.attributes.prefixes : the list of prefixes that should match for which allowed keys + # prefixed.matched.allowed.attributes : allowed keys from the subset of keys where prefix matched + # + # If either of config is empty allowed.attributes.prefixes or prefixed.matched.allowed.attributes, + # it will not drop any attributes. + # The above configuration doesn't impact if the key doesn't start with prefix. + allowed.attributes.prefixes = [] + prefixed.matched.allowed.attributes = [] +} + span.window.store.retention.time.mins = 60 span.window.store.retention.time.mins = ${?SPAN_WINDOW_STORE_RETENTION_TIME_MINS} span.window.store.segment.size.mins = 20 @@ -44,3 +64,78 @@ span.groupby.session.window.interval = 5 span.groupby.session.window.graceperiod.ms = 100 dataflow.metriccollection.sampling.percent = 10.0 + + +processor { + tenantIdTagKey = "tenant-id" + #defaultTenantId = "__default" +} + +processor { + spanDropFilters = [ + [ + { + "tagKey": "http.method", + "operator": "EQ", + "tagValue": "GET" + }, + { + "tagKey": "http.url", + "operator": "CONTAINS", + "tagValue": "health" + } + ], + [ + { + "tagKey": "grpc.url", + "operator": "NEQ", + "tagValue": "Sent.TestServiceGetEchos" + } + ], + [ + { + "tagKey": "ht.operation.name", + "operator": "EQ", + "tagValue": "/api/" + }, + { + "tagKey": "span.kind", + "operator": "NOT_EXISTS", + "tagValue": "" + } + ] + ] +} + +processor { + bypass.key = "test.bypass" + late.arrival.threshold.duration = "1d" + + # Configuration for dropping certain attributes that are captured by agent, but doesn't require in + # the processing pipeline. + # + # allowed.attributes.prefixes : the list of prefixes that should match for which allowed keys + # prefixed.matched.allowed.attributes : allowed keys from the subset of keys where prefix matched + # + # If either of config is empty allowed.attributes.prefixes or prefixed.matched.allowed.attributes, + # it will not drop any attributes. + # The above configuration doesn't impact if the key doesn't start with prefix. + allowed.attributes.prefixes = ["http.request.header.x-", "http.response.header.x-"] + prefixed.matched.allowed.attributes = ["http.request.header.x-allowed-1", "http.response.header.x-allowed-2"] +} + +clients = { + config.service.config = { + host = localhost + port = 50101 + } +} + +span.rules.exclude { + cache = { + refreshAfterWriteDuration = 3m + expireAfterWriteDuration = 5m + } +} + +rate.limit.config = [] diff --git a/span-normalizer/helm/.helmignore b/span-normalizer/helm/.helmignore deleted file mode 100644 index fbe01f88f..000000000 --- a/span-normalizer/helm/.helmignore +++ /dev/null @@ -1,22 +0,0 @@ -# Patterns to ignore when building packages. -# This supports shell glob matching, relative path matching, and -# negation (prefixed with !). Only one pattern per line. -.DS_Store -# Common VCS dirs -.git/ -.gitignore -.bzr/ -.bzrignore -.hg/ -.hgignore -.svn/ -# Common backup files -*.swp -*.bak -*.tmp -*~ -# Various IDEs -.project -.idea/ -*.tmproj -.vscode/ \ No newline at end of file diff --git a/span-normalizer/helm/Chart.yaml b/span-normalizer/helm/Chart.yaml deleted file mode 100644 index 959435a2d..000000000 --- a/span-normalizer/helm/Chart.yaml +++ /dev/null @@ -1,16 +0,0 @@ -apiVersion: v2 -name: span-normalizer -description: Normalize Spans into HypertraceCore specific format. - -type: application - -# This is the chart version. This version number should be incremented each time you make changes -# to the chart and its templates, including the app version. The "helm package" command will take care of setting this. -# A new chart will be created for each new version of the service. -version: 0.1.0 - -dependencies: - - name: kafka-topic-creator - repository: https://storage.googleapis.com/hypertrace-helm-charts - version: 0.2.x - condition: kafka-topic-creator.enabled diff --git a/span-normalizer/helm/templates/_helpers.tpl b/span-normalizer/helm/templates/_helpers.tpl deleted file mode 100644 index 06c96b1cd..000000000 --- a/span-normalizer/helm/templates/_helpers.tpl +++ /dev/null @@ -1,7 +0,0 @@ -{{- define "spannormalizerservice.image" -}} - {{- if and .Values.image.tagOverride -}} - {{- printf "%s:%s" .Values.image.repository .Values.image.tagOverride }} - {{- else -}} - {{- printf "%s:%s" .Values.image.repository .Chart.AppVersion }} - {{- end -}} -{{- end -}} \ No newline at end of file diff --git a/span-normalizer/helm/templates/deployment.yaml b/span-normalizer/helm/templates/deployment.yaml deleted file mode 100644 index 4e6acaacd..000000000 --- a/span-normalizer/helm/templates/deployment.yaml +++ /dev/null @@ -1,96 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: {{ .Chart.Name }} - labels: - release: {{ .Release.Name }} - {{- with .Values.deploymentLabels }} - {{- toYaml . | nindent 4 }} - {{- end }} -spec: - replicas: {{ .Values.replicaCount }} - strategy: - type: RollingUpdate - rollingUpdate: - maxUnavailable: {{ .Values.maxUnavailable }} - selector: - matchLabels: - {{- toYaml .Values.deploymentSelectorMatchLabels | nindent 6 }} - template: - metadata: - labels: - release: {{ .Release.Name }} - {{- with .Values.podLabels }} - {{- toYaml . | nindent 8 }} - {{- end }} - annotations: - checksum/config: {{ include (print $.Template.BasePath "/span-normalizer-config.yaml") . | sha256sum }} - prometheus.io/scrape: "true" - prometheus.io/port: {{ .Values.containerAdminPort | quote }} - {{- with .Values.podAnnotations }} - {{- toYaml . | nindent 8 }} - {{- end }} - spec: - {{- with .Values.imagePullSecrets }} - imagePullSecrets: - {{- toYaml . | nindent 8 }} - {{- end }} - volumes: - - name: service-config - configMap: - name: {{ .Values.spanNormalizerConfig.name }} - - name: log4j-config - configMap: - name: {{ .Values.logConfig.name }} - {{- with .Values.nodeLabels }} - nodeSelector: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.affinity }} - affinity: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.securityContext }} - securityContext: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- if .Values.hostNetwork }} - hostNetwork: true - dnsPolicy: ClusterFirstWithHostNet - {{- end }} - containers: - - name: {{ .Chart.Name }} - image: "{{ include "spannormalizerservice.image" . }}" - imagePullPolicy: {{ .Values.image.pullPolicy }} - ports: - - name: admin-port - containerPort: {{ .Values.containerAdminPort }} - protocol: TCP - env: - - name: SERVICE_NAME - value: "{{ .Chart.Name }}" - - name: BOOTSTRAP_CONFIG_URI - value: "file:///app/resources/configs" - - name: LOG4J_CONFIGURATION_FILE - value: "/var/{{ .Chart.Name }}/log/log4j2.properties" - - name: JAVA_TOOL_OPTIONS - value: {{ .Values.javaOpts | quote }} - volumeMounts: - - name: service-config - mountPath: /app/resources/configs/{{ .Chart.Name }}/application.conf - subPath: application.conf - - name: log4j-config - mountPath: /var/{{ .Chart.Name }}/log - livenessProbe: - initialDelaySeconds: {{ int .Values.livenessProbe.initialDelaySeconds }} - periodSeconds: {{ int .Values.livenessProbe.periodSeconds }} - tcpSocket: - port: admin-port - readinessProbe: - initialDelaySeconds: {{ int .Values.readinessProbe.initialDelaySeconds }} - periodSeconds: {{ int .Values.readinessProbe.periodSeconds }} - httpGet: - path: /health - port: {{ .Values.containerAdminPort }} - resources: - {{- toYaml .Values.resources | nindent 12 }} diff --git a/span-normalizer/helm/templates/hpa.yaml b/span-normalizer/helm/templates/hpa.yaml deleted file mode 100644 index 748933142..000000000 --- a/span-normalizer/helm/templates/hpa.yaml +++ /dev/null @@ -1,19 +0,0 @@ -{{- if .Values.hpa.enabled }} -apiVersion: autoscaling/v1 -kind: HorizontalPodAutoscaler -metadata: - name: {{ .Chart.Name }} - labels: - release: {{ .Release.Name }} - {{- with .Values.podLabels }} - {{- toYaml . | nindent 4 }} - {{- end }} -spec: - minReplicas: {{ int .Values.hpa.minReplicas }} - maxReplicas: {{ int .Values.hpa.maxReplicas }} - scaleTargetRef: - apiVersion: apps/v1 - kind: Deployment - name: {{ .Chart.Name }} - targetCPUUtilizationPercentage: {{ int .Values.hpa.targetCPUUtilizationPercentage }} -{{- end }} diff --git a/span-normalizer/helm/templates/logconfig.yaml b/span-normalizer/helm/templates/logconfig.yaml deleted file mode 100644 index 1a42795e7..000000000 --- a/span-normalizer/helm/templates/logconfig.yaml +++ /dev/null @@ -1,41 +0,0 @@ -apiVersion: v1 -kind: ConfigMap -metadata: - name: {{ .Values.logConfig.name }} - labels: - release: {{ .Release.Name }} -data: - log4j2.properties: |- - status = error - name = PropertiesConfig - {{- if .Values.logConfig.monitorInterval}} - monitorInterval = {{ .Values.logConfig.monitorInterval }} - {{- end }} - - appender.console.type = Console - appender.console.name = STDOUT - appender.console.layout.type = PatternLayout - appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n - - {{- if .Values.logConfig.appender.rolling.enabled }} - appender.rolling.type = RollingFile - appender.rolling.name = ROLLING_FILE - appender.rolling.fileName = ${env:SERVICE_NAME:-service}.log - appender.rolling.filePattern = ${env:SERVICE_NAME:-service}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz - appender.rolling.layout.type = PatternLayout - appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n - appender.rolling.policies.type = Policies - appender.rolling.policies.time.type = TimeBasedTriggeringPolicy - appender.rolling.policies.time.interval = 3600 - appender.rolling.policies.time.modulate = true - appender.rolling.policies.size.type = SizeBasedTriggeringPolicy - appender.rolling.policies.size.size = 20MB - appender.rolling.strategy.type = DefaultRolloverStrategy - appender.rolling.strategy.max = 5 - {{- end }} - - rootLogger.level = {{ .Values.logConfig.rootLogger.level }} - rootLogger.appenderRef.stdout.ref = STDOUT - {{- if .Values.logConfig.appender.rolling.enabled }} - rootLogger.appenderRef.rolling.ref = ROLLING_FILE - {{- end }} diff --git a/span-normalizer/helm/templates/span-normalizer-config.yaml b/span-normalizer/helm/templates/span-normalizer-config.yaml deleted file mode 100644 index 2032b4e5c..000000000 --- a/span-normalizer/helm/templates/span-normalizer-config.yaml +++ /dev/null @@ -1,125 +0,0 @@ -apiVersion: v1 -kind: ConfigMap -metadata: - name: {{ .Values.spanNormalizerConfig.name }} - labels: - release: {{ .Release.Name }} -data: - application.conf: |- - kafka.streams.config { - application.id = jaeger-spans-to-raw-spans-job - bootstrap.servers = "{{ .Values.spanNormalizerConfig.kafkaStreamsConfig.bootstrapServers }}" - schema.registry.url = "{{ .Values.spanNormalizerConfig.kafkaStreamsConfig.schemaRegistryUrl }}" - # kafka streams config - num.stream.threads = "{{ int .Values.spanNormalizerConfig.kafkaStreamsConfig.numStreamThreads }}" - commit.interval.ms = "{{ int .Values.spanNormalizerConfig.kafkaStreamsConfig.commitIntervalMs }}" - # Common client (prodcuer, consumer, admin) configs - receive.buffer.bytes = "{{ int .Values.spanNormalizerConfig.kafkaStreamsConfig.receiveBufferBytes }}" - send.buffer.bytes = "{{ int .Values.spanNormalizerConfig.kafkaStreamsConfig.sendBufferBytes }}" - # Producer configs - producer.acks = "{{ .Values.spanNormalizerConfig.kafkaStreamsConfig.producerAcks }}" - producer.batch.size = "{{ int .Values.spanNormalizerConfig.kafkaStreamsConfig.producerBatchSize }}" - producer.linger.ms = "{{ int .Values.spanNormalizerConfig.kafkaStreamsConfig.producerLingerMs }}" - producer.compression.type = "{{ .Values.spanNormalizerConfig.kafkaStreamsConfig.producerCompressionType }}" - producer.max.request.size = "{{ int .Values.spanNormalizerConfig.kafkaStreamsConfig.producerMaxRequestSize }}" - producer.buffer.memory = "{{ int .Values.spanNormalizerConfig.kafkaStreamsConfig.producerBufferMemory }}" - # Consumer configs - consumer.max.partition.fetch.bytes = "{{ int .Values.spanNormalizerConfig.kafkaStreamsConfig.consumerMaxPartitionFetchBytes }}" - consumer.max.poll.records = "{{ int .Values.spanNormalizerConfig.kafkaStreamsConfig.consumerMaxPollRecords }}" - consumer.session.timeout.ms = "{{ int .Values.spanNormalizerConfig.kafkaStreamsConfig.consumerSessionTimeoutMs }}" - # Exception handler configs - default.production.exception.handler = {{ .Values.spanNormalizerConfig.kafkaStreamsConfig.defaultProductionExceptionHandler }} - ignore.production.exception.classes = {{ .Values.spanNormalizerConfig.kafkaStreamsConfig.ignoreProductionExceptionClasses }} - # Others - metrics.recording.level = "{{ .Values.spanNormalizerConfig.kafkaStreamsConfig.metricsRecordingLevel }}" - {{- if .Values.spanNormalizerConfig.extraKafkaStreamsConfig }} - {{- range $key,$value := .Values.spanNormalizerConfig.extraKafkaStreamsConfig }} - {{ $key }} = {{ $value }} - {{- end }} - {{- end }} - } - - group.partitioner = { - enabled = {{ .Values.spanNormalizerConfig.groupPartitionerEnabled }} - service.host = {{ .Values.spanNormalizerConfig.groupPartitionerConfigServiceHost }} - service.port = {{ .Values.spanNormalizerConfig.groupPartitionerConfigServicePort }} - } - - {{- if hasKey .Values.spanNormalizerConfig "processor" }} - processor { - {{- if hasKey .Values.spanNormalizerConfig.processor "tenantIdTagKey" }} - tenantIdTagKey = "{{ .Values.spanNormalizerConfig.processor.tenantIdTagKey }}" - {{- end }} - - {{- if hasKey .Values.spanNormalizerConfig.processor "excludeTenantIds" }} - excludeTenantIds = {{ .Values.spanNormalizerConfig.processor.excludeTenantIds | toJson }} - {{- end }} - - {{- if hasKey .Values.spanNormalizerConfig.processor "defaultTenantId" }} - defaultTenantId = "{{ .Values.spanNormalizerConfig.processor.defaultTenantId }}" - {{- end }} - - {{- if hasKey .Values.spanNormalizerConfig.processor "spanDropCriterion" }} - spanDropCriterion = {{ .Values.spanNormalizerConfig.processor.spanDropCriterion | toJson }} - {{- end }} - - {{- if hasKey .Values.spanNormalizerConfig.processor "spanDropFilters" }} - spanDropFilters = {{ .Values.spanNormalizerConfig.processor.spanDropFilters | toJson }} - {{- end }} - - {{- if hasKey .Values.spanNormalizerConfig.processor "bypassKey" }} - bypass.key = "{{ .Values.spanNormalizerConfig.processor.bypassKey }}" - {{- end }} - - {{- if hasKey .Values.spanNormalizerConfig.processor "lateArrivalThresholdDuration" }} - late.arrival.threshold.duration = "{{ .Values.spanNormalizerConfig.processor.lateArrivalThresholdDuration }}" - {{- end }} - - {{- if hasKey .Values.spanNormalizerConfig.processor "allowedAttributesPrefixes" }} - allowed.attributes.prefixes = {{ .Values.spanNormalizerConfig.processor.allowedAttributesPrefixes | toJson }} - {{- end }} - - {{- if hasKey .Values.spanNormalizerConfig.processor "prefixedMatchedAllowedAttributes" }} - prefixed.matched.allowed.attributes = {{ .Values.spanNormalizerConfig.processor.prefixedMatchedAllowedAttributes | toJson }} - {{- end }} - - {{- if hasKey .Values.spanNormalizerConfig.processor "rootExitSpanDropCriterion" }} - rootExitSpanDropCriterion = {{ .Values.spanNormalizerConfig.processor.rootExitSpanDropCriterion | toJson }} - {{- end }} - - {{- if hasKey .Values.spanNormalizerConfig.processor "excludeLogsTenantIds" }} - excludeLogsTenantIds = {{ .Values.spanNormalizerConfig.processor.excludeLogsTenantIds | toJson }} - {{- end }} - } - {{- end }} - - {{- if hasKey .Values.spanNormalizerConfig "metrics" }} - metrics { - reporter { - names = {{- toJson .Values.spanNormalizerConfig.metrics.reporter.names | trim | nindent 12 }} - } - } - {{- end }} - clients { - config.service.config = { - host = {{ .Values.spanNormalizerConfig.configServiceHost }} - port = {{ .Values.spanNormalizerConfig.configServicePort }} - } - } - span.rules.exclude { - cache = { - refreshAfterWriteDuration = {{ .Values.spanNormalizerConfig.excludeSpanRulesConfig.cache.refreshAfterWriteDuration }} - expireAfterWriteDuration = {{ .Values.spanNormalizerConfig.excludeSpanRulesConfig.cache.expireAfterWriteDuration }} - } - } - {{- if hasKey .Values.spanNormalizerConfig "rateLimitConfig" }} - rate.limit.config = [ - {{- range $k,$v := $.Values.spanNormalizerConfig.rateLimitConfig }} - { - tenantId = {{ $v.tenantId }} - groupingKey = {{ $v.groupingKey }} - maxSpansPerMinute = {{ $v.maxSpansPerMinute }} - }, - {{- end }} - ] - {{- end }} diff --git a/span-normalizer/helm/values.yaml b/span-normalizer/helm/values.yaml deleted file mode 100644 index 47307cded..000000000 --- a/span-normalizer/helm/values.yaml +++ /dev/null @@ -1,165 +0,0 @@ -# Default values for the helm chart. -# This is a YAML-formatted file. -# Declare variables to be passed into your templates. -# -# Note about Namespace -# -------------------- -# It is deliberately left out here and using the helm -n or --namespace flag you can deploy your resources to the same -# namespace as the release. If you leave it out, your resources will be deployed to the default namespace. -# Also, not that the namespace you are deploying to should already exist otherwise the helm command will fail. -# You can always specify a different namespace for a resource by setting it directly in it's yaml file or -# making it configurable by defining it in this file. - -########### -# Deployment -########### -replicaCount: 1 -maxUnavailable: 0 - -image: - repository: hypertrace/span-normalizer - pullPolicy: IfNotPresent - tagOverride: "" - -imagePullSecrets: [] - -nodeLabels: {} - -# This is defined in resources/configs/span-normalizer/application.conf as service.admin.port -containerAdminPort: 8050 - -javaOpts: "-XX:InitialRAMPercentage=50.0 -XX:MaxRAMPercentage=75.0 -XX:MaxDirectMemorySize=128M" - -livenessProbe: - initialDelaySeconds: 10 - periodSeconds: 5 - -readinessProbe: - initialDelaySeconds: 2 - periodSeconds: 5 - -resources: - requests: - cpu: 0.1 - memory: 1024Mi - limits: - cpu: 0.2 - memory: 1024Mi - -deploymentLabels: - app: span-normalizer - -podLabels: - app: span-normalizer - -podAnnotations: {} - -affinity: {} - -securityContext: {} - -hostNetwork: false - -# The Deployment Selector match labels are different from the pod labels. Note that they should be a subset of the pod -# labels. You append new labels to them but cannot remove labels. If you remove or modify the labels you will need to -# delete the existing deployment bearing the same name and then redeploy. This is the reason why they are separated from -# the pod labels. You can add and remove pod labels without having an effect on the deployment. -# Also, please use "apiVersion: apps/v1" instead of the deprecated "apiVersion: extensions/v1beta1" for the deployment -# apiVersion in the yaml file. -deploymentSelectorMatchLabels: - app: span-normalizer - -serviceSelectorLabels: - app: span-normalizer - -########### -# Config Maps -########### -spanNormalizerConfig: - name: span-normalizer-config - # Important kafka streams configurations which are used in config template goes here. - kafkaStreamsConfig: - bootstrapServers: "bootstrap:9092" - schemaRegistryUrl: "http://schema-registry-service:8081" - # Core config - numStreamThreads: 2 # default = 1 - commitIntervalMs: 30000 # default = 30000 - # Common client (producer, consumer, admin) configs - receiveBufferBytes: 4194304 # default = 32768 (kafka streams default) - sendBufferBytes: 4194304 # default = 131072 (kafka streams default) - # Producer configs - producerAcks: all # default: 1 - producerBatchSize: 524288 # default = 16384 - producerLingerMs: 1000 # default = 100 (kafka streams default) - producerCompressionType: "gzip" # default = none - producerMaxRequestSize: 1048576 # default = 1048576 - producerBufferMemory: 134217728 # default = 33554432 - # Consumer configs - consumerMaxPartitionFetchBytes: 8388608 # default = 1048576 - consumerMaxPollRecords: 1000 # default = 1000 (kafka streams default) - consumerSessionTimeoutMs: 10000 # default = 10000 - # Exception handler configs - defaultProductionExceptionHandler: "org.hypertrace.core.kafkastreams.framework.exceptionhandlers.IgnoreProductionExceptionHandler" - ignoreProductionExceptionClasses: "org.apache.kafka.common.errors.RecordTooLargeException" - # Others - metricsRecordingLevel: INFO # default = INFO - # All other streams config goes here. - # Remove the flower braces and add key: value pair here. - extraKafkaStreamsConfig: {} - configServiceHost: config-service - configServicePort: 50101 - groupPartitionerEnabled: false - groupPartitionerConfigServiceHost: config-service - groupPartitionerConfigServicePort: 50104 - excludeSpanRulesConfig: - cache: - refreshAfterWriteDuration: 3m - expireAfterWriteDuration: 5m - rateLimitConfig: [] - -logConfig: - name: span-normalizer-log-appender-config - monitorInterval: 30 - rootLogger: - level: INFO - appender: - rolling: - enabled: false - -hpa: - enabled: false - minReplicas: 1 - maxReplicas: 5 - targetCPUUtilizationPercentage: 80 - -kafka-topic-creator: - enabled: true - jobName: raw-spans-topic-creator - helmHook: pre-install,pre-upgrade - kafka: - topics: - raw-spans-from-jaeger-spans: - replicationFactor: 3 - partitions: 8 - configs: - retention.bytes: 8589934592 # default = -1 - retention.ms: 86400000 # default = 604800000 (7 days) - max.message.bytes: 1048588 # default = 1048588 - raw-logs: - replicationFactor: 3 - partitions: 8 - configs: - retention.bytes: 8589934592 # default = -1 - retention.ms: 86400000 # default = 604800000 (7 days) - max.message.bytes: 1048588 # default = 1048588 - structured-traces-from-raw-spans: - replicationFactor: 3 - partitions: 8 - configs: - retention.bytes: 8589934592 # default = -1 - retention.ms: 86400000 # default = 604800000 (7 days) - max.message.bytes: 10485760 # default = 1048588 - zookeeper: - address: zookeeper:2181 - imagePullSecrets: [] - podAnnotations: {} diff --git a/span-normalizer/span-normalizer/build.gradle.kts b/span-normalizer/span-normalizer/build.gradle.kts deleted file mode 100644 index 45390d822..000000000 --- a/span-normalizer/span-normalizer/build.gradle.kts +++ /dev/null @@ -1,65 +0,0 @@ -plugins { - java - application - jacoco - id("org.hypertrace.docker-java-application-plugin") - id("org.hypertrace.docker-publish-plugin") - id("org.hypertrace.jacoco-report-plugin") -} - -application { - mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher") -} - -hypertraceDocker { - defaultImage { - javaApplication { - adminPort.set(8050) - } - } -} - -// Config for gw run to be able to run this locally. Just execute gw run here on Intellij or on the console. -tasks.run { - jvmArgs = listOf("-Dservice.name=${project.name}") -} - -tasks.test { - useJUnitPlatform() -} - -dependencies { - implementation(project(":span-normalizer:raw-span-constants")) - implementation(project(":span-normalizer:span-normalizer-api")) - implementation(project(":span-normalizer:span-normalizer-constants")) - implementation(project(":semantic-convention-utils")) - - implementation("org.hypertrace.core.datamodel:data-model:0.1.27") - implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.49") - implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.49") - implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.2.6") - implementation("org.hypertrace.core.kafkastreams.framework:weighted-group-partitioner:0.2.6") - implementation("org.hypertrace.config.service:span-processing-config-service-api:0.1.47") - implementation("org.hypertrace.config.service:span-processing-utils:0.1.47") - implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.11.2") - implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.11.2") - implementation("com.google.guava:guava:31.1-jre") - - // Required for the GRPC clients. - runtimeOnly("io.grpc:grpc-netty:1.50.0") - annotationProcessor("org.projectlombok:lombok:1.18.18") - compileOnly("org.projectlombok:lombok:1.18.18") - - implementation("de.javakaffee:kryo-serializers:0.45") - implementation("org.apache.commons:commons-lang3:3.12.0") - - // Logging - implementation("org.slf4j:slf4j-api:1.7.30") - runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.17.1") - - testImplementation("org.junit.jupiter:junit-jupiter:5.9.0") - testImplementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.49") - testImplementation("org.junit-pioneer:junit-pioneer:1.7.1") - testImplementation("org.mockito:mockito-core:4.7.0") - testImplementation("org.apache.kafka:kafka-streams-test-utils:7.2.1-ccs") -} diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java deleted file mode 100644 index ddcf6c445..000000000 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/SpanNormalizer.java +++ /dev/null @@ -1,113 +0,0 @@ -package org.hypertrace.core.spannormalizer; - -import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.BYPASS_OUTPUT_TOPIC_CONFIG_KEY; -import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.INPUT_TOPIC_CONFIG_KEY; -import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.OUTPUT_TOPIC_CONFIG_KEY; -import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.OUTPUT_TOPIC_RAW_LOGS_CONFIG_KEY; -import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG; - -import com.typesafe.config.Config; -import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.processor.StreamPartitioner; -import org.hypertrace.core.datamodel.RawSpan; -import org.hypertrace.core.kafkastreams.framework.KafkaStreamsApp; -import org.hypertrace.core.kafkastreams.framework.partitioner.GroupPartitionerBuilder; -import org.hypertrace.core.kafkastreams.framework.partitioner.KeyHashPartitioner; -import org.hypertrace.core.serviceframework.config.ConfigClient; -import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanPreProcessor; -import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanSerde; -import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanToAvroRawSpanTransformer; -import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanToLogRecordsTransformer; -import org.hypertrace.core.spannormalizer.jaeger.PreProcessedSpan; -import org.hypertrace.core.spannormalizer.rawspan.ByPassPredicate; -import org.hypertrace.core.spannormalizer.rawspan.RawSpanToStructuredTraceTransformer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SpanNormalizer extends KafkaStreamsApp { - - private static final Logger logger = LoggerFactory.getLogger(SpanNormalizer.class); - - public SpanNormalizer(ConfigClient configClient) { - super(configClient); - } - - @Override - public StreamsBuilder buildTopology( - Map streamsProperties, - StreamsBuilder streamsBuilder, - Map> inputStreams) { - - Config jobConfig = getJobConfig(streamsProperties); - String inputTopic = jobConfig.getString(INPUT_TOPIC_CONFIG_KEY); - String outputTopic = jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY); - String bypassOutputTopic = jobConfig.getString(BYPASS_OUTPUT_TOPIC_CONFIG_KEY); - String outputTopicRawLogs = jobConfig.getString(OUTPUT_TOPIC_RAW_LOGS_CONFIG_KEY); - - KStream inputStream = (KStream) inputStreams.get(inputTopic); - if (inputStream == null) { - inputStream = - streamsBuilder.stream( - inputTopic, Consumed.with(Serdes.ByteArray(), new JaegerSpanSerde())); - inputStreams.put(inputTopic, inputStream); - } - - KStream preProcessedStream = - inputStream.transform(() -> new JaegerSpanPreProcessor(getGrpcChannelRegistry())); - - KStream[] branches = - preProcessedStream - .transform(JaegerSpanToAvroRawSpanTransformer::new) - .branch(new ByPassPredicate(jobConfig), (key, value) -> true); - branches[0].transform(RawSpanToStructuredTraceTransformer::new).to(bypassOutputTopic); - - StreamPartitioner groupPartitioner = - new GroupPartitionerBuilder() - .buildPartitioner( - "spans", - jobConfig, - (traceid, span) -> traceid.getTenantId(), - new KeyHashPartitioner<>(), - getGrpcChannelRegistry()); - branches[1].to(outputTopic, Produced.with(null, null, groupPartitioner)); - - preProcessedStream.transform(JaegerSpanToLogRecordsTransformer::new).to(outputTopicRawLogs); - return streamsBuilder; - } - - @Override - public String getJobConfigKey() { - return SPAN_NORMALIZER_JOB_CONFIG; - } - - @Override - public Logger getLogger() { - return logger; - } - - @Override - public List getInputTopics(Map properties) { - Config jobConfig = getJobConfig(properties); - return Collections.singletonList(jobConfig.getString(INPUT_TOPIC_CONFIG_KEY)); - } - - @Override - public List getOutputTopics(Map properties) { - Config jobConfig = getJobConfig(properties); - return List.of( - jobConfig.getString(OUTPUT_TOPIC_CONFIG_KEY), - jobConfig.getString(OUTPUT_TOPIC_RAW_LOGS_CONFIG_KEY)); - } - - private Config getJobConfig(Map properties) { - return (Config) properties.get(getJobConfigKey()); - } -} diff --git a/span-normalizer/span-normalizer/src/main/resources/configs/common/application.conf b/span-normalizer/span-normalizer/src/main/resources/configs/common/application.conf deleted file mode 100644 index 7bd8f412a..000000000 --- a/span-normalizer/span-normalizer/src/main/resources/configs/common/application.conf +++ /dev/null @@ -1,71 +0,0 @@ -service.name = span-normalizer -service.admin.port = 8050 - -main.class = org.hypertrace.core.spannormalizer.SpanNormalizer - -span.type = jaeger -input.topic = "jaeger-spans" -output.topic = "raw-spans-from-jaeger-spans" -bypass.output.topic = "structured-traces-from-raw-spans" -raw.logs.output.topic = "raw-logs" -precreate.topics = false -precreate.topics = ${?PRE_CREATE_TOPICS} - -kafka.streams.config = { - application.id = jaeger-spans-to-raw-spans-job - num.stream.threads = 2 - num.stream.threads = ${?NUM_STREAM_THREADS} - - bootstrap.servers = "localhost:9092" - bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS} - - schema.registry.url = "http://localhost:8081" - schema.registry.url = ${?SCHEMA_REGISTRY_URL} - value.subject.name.strategy = "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy" -} - -group.partitioner = { - enabled = false - service.host = localhost - service.port = 50104 -} - -processor { - defaultTenantId = ${?DEFAULT_TENANT_ID} - late.arrival.threshold.duration = 365d - - # Configuration for dropping certain attributes that are captured by agent, but doesn't require in - # the processing pipeline. - # - # allowed.attributes.prefixes : the list of prefixes that should match for which allowed keys - # prefixed.matched.allowed.attributes : allowed keys from the subset of keys where prefix matched - # - # If either of config is empty allowed.attributes.prefixes or prefixed.matched.allowed.attributes, - # it will not drop any attributes. - # The above configuration doesn't impact if the key doesn't start with prefix. - allowed.attributes.prefixes = [] - prefixed.matched.allowed.attributes = [] -} - -logger.names = ["file"] -logger.file.dir = "/var/logs/span-normalizer" - -metrics.reporter.prefix = org.hypertrace.core.spannormalizer.jobSpanNormalizer -metrics.reporter.names = ["prometheus"] -metrics.reportInterval = 60 - -clients = { - config.service.config = { - host = localhost - port = 50101 - } -} - -span.rules.exclude { - cache = { - refreshAfterWriteDuration = 3m - expireAfterWriteDuration = 5m - } -} - -rate.limit.config = [] diff --git a/span-normalizer/span-normalizer/src/main/resources/log4j2.properties b/span-normalizer/span-normalizer/src/main/resources/log4j2.properties deleted file mode 100644 index d91bc7bfe..000000000 --- a/span-normalizer/span-normalizer/src/main/resources/log4j2.properties +++ /dev/null @@ -1,23 +0,0 @@ -status=error -name=PropertiesConfig -appender.console.type=Console -appender.console.name=STDOUT -appender.console.layout.type=PatternLayout -appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n -appender.rolling.type=RollingFile -appender.rolling.name=ROLLING_FILE -appender.rolling.fileName=${sys:service.name:-service}.log -appender.rolling.filePattern=${sys:service.name:-service}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz -appender.rolling.layout.type=PatternLayout -appender.rolling.layout.pattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n -appender.rolling.policies.type=Policies -appender.rolling.policies.time.type=TimeBasedTriggeringPolicy -appender.rolling.policies.time.interval=3600 -appender.rolling.policies.time.modulate=true -appender.rolling.policies.size.type=SizeBasedTriggeringPolicy -appender.rolling.policies.size.size=20MB -appender.rolling.strategy.type=DefaultRolloverStrategy -appender.rolling.strategy.max=5 -rootLogger.level=INFO -rootLogger.appenderRef.stdout.ref=STDOUT -rootLogger.appenderRef.rolling.ref=ROLLING_FILE diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/SpanNormalizerTest.java b/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/SpanNormalizerTest.java deleted file mode 100644 index 6276449bd..000000000 --- a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/SpanNormalizerTest.java +++ /dev/null @@ -1,691 +0,0 @@ -package org.hypertrace.core.spannormalizer; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.google.protobuf.ByteString; -import com.google.protobuf.Timestamp; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import io.jaegertracing.api_v2.JaegerSpanInternalModel; -import io.jaegertracing.api_v2.JaegerSpanInternalModel.Log; -import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.TestOutputTopic; -import org.apache.kafka.streams.TopologyTestDriver; -import org.hypertrace.core.datamodel.LogEvents; -import org.hypertrace.core.datamodel.RawSpan; -import org.hypertrace.core.datamodel.StructuredTrace; -import org.hypertrace.core.datamodel.shared.HexUtils; -import org.hypertrace.core.kafkastreams.framework.serdes.AvroSerde; -import org.hypertrace.core.serviceframework.config.ConfigClientFactory; -import org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants; -import org.hypertrace.core.spannormalizer.jaeger.JaegerSpanSerde; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junitpioneer.jupiter.SetEnvironmentVariable; - -class SpanNormalizerTest { - - private static final String SERVICE_NAME = "servicename"; - private SpanNormalizer underTest; - - @BeforeEach - @SetEnvironmentVariable(key = "SERVICE_NAME", value = "span-normalizer") - public void setUp() { - underTest = new SpanNormalizer(ConfigClientFactory.getClient()); - } - - @Test - @SetEnvironmentVariable(key = "SERVICE_NAME", value = "span-normalizer") - public void whenJaegerSpansAreProcessedExpectRawSpansToBeOutput() { - Config config = - ConfigFactory.parseURL( - getClass().getClassLoader().getResource("configs/span-normalizer/application.conf")); - - Map mergedProps = new HashMap<>(); - underTest.getBaseStreamsConfig().forEach(mergedProps::put); - underTest.getStreamsConfig(config).forEach(mergedProps::put); - mergedProps.put(SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG, config); - - StreamsBuilder streamsBuilder = - underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); - - Properties props = new Properties(); - mergedProps.forEach(props::put); - - TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); - TestInputTopic inputTopic = - td.createInputTopic( - config.getString(SpanNormalizerConstants.INPUT_TOPIC_CONFIG_KEY), - Serdes.ByteArray().serializer(), - new JaegerSpanSerde().serializer()); - - Serde rawSpanSerde = new AvroSerde<>(); - rawSpanSerde.configure(Map.of(), false); - - Serde spanIdentitySerde = new AvroSerde<>(); - spanIdentitySerde.configure(Map.of(), true); - - TestOutputTopic outputTopic = - td.createOutputTopic( - config.getString(SpanNormalizerConstants.OUTPUT_TOPIC_CONFIG_KEY), - spanIdentitySerde.deserializer(), - rawSpanSerde.deserializer()); - - TestOutputTopic rawLogOutputTopic = - td.createOutputTopic( - config.getString(SpanNormalizerConstants.OUTPUT_TOPIC_RAW_LOGS_CONFIG_KEY), - spanIdentitySerde.deserializer(), - new AvroSerde<>().deserializer()); - - Span span = - Span.newBuilder() - .setSpanId(ByteString.copyFrom("1".getBytes())) - .setTraceId(ByteString.copyFrom("trace-1".getBytes())) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("jaeger.servicename") - .setVStr(SERVICE_NAME) - .build()) - .addLogs( - Log.newBuilder() - .setTimestamp(Timestamp.newBuilder().setSeconds(5).build()) - .addFields( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("e1") - .setVStr("some event detail") - .build()) - .addFields( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("e2") - .setVStr("some event detail") - .build())) - .addLogs( - Log.newBuilder() - .setTimestamp(Timestamp.newBuilder().setSeconds(10).build()) - .addFields( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("z2") - .setVStr("some event detail") - .build())) - .build(); - inputTopic.pipeInput(span); - - KeyValue kv = outputTopic.readKeyValue(); - assertEquals("__default", kv.key.getTenantId()); - assertEquals( - HexUtils.getHex(ByteString.copyFrom("trace-1".getBytes()).toByteArray()), - HexUtils.getHex(kv.key.getTraceId().array())); - RawSpan value = kv.value; - assertEquals(HexUtils.getHex("1".getBytes()), HexUtils.getHex((value).getEvent().getEventId())); - assertEquals(SERVICE_NAME, value.getEvent().getServiceName()); - - KeyValue keyValue = rawLogOutputTopic.readKeyValue(); - LogEvents logEvents = keyValue.value; - Assertions.assertEquals(2, logEvents.getLogEvents().size()); - - // pipe in one more span which doesn't match spanDropFilters - Span span2 = - Span.newBuilder() - .setSpanId(ByteString.copyFrom("2".getBytes())) - .setTraceId(ByteString.copyFrom("trace-2".getBytes())) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("jaeger.servicename") - .setVStr(SERVICE_NAME) - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("http.method") - .setVStr("GET") - .build()) - .build(); - - inputTopic.pipeInput(span2); - KeyValue kv1 = outputTopic.readKeyValue(); - assertNotNull(kv1); - assertEquals("__default", kv1.key.getTenantId()); - assertEquals( - HexUtils.getHex(ByteString.copyFrom("trace-2".getBytes()).toByteArray()), - HexUtils.getHex(kv1.key.getTraceId().array())); - - // pipe in one more span which match one of spanDropFilters (http.method & http.url) - Span span3 = - Span.newBuilder() - .setSpanId(ByteString.copyFrom("3".getBytes())) - .setTraceId(ByteString.copyFrom("trace-3".getBytes())) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("jaeger.servicename") - .setVStr(SERVICE_NAME) - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("http.method") - .setVStr("GET") - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("http.url") - .setVStr("http://xyz.com/health/check") - .build()) - .build(); - - inputTopic.pipeInput(span3); - assertTrue(outputTopic.isEmpty()); - - // pipe in one more span which match one of spanDropFilters (grpc.url) - Span span4 = - Span.newBuilder() - .setSpanId(ByteString.copyFrom("3".getBytes())) - .setTraceId(ByteString.copyFrom("trace-3".getBytes())) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("jaeger.servicename") - .setVStr(SERVICE_NAME) - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("grpc.url") - .setVStr("doesn't match with input filter set") - .build()) - .build(); - - inputTopic.pipeInput(span4); - assertTrue(outputTopic.isEmpty()); - - // pipe in one more span which match one of spanDropFilters (operation_name, and span.kind not - // exists) - Span span5 = - Span.newBuilder() - .setSpanId(ByteString.copyFrom("4".getBytes())) - .setTraceId(ByteString.copyFrom("trace-4".getBytes())) - .setOperationName("/api/") - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("jaeger.servicename") - .setVStr(SERVICE_NAME) - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("grpc1.url1") - .setVStr("xyz") - .build()) - .build(); - - inputTopic.pipeInput(span5); - assertTrue(outputTopic.isEmpty()); - - // pipe in one more span which does not match one of spanDropFilters (operation_name, and - // span.kind not - // exists) - Span span6 = - Span.newBuilder() - .setSpanId(ByteString.copyFrom("6".getBytes())) - .setTraceId(ByteString.copyFrom("trace-6".getBytes())) - .setOperationName("/api-should-be-there/") - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("jaeger.servicename") - .setVStr(SERVICE_NAME) - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("grapc.url1") - .setVStr("xyz") - .build()) - .build(); - - inputTopic.pipeInput(span6); - KeyValue span6KV = outputTopic.readKeyValue(); - assertEquals("__default", kv1.key.getTenantId()); - assertEquals( - HexUtils.getHex(ByteString.copyFrom("trace-6".getBytes()).toByteArray()), - HexUtils.getHex(span6KV.key.getTraceId().array())); - - // pipe in one more span which does not match one of spanDropFilters (operation_name, and - // span.kind not - // exists) - Span span7 = - Span.newBuilder() - .setSpanId(ByteString.copyFrom("7".getBytes())) - .setTraceId(ByteString.copyFrom("trace-7".getBytes())) - .setOperationName("/api/") - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("jaeger.servicename") - .setVStr(SERVICE_NAME) - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("span.kind") - .setVStr("client") - .build()) - .build(); - - inputTopic.pipeInput(span7); - KeyValue span7KV = outputTopic.readKeyValue(); - assertEquals("__default", kv1.key.getTenantId()); - assertEquals( - HexUtils.getHex(ByteString.copyFrom("trace-7".getBytes()).toByteArray()), - HexUtils.getHex(span7KV.key.getTraceId().array())); - } - - @Test - @SetEnvironmentVariable(key = "SERVICE_NAME", value = "span-normalizer") - public void whenByPassedExpectStructuredTraceToBeOutput() { - Config config = - ConfigFactory.parseURL( - getClass().getClassLoader().getResource("configs/span-normalizer/application.conf")); - - Map mergedProps = new HashMap<>(); - underTest.getBaseStreamsConfig().forEach(mergedProps::put); - underTest.getStreamsConfig(config).forEach(mergedProps::put); - mergedProps.put(SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG, config); - - StreamsBuilder streamsBuilder = - underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); - - Properties props = new Properties(); - mergedProps.forEach(props::put); - - TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); - TestInputTopic inputTopic = - td.createInputTopic( - config.getString(SpanNormalizerConstants.INPUT_TOPIC_CONFIG_KEY), - Serdes.ByteArray().serializer(), - new JaegerSpanSerde().serializer()); - - Serde rawSpanSerde = new AvroSerde<>(); - rawSpanSerde.configure(Map.of(), false); - - Serde structuredTraceSerde = new AvroSerde<>(); - structuredTraceSerde.configure(Map.of(), false); - - Serde spanIdentitySerde = new AvroSerde<>(); - spanIdentitySerde.configure(Map.of(), true); - - TestOutputTopic outputTopic = - td.createOutputTopic( - config.getString(SpanNormalizerConstants.OUTPUT_TOPIC_CONFIG_KEY), - spanIdentitySerde.deserializer(), - rawSpanSerde.deserializer()); - - TestOutputTopic bypassOutputTopic = - td.createOutputTopic( - config.getString(SpanNormalizerConstants.BYPASS_OUTPUT_TOPIC_CONFIG_KEY), - Serdes.String().deserializer(), - structuredTraceSerde.deserializer()); - - TestOutputTopic rawLogOutputTopic = - td.createOutputTopic( - config.getString(SpanNormalizerConstants.OUTPUT_TOPIC_RAW_LOGS_CONFIG_KEY), - spanIdentitySerde.deserializer(), - new AvroSerde<>().deserializer()); - - // with logs event, with bypass key - // expects no output to raw-span-grouper - // expects output to trace-enricher - // expects log output - Span span1 = - Span.newBuilder() - .setSpanId(ByteString.copyFrom("1".getBytes())) - .setTraceId(ByteString.copyFrom("trace-1".getBytes())) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("jaeger.servicename") - .setVStr(SERVICE_NAME) - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("test.bypass") - .setVStr("true") - .build()) - .addLogs( - Log.newBuilder() - .setTimestamp(Timestamp.newBuilder().setSeconds(10).build()) - .addFields( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("z2") - .setVStr("some event detail") - .build())) - .build(); - inputTopic.pipeInput(span1); - - // validate output for trace-enricher - assertFalse(bypassOutputTopic.isEmpty()); - KeyValue kv1 = bypassOutputTopic.readKeyValue(); - assertEquals("__default", kv1.value.getCustomerId()); - assertEquals( - HexUtils.getHex(ByteString.copyFrom("trace-1".getBytes()).toByteArray()), - HexUtils.getHex(kv1.value.getTraceId().array())); - - // validate no output for raw-spans-grouper - assertTrue(outputTopic.isEmpty()); - - // validate that no change in log traffic - assertFalse(rawLogOutputTopic.isEmpty()); - LogEvents logEvents = (LogEvents) rawLogOutputTopic.readKeyValue().value; - Assertions.assertEquals(1, logEvents.getLogEvents().size()); - - // with logs event, without bypass key - // expects output to raw-span-grouper - // expects no output to trace-enricher - // expects log output - Span span2 = - Span.newBuilder() - .setSpanId(ByteString.copyFrom("2".getBytes())) - .setTraceId(ByteString.copyFrom("trace-2".getBytes())) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("jaeger.servicename") - .setVStr(SERVICE_NAME) - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("http.method") - .setVStr("GET") - .build()) - .addLogs( - Log.newBuilder() - .setTimestamp(Timestamp.newBuilder().setSeconds(10).build()) - .addFields( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("z2") - .setVStr("some event detail") - .build())) - .build(); - - inputTopic.pipeInput(span2); - - // validate that no output to trace-enricher - assertTrue(bypassOutputTopic.isEmpty()); - - // validate that output to raw-spans-grouper - assertFalse(outputTopic.isEmpty()); - KeyValue kv2 = outputTopic.readKeyValue(); - assertEquals("__default", kv2.key.getTenantId()); - assertEquals( - HexUtils.getHex(ByteString.copyFrom("trace-2".getBytes()).toByteArray()), - HexUtils.getHex(kv2.key.getTraceId().array())); - - // validate that no change in log traffic - assertFalse(rawLogOutputTopic.isEmpty()); - logEvents = (LogEvents) rawLogOutputTopic.readKeyValue().value; - Assertions.assertEquals(1, logEvents.getLogEvents().size()); - - // with logs event, with bypass key but false value - // expects output to raw-span-grouper - // expects no output to trace-enricher - // expects log output - Span span3 = - Span.newBuilder() - .setSpanId(ByteString.copyFrom("3".getBytes())) - .setTraceId(ByteString.copyFrom("trace-3".getBytes())) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("jaeger.servicename") - .setVStr(SERVICE_NAME) - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("http.method") - .setVStr("GET") - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("test.bypass") - .setVStr("false") - .build()) - .addLogs( - Log.newBuilder() - .setTimestamp(Timestamp.newBuilder().setSeconds(10).build()) - .addFields( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("z2") - .setVStr("some event detail") - .build())) - .build(); - - inputTopic.pipeInput(span3); - - // validate that no output to trace-enricher - assertTrue(bypassOutputTopic.isEmpty()); - - // validate that output to raw-spans-grouper - assertFalse(outputTopic.isEmpty()); - KeyValue kv3 = outputTopic.readKeyValue(); - assertEquals("__default", kv3.key.getTenantId()); - assertEquals( - HexUtils.getHex(ByteString.copyFrom("trace-3".getBytes()).toByteArray()), - HexUtils.getHex(kv3.key.getTraceId().array())); - - // validate that no change in log traffic - assertFalse(rawLogOutputTopic.isEmpty()); - logEvents = (LogEvents) rawLogOutputTopic.readKeyValue().value; - Assertions.assertEquals(1, logEvents.getLogEvents().size()); - } - - @Test - @SetEnvironmentVariable(key = "SERVICE_NAME", value = "span-normalizer") - public void testLaterArrivalJaegerSpans() { - Config config = - ConfigFactory.parseURL( - getClass().getClassLoader().getResource("configs/span-normalizer/application.conf")); - - Map mergedProps = new HashMap<>(); - underTest.getBaseStreamsConfig().forEach(mergedProps::put); - underTest.getStreamsConfig(config).forEach(mergedProps::put); - mergedProps.put(SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG, config); - - StreamsBuilder streamsBuilder = - underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); - - Properties props = new Properties(); - mergedProps.forEach(props::put); - - TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); - TestInputTopic inputTopic = - td.createInputTopic( - config.getString(SpanNormalizerConstants.INPUT_TOPIC_CONFIG_KEY), - Serdes.ByteArray().serializer(), - new JaegerSpanSerde().serializer()); - - Serde rawSpanSerde = new AvroSerde<>(); - rawSpanSerde.configure(Map.of(), false); - - Serde spanIdentitySerde = new AvroSerde<>(); - spanIdentitySerde.configure(Map.of(), true); - - TestOutputTopic outputTopic = - td.createOutputTopic( - config.getString(SpanNormalizerConstants.OUTPUT_TOPIC_CONFIG_KEY), - spanIdentitySerde.deserializer(), - rawSpanSerde.deserializer()); - - TestOutputTopic rawLogOutputTopic = - td.createOutputTopic( - config.getString(SpanNormalizerConstants.OUTPUT_TOPIC_RAW_LOGS_CONFIG_KEY), - spanIdentitySerde.deserializer(), - new AvroSerde<>().deserializer()); - - // case 1: within threshold, expect output - Instant instant = Instant.now(); - Span span = - Span.newBuilder() - .setSpanId(ByteString.copyFrom("1".getBytes())) - .setTraceId(ByteString.copyFrom("trace-1".getBytes())) - .setStartTime(Timestamp.newBuilder().setSeconds(instant.getEpochSecond()).build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("jaeger.servicename") - .setVStr(SERVICE_NAME) - .build()) - .build(); - inputTopic.pipeInput(span); - - KeyValue kv = outputTopic.readKeyValue(); - assertEquals("__default", kv.key.getTenantId()); - assertEquals( - HexUtils.getHex(ByteString.copyFrom("trace-1".getBytes()).toByteArray()), - HexUtils.getHex(kv.key.getTraceId().array())); - RawSpan value = kv.value; - assertEquals(HexUtils.getHex("1".getBytes()), HexUtils.getHex((value).getEvent().getEventId())); - assertEquals(SERVICE_NAME, value.getEvent().getServiceName()); - - // outside threshold, except no output to RawSpan - Instant instant1 = Instant.now().minus(25, ChronoUnit.HOURS); - Span span2 = - Span.newBuilder() - .setSpanId(ByteString.copyFrom("2".getBytes())) - .setTraceId(ByteString.copyFrom("trace-2".getBytes())) - .setStartTime(Timestamp.newBuilder().setSeconds(instant1.getEpochSecond()).build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("jaeger.servicename") - .setVStr(SERVICE_NAME) - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("http.method") - .setVStr("GET") - .build()) - .build(); - - inputTopic.pipeInput(span2); - Assertions.assertTrue(outputTopic.isEmpty()); - } - - @Test - @SetEnvironmentVariable(key = "SERVICE_NAME", value = "span-normalizer") - public void testTagsFilteringForJaegerSpans() { - Config config = - ConfigFactory.parseURL( - getClass().getClassLoader().getResource("configs/span-normalizer/application.conf")); - - Map mergedProps = new HashMap<>(); - underTest.getBaseStreamsConfig().forEach(mergedProps::put); - underTest.getStreamsConfig(config).forEach(mergedProps::put); - mergedProps.put(SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG, config); - - StreamsBuilder streamsBuilder = - underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); - - Properties props = new Properties(); - mergedProps.forEach(props::put); - - TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); - TestInputTopic inputTopic = - td.createInputTopic( - config.getString(SpanNormalizerConstants.INPUT_TOPIC_CONFIG_KEY), - Serdes.ByteArray().serializer(), - new JaegerSpanSerde().serializer()); - - Serde rawSpanSerde = new AvroSerde<>(); - rawSpanSerde.configure(Map.of(), false); - - Serde spanIdentitySerde = new AvroSerde<>(); - spanIdentitySerde.configure(Map.of(), true); - - TestOutputTopic outputTopic = - td.createOutputTopic( - config.getString(SpanNormalizerConstants.OUTPUT_TOPIC_CONFIG_KEY), - spanIdentitySerde.deserializer(), - rawSpanSerde.deserializer()); - - TestOutputTopic rawLogOutputTopic = - td.createOutputTopic( - config.getString(SpanNormalizerConstants.OUTPUT_TOPIC_RAW_LOGS_CONFIG_KEY), - spanIdentitySerde.deserializer(), - new AvroSerde<>().deserializer()); - - // makes sure that e2e works, so it tests basic scenario, rest of the - // scenarios are covered in unit test of tagFilter - // so configure for http extension attributes - Instant instant = Instant.now(); - Span span = - Span.newBuilder() - .setSpanId(ByteString.copyFrom("1".getBytes())) - .setTraceId(ByteString.copyFrom("trace-1".getBytes())) - .setStartTime(Timestamp.newBuilder().setSeconds(instant.getEpochSecond()).build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("jaeger.servicename") - .setVStr(SERVICE_NAME) - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("http.request.header.x-allowed-1") - .setVStr(SERVICE_NAME) - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("http.response.header.x-allowed-2") - .setVStr(SERVICE_NAME) - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("http.request.header.x-not-allowed-1") - .setVStr(SERVICE_NAME) - .build()) - .addTags( - JaegerSpanInternalModel.KeyValue.newBuilder() - .setKey("http.response.header.x-not-allowed-2") - .setVStr(SERVICE_NAME) - .build()) - .build(); - - inputTopic.pipeInput(span); - - KeyValue kv = outputTopic.readKeyValue(); - assertEquals("__default", kv.key.getTenantId()); - assertEquals( - HexUtils.getHex(ByteString.copyFrom("trace-1".getBytes()).toByteArray()), - HexUtils.getHex(kv.key.getTraceId().array())); - RawSpan value = kv.value; - assertEquals(HexUtils.getHex("1".getBytes()), HexUtils.getHex((value).getEvent().getEventId())); - assertEquals(SERVICE_NAME, value.getEvent().getServiceName()); - - // test of attributes - Assertions.assertEquals(3, value.getEvent().getAttributes().getAttributeMap().size()); - - Assertions.assertTrue( - value - .getEvent() - .getAttributes() - .getAttributeMap() - .containsKey("http.request.header.x-allowed-1")); - Assertions.assertTrue( - value - .getEvent() - .getAttributes() - .getAttributeMap() - .containsKey("http.response.header.x-allowed-2")); - - Assertions.assertFalse( - value - .getEvent() - .getAttributes() - .getAttributeMap() - .containsKey("http.request.header.x-not-allowed-1")); - Assertions.assertFalse( - value - .getEvent() - .getAttributes() - .getAttributeMap() - .containsKey("http.response.header.x-not-allowed-2")); - } -} diff --git a/span-normalizer/span-normalizer/src/test/resources/configs/span-normalizer/application.conf b/span-normalizer/span-normalizer/src/test/resources/configs/span-normalizer/application.conf deleted file mode 100644 index 1dd51068e..000000000 --- a/span-normalizer/span-normalizer/src/test/resources/configs/span-normalizer/application.conf +++ /dev/null @@ -1,104 +0,0 @@ -service.name = span-normalizer -service.admin.port = 8050 - -main.class = org.hypertrace.core.spannormalizer.SpanNormalizer - -span.type = jaeger -input.topic = "jaeger-spans" -output.topic = "raw-spans-from-jaeger-spans" -bypass.output.topic = "structured-traces-from-raw-spans" -raw.logs.output.topic = "raw-logs" - -kafka.streams.config = { - application.id = jaeger-spans-to-raw-spans-job - metrics.recording.level = INFO - num.stream.threads = 2 - producer.compression.type = gzip - topology.optimization = all - - bootstrap.servers = "localhost:9092" - auto.offset.reset = "latest" - auto.commit.interval.ms = 5000 - schema.registry.url = "mock://localhost:8081" - default.key.serde="org.hypertrace.core.kafkastreams.framework.serdes.AvroSerde" - default.value.serde="org.hypertrace.core.kafkastreams.framework.serdes.AvroSerde" -} - -group.partitioner = { - enabled = false - service.host = localhost - service.port = 50104 -} - -processor { - defaultTenantId = "__default" -} - -processor { - spanDropFilters = [ - [ - { - "tagKey": "http.method", - "operator": "EQ", - "tagValue": "GET" - }, - { - "tagKey": "http.url", - "operator": "CONTAINS", - "tagValue": "health" - } - ], - [ - { - "tagKey": "grpc.url", - "operator": "NEQ", - "tagValue": "Sent.TestServiceGetEchos" - } - ], - [ - { - "tagKey": "ht.operation.name", - "operator": "EQ", - "tagValue": "/api/" - }, - { - "tagKey": "span.kind", - "operator": "NOT_EXISTS", - "tagValue": "" - } - ] - ] -} - -processor { - bypass.key = "test.bypass" - late.arrival.threshold.duration = "1d" - - # Configuration for dropping certain attributes that are captured by agent, but doesn't require in - # the processing pipeline. - # - # allowed.attributes.prefixes : the list of prefixes that should match for which allowed keys - # prefixed.matched.allowed.attributes : allowed keys from the subset of keys where prefix matched - # - # If either of config is empty allowed.attributes.prefixes or prefixed.matched.allowed.attributes, - # it will not drop any attributes. - # The above configuration doesn't impact if the key doesn't start with prefix. - allowed.attributes.prefixes = ["http.request.header.x-", "http.response.header.x-"] - prefixed.matched.allowed.attributes = ["http.request.header.x-allowed-1", "http.response.header.x-allowed-2"] -} - -clients = { - config.service.config = { - host = localhost - port = 50101 - } -} - -span.rules.exclude { - cache = { - refreshAfterWriteDuration = 3m - expireAfterWriteDuration = 5m - } -} - -rate.limit.config = []