From b2b5ce068e99f0d5213ef7b6e73e2cf398051f19 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Sat, 4 Oct 2025 15:24:10 -0600 Subject: [PATCH 1/3] out_vivo_exporter: change API endpoints to /api/v1/ Signed-off-by: Eduardo Silva --- plugins/out_vivo_exporter/vivo.c | 155 ++++++++++++++++---------- plugins/out_vivo_exporter/vivo.h | 3 + plugins/out_vivo_exporter/vivo_http.c | 86 +++++++++++--- 3 files changed, 170 insertions(+), 74 deletions(-) diff --git a/plugins/out_vivo_exporter/vivo.c b/plugins/out_vivo_exporter/vivo.c index 9a6289bc42d..d1c279cbb5b 100644 --- a/plugins/out_vivo_exporter/vivo.c +++ b/plugins/out_vivo_exporter/vivo.c @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -27,16 +28,19 @@ #include "vivo_http.h" #include "vivo_stream.h" -static flb_sds_t format_logs(struct flb_event_chunk *event_chunk, struct flb_config *config) +static flb_sds_t format_logs(struct flb_input_instance *src_ins, + struct flb_event_chunk *event_chunk, struct flb_config *config) { - struct flb_log_event_decoder log_decoder; - struct flb_log_event log_event; + int len; int result; - int i; + char *name; flb_sds_t out_js; flb_sds_t out_buf = NULL; msgpack_sbuffer tmp_sbuf; msgpack_packer tmp_pck; + struct flb_log_event log_event; + struct flb_log_event_decoder log_decoder; + struct flb_mp_map_header mh; result = flb_log_event_decoder_init(&log_decoder, (char *) event_chunk->data, @@ -56,81 +60,110 @@ static flb_sds_t format_logs(struct flb_event_chunk *event_chunk, struct flb_con msgpack_sbuffer_init(&tmp_sbuf); msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); + /* + * Here is an example of the packaging done for Logs + * + * { + * "source_type": "forward", + * "source_name": "forward.0", + * "tag": "dummy.0", + * "records": [ + * { + * "timestamp": 1759591426808913765, + * "metadata": { + * "level": "info" + * }, + * "message": "dummy" + * }, + * { + * "timestamp": 1759591426908563348, + * "metadata": { + * "level": "debug", + * "service": "auth" + * }, + * "message": "dummy" + * } + * ] + * } + */ + + msgpack_pack_map(&tmp_pck, 4); + + /* source_type: internal type of the plugin */ + name = src_ins->p->name; + len = strlen(name); + + msgpack_pack_str(&tmp_pck, 11); + msgpack_pack_str_body(&tmp_pck, "source_type", 11); + msgpack_pack_str(&tmp_pck, len); + msgpack_pack_str_body(&tmp_pck, name, len); + + /* source_name: internal name or alias set by the user */ + name = (char *) flb_input_name(src_ins); + len = strlen(name); + msgpack_pack_str(&tmp_pck, 11); + msgpack_pack_str_body(&tmp_pck, "source_name", 11); + msgpack_pack_str(&tmp_pck, len); + msgpack_pack_str_body(&tmp_pck, name, len); + + /* tag */ + msgpack_pack_str(&tmp_pck, 3); + msgpack_pack_str_body(&tmp_pck, "tag", 3); + msgpack_pack_str(&tmp_pck, flb_sds_len(event_chunk->tag)); + msgpack_pack_str_body(&tmp_pck, event_chunk->tag, flb_sds_len(event_chunk->tag)); + + /* records */ + msgpack_pack_str(&tmp_pck, 7); + msgpack_pack_str_body(&tmp_pck, "records", 7); + + flb_mp_array_header_init(&mh, &tmp_pck); + while ((result = flb_log_event_decoder_next( &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + + flb_mp_array_header_append(&mh); + /* - * If the caller specified FLB_PACK_JSON_DATE_FLUENT, we format the data - * by using the following structure: - * - * [[TIMESTAMP, {"_tag": "...", ...MORE_METADATA}], {RECORD CONTENT}] + * [[TIMESTAMP, {"....": "...", ...MORE_METADATA}], {RECORD CONTENT}] */ msgpack_pack_array(&tmp_pck, 2); msgpack_pack_array(&tmp_pck, 2); msgpack_pack_uint64(&tmp_pck, flb_time_to_nanosec(&log_event.timestamp)); - /* add tag only */ - msgpack_pack_map(&tmp_pck, 1 + log_event.metadata->via.map.size); - - msgpack_pack_str(&tmp_pck, 4); - msgpack_pack_str_body(&tmp_pck, "_tag", 4); - - msgpack_pack_str(&tmp_pck, flb_sds_len(event_chunk->tag)); - msgpack_pack_str_body(&tmp_pck, event_chunk->tag, flb_sds_len(event_chunk->tag)); - - /* Append remaining keys/values */ - for (i = 0; - i < log_event.metadata->via.map.size; - i++) { - msgpack_pack_object(&tmp_pck, - log_event.metadata->via.map.ptr[i].key); - msgpack_pack_object(&tmp_pck, - log_event.metadata->via.map.ptr[i].val); - } + /* pack metadata */ + msgpack_pack_object(&tmp_pck, *log_event.metadata); /* pack the remaining content */ - msgpack_pack_map(&tmp_pck, log_event.body->via.map.size); - - /* Append remaining keys/values */ - for (i = 0; - i < log_event.body->via.map.size; - i++) { - msgpack_pack_object(&tmp_pck, - log_event.body->via.map.ptr[i].key); - msgpack_pack_object(&tmp_pck, - log_event.body->via.map.ptr[i].val); - } - - /* Concatenate by using break lines */ - out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size, - config->json_escape_unicode); - if (!out_js) { - flb_sds_destroy(out_buf); - msgpack_sbuffer_destroy(&tmp_sbuf); - flb_log_event_decoder_destroy(&log_decoder); - return NULL; - } - - /* - * One map record has been converted, now append it to the - * outgoing out_buf sds variable. - */ - flb_sds_cat_safe(&out_buf, out_js, flb_sds_len(out_js)); - flb_sds_cat_safe(&out_buf, "\n", 1); - - flb_sds_destroy(out_js); - msgpack_sbuffer_clear(&tmp_sbuf); + msgpack_pack_object(&tmp_pck, *log_event.body); } + flb_mp_array_header_end(&mh); + /* Release the unpacker */ flb_log_event_decoder_destroy(&log_decoder); + /* Convert the complete msgpack structure to JSON */ + out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size, + config->json_escape_unicode); + msgpack_sbuffer_destroy(&tmp_sbuf); - return out_buf; + /* append a newline */ + flb_sds_cat_safe(&out_js, "\n", 1); + + if (!out_js) { + flb_sds_destroy(out_buf); + return NULL; + } + + /* Replace out_buf with the complete JSON */ + flb_sds_destroy(out_buf); + return out_js; } static int logs_event_chunk_append(struct vivo_exporter *ctx, + struct flb_input_instance *src_ins, struct flb_event_chunk *event_chunk, struct flb_config *config) { @@ -138,8 +171,7 @@ static int logs_event_chunk_append(struct vivo_exporter *ctx, flb_sds_t json; struct vivo_stream_entry *entry; - - json = format_logs(event_chunk, config); + json = format_logs(src_ins, event_chunk, config); if (!json) { flb_plg_error(ctx->ins, "cannot convert logs chunk to JSON"); return -1; @@ -207,6 +239,7 @@ static int cb_vivo_init(struct flb_output_instance *ins, return -1; } ctx->ins = ins; + ctx->config = config; ret = flb_output_config_map_set(ins, (void *) ctx); if (ret == -1) { @@ -272,7 +305,7 @@ static void cb_vivo_flush(struct flb_event_chunk *event_chunk, } #endif if (event_chunk->type == FLB_EVENT_TYPE_LOGS) { - ret = logs_event_chunk_append(ctx, event_chunk, config); + ret = logs_event_chunk_append(ctx, ins, event_chunk, config); } else if (event_chunk->type == FLB_EVENT_TYPE_TRACES) { ret = metrics_traces_event_chunk_append(ctx, ctx->stream_traces, event_chunk, config); diff --git a/plugins/out_vivo_exporter/vivo.h b/plugins/out_vivo_exporter/vivo.h index 36d2cd4c8fe..8e012a8bf57 100644 --- a/plugins/out_vivo_exporter/vivo.h +++ b/plugins/out_vivo_exporter/vivo.h @@ -23,6 +23,8 @@ #include #include +struct flb_config; + #define VIVO_RING_BUFFER_SIZE 10 /* Plugin context */ @@ -40,6 +42,7 @@ struct vivo_exporter { /* instance context */ struct flb_output_instance *ins; + struct flb_config *config; }; #endif diff --git a/plugins/out_vivo_exporter/vivo_http.c b/plugins/out_vivo_exporter/vivo_http.c index ad3e7904995..8f20045b8c0 100644 --- a/plugins/out_vivo_exporter/vivo_http.c +++ b/plugins/out_vivo_exporter/vivo_http.c @@ -19,6 +19,11 @@ #include #include +#include +#include +#include + +#include #include "vivo.h" #include "vivo_http.h" @@ -64,14 +69,8 @@ static int stream_get_uri_properties(mk_request_t *request, return 0; } -static void headers_set(mk_request_t *request, struct vivo_stream *vs) +static void headers_set_common(struct vivo_exporter *ctx, mk_request_t *request) { - struct vivo_exporter *ctx; - - - /* parent context */ - ctx = vs->parent; - /* content type */ mk_http_header(request, VIVO_CONTENT_TYPE, sizeof(VIVO_CONTENT_TYPE) - 1, @@ -90,13 +89,25 @@ static void headers_set(mk_request_t *request, struct vivo_stream *vs) sizeof("Access-Control-Allow-Headers") - 1, "Origin, X-Requested-With, Content-Type, Accept", sizeof("Origin, X-Requested-With, Content-Type, Accept") - 1); + } +} +static void headers_set(mk_request_t *request, struct vivo_stream *vs) +{ + struct vivo_exporter *ctx; + + + /* parent context */ + ctx = vs->parent; + + headers_set_common(ctx, request); + + if (ctx->http_cors_allow_origin) { mk_http_header(request, "Access-Control-Expose-Headers", sizeof("Access-Control-Expose-Headers") - 1, "vivo-stream-start-id, vivo-stream-end-id", sizeof("vivo-stream-start-id, vivo-stream-end-id") - 1); - } } @@ -159,7 +170,7 @@ static void serve_content(mk_request_t *request, struct vivo_stream *vs) flb_sds_destroy(str_end); } -/* HTTP endpoint: /logs */ +/* HTTP endpoint: /api/v1/logs */ static void cb_logs(mk_request_t *request, void *data) { struct vivo_exporter *ctx; @@ -170,7 +181,7 @@ static void cb_logs(mk_request_t *request, void *data) mk_http_done(request); } -/* HTTP endpoint: /metrics */ +/* HTTP endpoint: /api/v1/metrics */ static void cb_metrics(mk_request_t *request, void *data) { struct vivo_exporter *ctx; @@ -181,6 +192,7 @@ static void cb_metrics(mk_request_t *request, void *data) mk_http_done(request); } +/* HTTP endpoint: /api/v1/traces */ static void cb_traces(mk_request_t *request, void *data) { struct vivo_exporter *ctx; @@ -191,6 +203,53 @@ static void cb_traces(mk_request_t *request, void *data) mk_http_done(request); } +/* HTTP endpoint: /api/v1/internal/metrics */ +static void cb_internal_metrics(mk_request_t *request, void *data) +{ + int ret; + char *mp_buf = NULL; + size_t mp_size = 0; + flb_sds_t json = NULL; + struct cmt *cmt = NULL; + struct vivo_exporter *ctx; + + ctx = (struct vivo_exporter *) data; + + cmt = flb_me_get_cmetrics(ctx->config); + if (!cmt) { + mk_http_status(request, 500); + mk_http_done(request); + return; + } + + ret = cmt_encode_msgpack_create(cmt, &mp_buf, &mp_size); + if (ret != 0) { + cmt_destroy(cmt); + mk_http_status(request, 500); + mk_http_done(request); + return; + } + + json = flb_msgpack_raw_to_json_sds(mp_buf, mp_size, + ctx->config->json_escape_unicode); + + cmt_encode_msgpack_destroy(mp_buf); + cmt_destroy(cmt); + + if (!json) { + mk_http_status(request, 500); + mk_http_done(request); + return; + } + + mk_http_status(request, 200); + headers_set_common(ctx, request); + mk_http_send(request, json, flb_sds_len(json), NULL); + mk_http_done(request); + + flb_sds_destroy(json); +} + /* HTTP endpoint: / (root) */ static void cb_root(mk_request_t *request, void *data) { @@ -236,9 +295,10 @@ struct vivo_http *vivo_http_server_create(struct vivo_exporter *ctx, ph->vid = vid; /* Set HTTP URI callbacks */ - mk_vhost_handler(ph->ctx, vid, "/logs", cb_logs, ctx); - mk_vhost_handler(ph->ctx, vid, "/metrics", cb_metrics, ctx); - mk_vhost_handler(ph->ctx, vid, "/traces", cb_traces, ctx); + mk_vhost_handler(ph->ctx, vid, "/api/v1/logs", cb_logs, ctx); + mk_vhost_handler(ph->ctx, vid, "/api/v1/metrics", cb_metrics, ctx); + mk_vhost_handler(ph->ctx, vid, "/api/v1/traces", cb_traces, ctx); + mk_vhost_handler(ph->ctx, vid, "/api/v1/internal/metrics", cb_internal_metrics, ctx); mk_vhost_handler(ph->ctx, vid, "/", cb_root, NULL); return ph; From d16acdfbc3d64142914b568254f3929b5df79392 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Mon, 6 Oct 2025 12:11:28 -0600 Subject: [PATCH 2/3] out_vivo_exporter: add groups support Signed-off-by: Eduardo Silva --- plugins/out_vivo_exporter/vivo.c | 130 +++++++++++++++++++++++- plugins/out_vivo_exporter/vivo_http.c | 41 ++++---- plugins/out_vivo_exporter/vivo_http.h | 8 ++ plugins/out_vivo_exporter/vivo_stream.c | 21 +++- plugins/out_vivo_exporter/vivo_stream.h | 3 +- 5 files changed, 181 insertions(+), 22 deletions(-) diff --git a/plugins/out_vivo_exporter/vivo.c b/plugins/out_vivo_exporter/vivo.c index d1c279cbb5b..79312717117 100644 --- a/plugins/out_vivo_exporter/vivo.c +++ b/plugins/out_vivo_exporter/vivo.c @@ -23,11 +23,35 @@ #include #include #include +#include #include "vivo.h" #include "vivo_http.h" #include "vivo_stream.h" +static msgpack_object *find_map_value(msgpack_object *map, + const char *key, size_t key_len) +{ + size_t i; + + if (!map || map->type != MSGPACK_OBJECT_MAP) { + return NULL; + } + + for (i = 0; i < map->via.map.size; i++) { + if (map->via.map.ptr[i].key.type != MSGPACK_OBJECT_STR) { + continue; + } + + if (map->via.map.ptr[i].key.via.str.size == key_len && + strncmp(map->via.map.ptr[i].key.via.str.ptr, key, key_len) == 0) { + return &map->via.map.ptr[i].val; + } + } + + return NULL; +} + static flb_sds_t format_logs(struct flb_input_instance *src_ins, struct flb_event_chunk *event_chunk, struct flb_config *config) { @@ -38,9 +62,19 @@ static flb_sds_t format_logs(struct flb_input_instance *src_ins, flb_sds_t out_buf = NULL; msgpack_sbuffer tmp_sbuf; msgpack_packer tmp_pck; + int group_mismatch = FLB_FALSE; + int is_otlp = FLB_FALSE; struct flb_log_event log_event; struct flb_log_event_decoder log_decoder; struct flb_mp_map_header mh; + struct flb_mp_map_header root_map; + struct flb_mp_map_header otlp_map; + struct flb_mp_map_header group_map; + msgpack_object *group_metadata = NULL; + msgpack_object *group_attributes = NULL; + msgpack_object *schema_value = NULL; + msgpack_object *resource_value = NULL; + msgpack_object *scope_value = NULL; result = flb_log_event_decoder_init(&log_decoder, (char *) event_chunk->data, @@ -87,9 +121,10 @@ static flb_sds_t format_logs(struct flb_input_instance *src_ins, * } */ - msgpack_pack_map(&tmp_pck, 4); + flb_mp_map_header_init(&root_map, &tmp_pck); /* source_type: internal type of the plugin */ + flb_mp_map_header_append(&root_map); name = src_ins->p->name; len = strlen(name); @@ -99,6 +134,7 @@ static flb_sds_t format_logs(struct flb_input_instance *src_ins, msgpack_pack_str_body(&tmp_pck, name, len); /* source_name: internal name or alias set by the user */ + flb_mp_map_header_append(&root_map); name = (char *) flb_input_name(src_ins); len = strlen(name); msgpack_pack_str(&tmp_pck, 11); @@ -107,12 +143,14 @@ static flb_sds_t format_logs(struct flb_input_instance *src_ins, msgpack_pack_str_body(&tmp_pck, name, len); /* tag */ + flb_mp_map_header_append(&root_map); msgpack_pack_str(&tmp_pck, 3); msgpack_pack_str_body(&tmp_pck, "tag", 3); msgpack_pack_str(&tmp_pck, flb_sds_len(event_chunk->tag)); msgpack_pack_str_body(&tmp_pck, event_chunk->tag, flb_sds_len(event_chunk->tag)); /* records */ + flb_mp_map_header_append(&root_map); msgpack_pack_str(&tmp_pck, 7); msgpack_pack_str_body(&tmp_pck, "records", 7); @@ -122,6 +160,24 @@ static flb_sds_t format_logs(struct flb_input_instance *src_ins, &log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + if (log_event.group_metadata != NULL) { + if (group_metadata == NULL) { + group_metadata = log_event.group_metadata; + } + else if (group_metadata != log_event.group_metadata) { + group_mismatch = FLB_TRUE; + } + } + + if (log_event.group_attributes != NULL) { + if (group_attributes == NULL) { + group_attributes = log_event.group_attributes; + } + else if (group_attributes != log_event.group_attributes) { + group_mismatch = FLB_TRUE; + } + } + flb_mp_array_header_append(&mh); /* @@ -140,6 +196,78 @@ static flb_sds_t format_logs(struct flb_input_instance *src_ins, flb_mp_array_header_end(&mh); + if (group_mismatch == FLB_FALSE && + (group_metadata != NULL || group_attributes != NULL)) { + if (group_metadata != NULL) { + schema_value = find_map_value(group_metadata, "schema", 6); + } + + if (schema_value && + schema_value->type == MSGPACK_OBJECT_STR && + schema_value->via.str.size == 4 && + strncmp(schema_value->via.str.ptr, "otlp", 4) == 0) { + is_otlp = FLB_TRUE; + } + + if (is_otlp == FLB_TRUE) { + resource_value = NULL; + scope_value = NULL; + + if (group_attributes != NULL && + group_attributes->type == MSGPACK_OBJECT_MAP) { + resource_value = find_map_value(group_attributes, "resource", 8); + scope_value = find_map_value(group_attributes, "scope", 5); + } + + flb_mp_map_header_append(&root_map); + msgpack_pack_str(&tmp_pck, 4); + msgpack_pack_str_body(&tmp_pck, "otlp", 4); + + flb_mp_map_header_init(&otlp_map, &tmp_pck); + + if (resource_value != NULL) { + flb_mp_map_header_append(&otlp_map); + msgpack_pack_str(&tmp_pck, 8); + msgpack_pack_str_body(&tmp_pck, "resource", 8); + msgpack_pack_object(&tmp_pck, *resource_value); + } + + if (scope_value != NULL) { + flb_mp_map_header_append(&otlp_map); + msgpack_pack_str(&tmp_pck, 5); + msgpack_pack_str_body(&tmp_pck, "scope", 5); + msgpack_pack_object(&tmp_pck, *scope_value); + } + + flb_mp_map_header_end(&otlp_map); + } + else { + flb_mp_map_header_append(&root_map); + msgpack_pack_str(&tmp_pck, 9); + msgpack_pack_str_body(&tmp_pck, "flb_group", 9); + + flb_mp_map_header_init(&group_map, &tmp_pck); + + if (group_metadata != NULL) { + flb_mp_map_header_append(&group_map); + msgpack_pack_str(&tmp_pck, 8); + msgpack_pack_str_body(&tmp_pck, "metadata", 8); + msgpack_pack_object(&tmp_pck, *group_metadata); + } + + if (group_attributes != NULL) { + flb_mp_map_header_append(&group_map); + msgpack_pack_str(&tmp_pck, 4); + msgpack_pack_str_body(&tmp_pck, "body", 4); + msgpack_pack_object(&tmp_pck, *group_attributes); + } + + flb_mp_map_header_end(&group_map); + } + } + + flb_mp_map_header_end(&root_map); + /* Release the unpacker */ flb_log_event_decoder_destroy(&log_decoder); diff --git a/plugins/out_vivo_exporter/vivo_http.c b/plugins/out_vivo_exporter/vivo_http.c index 8f20045b8c0..d3d9e65c565 100644 --- a/plugins/out_vivo_exporter/vivo_http.c +++ b/plugins/out_vivo_exporter/vivo_http.c @@ -31,9 +31,6 @@ #define VIVO_CONTENT_TYPE "Content-Type" #define VIVO_CONTENT_TYPE_JSON "application/json" -#define VIVO_STREAM_START_ID "Vivo-Stream-Start-ID" -#define VIVO_STREAM_END_ID "Vivo-Stream-End-ID" - static int stream_get_uri_properties(mk_request_t *request, int64_t *from, int64_t *to, int64_t *limit) { @@ -106,21 +103,23 @@ static void headers_set(mk_request_t *request, struct vivo_stream *vs) mk_http_header(request, "Access-Control-Expose-Headers", sizeof("Access-Control-Expose-Headers") - 1, - "vivo-stream-start-id, vivo-stream-end-id", - sizeof("vivo-stream-start-id, vivo-stream-end-id") - 1); + "vivo-stream-start-id, vivo-stream-end-id, vivo-stream-next-id", + sizeof("vivo-stream-start-id, vivo-stream-end-id, vivo-stream-next-id") - 1); } } -static void serve_content(mk_request_t *request, struct vivo_stream *vs) +void vivo_http_serve_content(mk_request_t *request, struct vivo_stream *vs) { int64_t from = -1; int64_t to = -1; int64_t limit = -1; int64_t stream_start_id = -1; int64_t stream_end_id = -1; + int64_t stream_next_id = -1; flb_sds_t payload; flb_sds_t str_start; flb_sds_t str_end; + flb_sds_t str_next; if (request->query_string.len > 0) { @@ -128,24 +127,31 @@ static void serve_content(mk_request_t *request, struct vivo_stream *vs) } payload = vivo_stream_get_content(vs, from, to, limit, - &stream_start_id, &stream_end_id); + &stream_start_id, &stream_end_id, + &stream_next_id); if (!payload) { mk_http_status(request, 500); return; } - if (flb_sds_len(payload) == 0) { - mk_http_status(request, 200); - headers_set(request, vs); - flb_sds_destroy(payload); - return; - } - mk_http_status(request, 200); /* set response headers */ headers_set(request, vs); + str_next = flb_sds_create_size(32); + flb_sds_printf(&str_next, "%" PRId64, stream_next_id); + + mk_http_header(request, + VIVO_STREAM_NEXT_ID, sizeof(VIVO_STREAM_NEXT_ID) - 1, + str_next, flb_sds_len(str_next)); + + if (flb_sds_len(payload) == 0) { + flb_sds_destroy(payload); + flb_sds_destroy(str_next); + return; + } + /* stream ids served: compose buffer and set headers */ str_start = flb_sds_create_size(32); flb_sds_printf(&str_start, "%" PRId64, stream_start_id); @@ -168,6 +174,7 @@ static void serve_content(mk_request_t *request, struct vivo_stream *vs) flb_sds_destroy(payload); flb_sds_destroy(str_start); flb_sds_destroy(str_end); + flb_sds_destroy(str_next); } /* HTTP endpoint: /api/v1/logs */ @@ -177,7 +184,7 @@ static void cb_logs(mk_request_t *request, void *data) ctx = (struct vivo_exporter *) data; - serve_content(request, ctx->stream_logs); + vivo_http_serve_content(request, ctx->stream_logs); mk_http_done(request); } @@ -188,7 +195,7 @@ static void cb_metrics(mk_request_t *request, void *data) ctx = (struct vivo_exporter *) data; - serve_content(request, ctx->stream_metrics); + vivo_http_serve_content(request, ctx->stream_metrics); mk_http_done(request); } @@ -199,7 +206,7 @@ static void cb_traces(mk_request_t *request, void *data) ctx = (struct vivo_exporter *) data; - serve_content(request, ctx->stream_traces); + vivo_http_serve_content(request, ctx->stream_traces); mk_http_done(request); } diff --git a/plugins/out_vivo_exporter/vivo_http.h b/plugins/out_vivo_exporter/vivo_http.h index 3d589ec6e04..7e8a0ee63e5 100644 --- a/plugins/out_vivo_exporter/vivo_http.h +++ b/plugins/out_vivo_exporter/vivo_http.h @@ -25,6 +25,12 @@ #include "vivo.h" +#define VIVO_STREAM_START_ID "Vivo-Stream-Start-ID" +#define VIVO_STREAM_END_ID "Vivo-Stream-End-ID" +#define VIVO_STREAM_NEXT_ID "Vivo-Stream-Next-ID" + +struct vivo_stream; + /* HTTP response payload received through a Message Queue */ struct vivo_http_buf { int users; @@ -53,4 +59,6 @@ int vivo_http_server_stop(struct vivo_http *ph); int vivo_http_server_mq_push_metrics(struct vivo_http *ph, void *data, size_t size); +void vivo_http_serve_content(mk_request_t *request, struct vivo_stream *vs); + #endif diff --git a/plugins/out_vivo_exporter/vivo_stream.c b/plugins/out_vivo_exporter/vivo_stream.c index a878a2cf5f8..8f2dd2f9339 100644 --- a/plugins/out_vivo_exporter/vivo_stream.c +++ b/plugins/out_vivo_exporter/vivo_stream.c @@ -136,7 +136,8 @@ void vivo_stream_destroy(struct vivo_stream *vs) flb_sds_t vivo_stream_get_content(struct vivo_stream *vs, int64_t from, int64_t to, int64_t limit, - int64_t *stream_start_id, int64_t *stream_end_id) + int64_t *stream_start_id, int64_t *stream_end_id, + int64_t *stream_next_id) { int64_t count = 0; flb_sds_t buf; @@ -151,6 +152,18 @@ flb_sds_t vivo_stream_get_content(struct vivo_stream *vs, int64_t from, int64_t stream_lock(vs); + if (stream_start_id) { + *stream_start_id = -1; + } + + if (stream_end_id) { + *stream_end_id = -1; + } + + if (stream_next_id) { + *stream_next_id = vs->entries_added; + } + mk_list_foreach(head, &vs->entries) { e = mk_list_entry(head, struct vivo_stream_entry, _head); @@ -162,13 +175,15 @@ flb_sds_t vivo_stream_get_content(struct vivo_stream *vs, int64_t from, int64_t break; } - if (count == 0) { + if (count == 0 && stream_start_id) { *stream_start_id = e->id; } flb_sds_cat_safe(&buf, e->data, flb_sds_len(e->data)); - *stream_end_id = e->id; + if (stream_end_id) { + *stream_end_id = e->id; + } count++; if (limit > 0 && count >= limit) { diff --git a/plugins/out_vivo_exporter/vivo_stream.h b/plugins/out_vivo_exporter/vivo_stream.h index c0f24f57334..1c1341f154e 100644 --- a/plugins/out_vivo_exporter/vivo_stream.h +++ b/plugins/out_vivo_exporter/vivo_stream.h @@ -54,6 +54,7 @@ struct vivo_stream_entry *vivo_stream_append(struct vivo_stream *vs, void *data, size_t size); flb_sds_t vivo_stream_get_content(struct vivo_stream *vs, int64_t from, int64_t to, int64_t limit, - int64_t *stream_start_id, int64_t *stream_end_id); + int64_t *stream_start_id, int64_t *stream_end_id, + int64_t *stream_next_id); #endif From 20db128f8838f8c92fb64f1eddad2c5518d4eee2 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 31 Oct 2025 20:39:11 -0600 Subject: [PATCH 3/3] out_vivo_exporter: fix regression on buffer check Signed-off-by: Eduardo Silva --- plugins/out_vivo_exporter/vivo.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/plugins/out_vivo_exporter/vivo.c b/plugins/out_vivo_exporter/vivo.c index 79312717117..b677dcdee65 100644 --- a/plugins/out_vivo_exporter/vivo.c +++ b/plugins/out_vivo_exporter/vivo.c @@ -277,14 +277,18 @@ static flb_sds_t format_logs(struct flb_input_instance *src_ins, msgpack_sbuffer_destroy(&tmp_sbuf); - /* append a newline */ - flb_sds_cat_safe(&out_js, "\n", 1); - if (!out_js) { flb_sds_destroy(out_buf); return NULL; } + /* append a newline */ + if (flb_sds_cat_safe(&out_js, "\n", 1) < 0) { + flb_sds_destroy(out_js); + flb_sds_destroy(out_buf); + return NULL; + } + /* Replace out_buf with the complete JSON */ flb_sds_destroy(out_buf); return out_js;