diff --git a/lib/cmetrics/CMakeLists.txt b/lib/cmetrics/CMakeLists.txt index 14a3e3f5bfd..9c02b3ad810 100644 --- a/lib/cmetrics/CMakeLists.txt +++ b/lib/cmetrics/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # CMetrics Version set(CMT_VERSION_MAJOR 2) set(CMT_VERSION_MINOR 1) -set(CMT_VERSION_PATCH 3) +set(CMT_VERSION_PATCH 4) set(CMT_VERSION_STR "${CMT_VERSION_MAJOR}.${CMT_VERSION_MINOR}.${CMT_VERSION_PATCH}") # Include helpers diff --git a/lib/cmetrics/src/cmt_decode_prometheus.c b/lib/cmetrics/src/cmt_decode_prometheus.c index 32b34babe55..67e3572b53f 100644 --- a/lib/cmetrics/src/cmt_decode_prometheus.c +++ b/lib/cmetrics/src/cmt_decode_prometheus.c @@ -124,14 +124,16 @@ int cmt_decode_prometheus_create( result = cmt_decode_prometheus_parse(scanner, &context); - if (result == 0) { + if (context.errcode) { + result = context.errcode; + cmt_destroy(cmt); + reset_context(&context, true); + } + else if (result == 0) { *out_cmt = cmt; } else { cmt_destroy(cmt); - if (context.errcode) { - result = context.errcode; - } reset_context(&context, true); } diff --git a/lib/cmetrics/src/cmt_decode_prometheus.l b/lib/cmetrics/src/cmt_decode_prometheus.l index 8055f52deee..d3fc013a198 100644 --- a/lib/cmetrics/src/cmt_decode_prometheus.l +++ b/lib/cmetrics/src/cmt_decode_prometheus.l @@ -7,11 +7,79 @@ %{ #include +#include #define STRBUF_RET \ yylval->str = context->strbuf; \ context->strbuf = NULL +static void set_allocation_error(struct cmt_decode_prometheus_context *context) +{ + context->errcode = CMT_DECODE_PROMETHEUS_ALLOCATION_ERROR; + + if (context->opts.errbuf != NULL && context->opts.errbuf_size > 0) { + snprintf(context->opts.errbuf, + context->opts.errbuf_size - 1, + "memory allocation failed"); + } +} + +static int reset_strbuf(struct cmt_decode_prometheus_context *context) +{ + if (context->strbuf != NULL) { + cfl_sds_destroy(context->strbuf); + } + + context->strbuf = cfl_sds_create_size(256); + if (context->strbuf == NULL) { + set_allocation_error(context); + + return -1; + } + + return 0; +} + +static int append_strbuf(struct cmt_decode_prometheus_context *context, + const char *text, int length) +{ + cfl_sds_t result; + + result = cfl_sds_cat(context->strbuf, text, length); + if (result == NULL) { + set_allocation_error(context); + + return -1; + } + + context->strbuf = result; + + return 0; +} + +#define STRBUF_CREATE() \ + do { \ + if (reset_strbuf(context) != 0) { \ + return 0; \ + } \ + } while (0) + +#define STRBUF_APPEND(text, length) \ + do { \ + if (append_strbuf(context, (text), (length)) != 0) { \ + return 0; \ + } \ + } while (0) + +#define SET_STR_TOKEN() \ + do { \ + yylval->str = cfl_sds_create(yytext); \ + if (yylval->str == NULL) { \ + set_allocation_error(context); \ + return 0; \ + } \ + } while (0) + %} /* here we define some states that allow us to create rules only @@ -74,7 +142,7 @@ [^ \t]+ { // The next token will be the metric name - yylval->str = cfl_sds_create(yytext); + SET_STR_TOKEN(); return YYSTATE == HELPTAG ? HELP : TYPE; } @@ -86,7 +154,7 @@ // separate start condition for this to handle "\\" and "\n" escapes // more easily. BEGIN(INHELPTAG); - context->strbuf = cfl_sds_create_size(256); + STRBUF_CREATE(); } else { // For TYPETAG we enter INTYPETAG start condition to check only valid @@ -107,17 +175,17 @@ \\n { // Process linefeed escape sequence - context->strbuf = cfl_sds_cat(context->strbuf, "\n", 1); + STRBUF_APPEND("\n", 1); } \\\\ { // Process backslack escape sequence - context->strbuf = cfl_sds_cat(context->strbuf, "\\", 1); + STRBUF_APPEND("\\", 1); } [^\r\n\\]+ { // Put everything that is not a backslash or a line feed into strbuf - context->strbuf = cfl_sds_cat(context->strbuf, yytext, yyleng); + STRBUF_APPEND(yytext, yyleng); } counter { @@ -146,26 +214,23 @@ ["] { BEGIN(INQUOTE); - if (context->strbuf != NULL) { - cfl_sds_destroy(context->strbuf); - } - context->strbuf = cfl_sds_create_size(256); + STRBUF_CREATE(); } [\\]["] { - context->strbuf = cfl_sds_cat(context->strbuf, "\"", 1); + STRBUF_APPEND("\"", 1); } \\n { - context->strbuf = cfl_sds_cat(context->strbuf, "\n", 1); + STRBUF_APPEND("\n", 1); } \\\\ { - context->strbuf = cfl_sds_cat(context->strbuf, "\\", 1); + STRBUF_APPEND("\\", 1); } [^\r\n\\"]+ { - context->strbuf = cfl_sds_cat(context->strbuf, yytext, yyleng); + STRBUF_APPEND(yytext, yyleng); } ["] { @@ -180,7 +245,7 @@ } [a-zA-Z_][a-zA-Z_0-9]* { - yylval->str = cfl_sds_create(yytext); + SET_STR_TOKEN(); return IDENTIFIER; } diff --git a/plugins/out_kafka/kafka.c b/plugins/out_kafka/kafka.c index a406ac0c97f..6b8d9b080ae 100644 --- a/plugins/out_kafka/kafka.c +++ b/plugins/out_kafka/kafka.c @@ -308,6 +308,7 @@ int produce_message(struct flb_time *tm, msgpack_object *map, flb_info("[out_kafka] new topic added: %s", dynamic_topic); } } + flb_utils_split_free(topics); flb_free(dynamic_topic); } } diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index 0b795215eab..50bd7e05edc 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -585,14 +585,10 @@ int opentelemetry_post(struct opentelemetry_context *ctx, if (result != 0) { flb_plg_error(ctx->ins, "error setting http authorization data"); + flb_http_client_request_destroy(request, FLB_TRUE); return FLB_RETRY; } - - flb_http_request_set_authorization(request, - HTTP_WWW_AUTHORIZATION_SCHEME_BASIC, - ctx->http_user, - ctx->http_passwd); } #ifdef FLB_HAVE_SIGNV4 diff --git a/src/flb_http_client.c b/src/flb_http_client.c index 5eb99f2e7e7..3b55b86f6ad 100644 --- a/src/flb_http_client.c +++ b/src/flb_http_client.c @@ -2899,6 +2899,9 @@ static int flb_http_encode_basic_auth_value(cfl_sds_t *output_buffer, *output_buffer = sds_result; } else { + cfl_sds_destroy(*output_buffer); + *output_buffer = NULL; + result = -1; } } @@ -2913,7 +2916,7 @@ static int flb_http_encode_basic_auth_value(cfl_sds_t *output_buffer, cfl_sds_destroy(encoded_value); cfl_sds_destroy(raw_value); - return 0; + return result; } static int flb_http_encode_bearer_auth_value(cfl_sds_t *output_buffer, @@ -3082,34 +3085,59 @@ int flb_http_request_set_parameters_internal( if (value_type == FLB_HTTP_CLIENT_ARGUMENT_TYPE_METHOD) { method = va_arg(arguments, size_t); - flb_http_request_set_method(request, (int) method); + result = flb_http_request_set_method(request, (int) method); + if (result != 0) { + flb_debug("http request method error"); + + failure_detected = FLB_TRUE; + } } else if (value_type == FLB_HTTP_CLIENT_ARGUMENT_TYPE_HOST) { host = va_arg(arguments, char *); - flb_http_request_set_host(request, host); + result = flb_http_request_set_host(request, host); + if (result != 0) { + flb_debug("http request host error"); + + failure_detected = FLB_TRUE; + } } else if (value_type == FLB_HTTP_CLIENT_ARGUMENT_TYPE_URL) { url = va_arg(arguments, char *); - flb_http_request_set_url(request, url); + result = flb_http_request_set_url(request, url); + if (result != 0) { + flb_debug("http request URL error"); + + failure_detected = FLB_TRUE; + } } else if (value_type == FLB_HTTP_CLIENT_ARGUMENT_TYPE_URI) { uri = va_arg(arguments, char *); - flb_http_request_set_uri(request, uri); + result = flb_http_request_set_uri(request, uri); + if (result != 0) { + flb_debug("http request URI error"); + + failure_detected = FLB_TRUE; + } } else if (value_type == FLB_HTTP_CLIENT_ARGUMENT_TYPE_USER_AGENT) { user_agent = va_arg(arguments, char *); - flb_http_request_set_user_agent(request, user_agent); + result = flb_http_request_set_user_agent(request, user_agent); + if (result != 0) { + flb_debug("http request user agent error"); + + failure_detected = FLB_TRUE; + } } else if (value_type == FLB_HTTP_CLIENT_ARGUMENT_TYPE_CONTENT_TYPE) { content_type = va_arg(arguments, char *); result = flb_http_request_set_content_type(request, content_type); - if (request == NULL) { + if (result != 0) { flb_debug("http request : error setting content type"); failure_detected = FLB_TRUE; @@ -3125,8 +3153,8 @@ int flb_http_request_set_parameters_internal( body_len, compression_algorithm); - if (request == NULL) { - flb_debug("http request creation error"); + if (result != 0) { + flb_debug("http request body error"); failure_detected = FLB_TRUE; } @@ -3188,17 +3216,31 @@ int flb_http_request_set_parameters_internal( username = va_arg(arguments, char *); password = va_arg(arguments, char *); - flb_http_request_set_authorization(request, - HTTP_WWW_AUTHORIZATION_SCHEME_BASIC, - username, - password); + result = flb_http_request_set_authorization( + request, + HTTP_WWW_AUTHORIZATION_SCHEME_BASIC, + username, + password); + + if (result != 0) { + flb_debug("http request basic authorization error"); + + failure_detected = FLB_TRUE; + } } else if (value_type == FLB_HTTP_CLIENT_ARGUMENT_TYPE_AUTH_BEARER_TOKEN) { bearer_token = va_arg(arguments, char *); - flb_http_request_set_authorization(request, - HTTP_WWW_AUTHORIZATION_SCHEME_BEARER, - bearer_token); + result = flb_http_request_set_authorization( + request, + HTTP_WWW_AUTHORIZATION_SCHEME_BEARER, + bearer_token); + + if (result != 0) { + flb_debug("http request bearer authorization error"); + + failure_detected = FLB_TRUE; + } } else if (value_type == FLB_HTTP_CLIENT_ARGUMENT_TYPE_AUTH_SIGNV4) { aws_region = va_arg(arguments, char *); @@ -3209,6 +3251,12 @@ int flb_http_request_set_parameters_internal( aws_region, aws_service, aws_provider); + + if (result != 0) { + flb_debug("http request signv4 authorization error"); + + failure_detected = FLB_TRUE; + } } } while (!failure_detected && value_type != FLB_HTTP_CLIENT_ARGUMENT_TYPE_TERMINATOR); diff --git a/src/fluent-bit.c b/src/fluent-bit.c index fd4d3103b93..14e5f5682a4 100644 --- a/src/fluent-bit.c +++ b/src/fluent-bit.c @@ -1030,7 +1030,7 @@ static int flb_main_run(int argc, char **argv) #ifdef FLB_HAVE_CHUNK_TRACE char *trace_input = NULL; - char *trace_output = flb_strdup("stdout"); + char *trace_output = NULL; struct mk_list *trace_props = NULL; #endif @@ -1365,6 +1365,7 @@ static int flb_main_run(int argc, char **argv) if (access(cfg_file, R_OK) != 0) { flb_free(cfg_file); flb_cf_destroy(cf_opts); + flb_destroy(ctx); flb_utils_error(FLB_ERR_CFG_FILE); } } @@ -1372,6 +1373,7 @@ static int flb_main_run(int argc, char **argv) if (flb_reload_reconstruct_cf(cf_opts, cf) != 0) { flb_free(cfg_file); flb_cf_destroy(cf_opts); + flb_destroy(ctx); fprintf(stderr, "reconstruct format context is failed\n"); exit(EXIT_FAILURE); } @@ -1381,12 +1383,14 @@ static int flb_main_run(int argc, char **argv) flb_free(cfg_file); if (!tmp) { flb_cf_destroy(cf_opts); + flb_destroy(ctx); flb_utils_error(FLB_ERR_CFG_FILE_STOP); } #else tmp = service_configure(cf, config, "fluent-bit.conf"); if (!tmp) { flb_cf_destroy(cf_opts); + flb_destroy(ctx); flb_utils_error(FLB_ERR_CFG_FILE_STOP); } @@ -1475,7 +1479,8 @@ static int flb_main_run(int argc, char **argv) #ifdef FLB_HAVE_CHUNK_TRACE if (trace_input != NULL) { - enable_trace_input(ctx, trace_input, NULL /* prefix ... */, trace_output, trace_props); + enable_trace_input(ctx, trace_input, NULL /* prefix ... */, + trace_output ? trace_output : "stdout", trace_props); } #endif diff --git a/src/http_server/flb_http_server_http1.c b/src/http_server/flb_http_server_http1.c index a84a9683628..a8e35b81df9 100644 --- a/src/http_server/flb_http_server_http1.c +++ b/src/http_server/flb_http_server_http1.c @@ -420,6 +420,8 @@ int flb_http1_response_commit(struct flb_http_response *response) return -9; } + response_buffer = sds_result; + if (response->body != NULL) { sds_result = cfl_sds_cat(response_buffer, response->body, diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index a67afaaa66a..2d820825e46 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -19,6 +19,8 @@ import logging import pytest +from utils.test_service import stop_active_services + # Configure logging def configure_logging(): logger = logging.getLogger(__name__) @@ -52,6 +54,10 @@ def pytest_sessionstart(session): #flb = FluentBitManager(GLOBAL_CONFIG['fluent_bit']['config_path']) #flb = FluentBitManager() +@pytest.hookimpl(trylast=True) +def pytest_runtest_teardown(item, nextitem): + stop_active_services() + @pytest.hookimpl(trylast=True) def pytest_sessionfinish(session, exitstatus): pass #logger.info("Finishing pytest session") diff --git a/tests/integration/scenarios/out_http/tests/test_out_http_001.py b/tests/integration/scenarios/out_http/tests/test_out_http_001.py index 01dea075922..21c963643b8 100644 --- a/tests/integration/scenarios/out_http/tests/test_out_http_001.py +++ b/tests/integration/scenarios/out_http/tests/test_out_http_001.py @@ -16,6 +16,13 @@ logger = logging.getLogger(__name__) +def _valgrind_timeout(timeout): + if os.environ.get("VALGRIND"): + return max(timeout * 3, 30) + + return timeout + + def _wait_for_http_server(port, timeout=5): deadline = time.time() + timeout @@ -128,7 +135,7 @@ def stop(self): def wait_for_requests(self, minimum_count, timeout=10): return self.service.wait_for_condition( lambda: data_storage["requests"] if len(data_storage["requests"]) >= minimum_count else None, - timeout=timeout, + timeout=_valgrind_timeout(timeout), interval=0.5, description=f"{minimum_count} outbound HTTP requests", ) @@ -148,7 +155,7 @@ def _read_log(): return self.service.wait_for_condition( _read_log, - timeout=timeout, + timeout=_valgrind_timeout(timeout), interval=0.25, description=f"log message '{pattern}'", ) diff --git a/tests/integration/src/utils/test_service.py b/tests/integration/src/utils/test_service.py index aab3965b5bb..35fd4c662b1 100644 --- a/tests/integration/src/utils/test_service.py +++ b/tests/integration/src/utils/test_service.py @@ -7,6 +7,22 @@ from utils.network import find_available_port +_active_services = set() + + +def stop_active_services(): + errors = [] + + for service in list(_active_services): + try: + service.stop() + except Exception as exc: + errors.append(exc) + + if errors: + raise errors[0] + + class FluentBitTestService: def __init__( self, @@ -69,20 +85,29 @@ def start(self): for key, value in self.extra_env.items(): self._set_env(key, str(value)) - if self.pre_start: - self.pre_start(self) + _active_services.add(self) - self.flb.start() + try: + if self.pre_start: + self.pre_start(self) + + self.flb.start() + except Exception: + self.stop() + raise def stop(self): + had_allocated_ports = bool(self._allocated_ports) + try: if self.flb: self.flb.stop() finally: - if self.post_stop: + if self.post_stop and had_allocated_ports: self.post_stop(self) self._restore_env() self._allocated_ports.clear() + _active_services.discard(self) def wait_for_http_endpoint(self, url, *, timeout=10, interval=0.5): deadline = time.time() + timeout