Skip to content

http_server: spawn internal server in threaded mode#11811

Open
edsiper wants to merge 3 commits into
masterfrom
http-internal-threaded
Open

http_server: spawn internal server in threaded mode#11811
edsiper wants to merge 3 commits into
masterfrom
http-internal-threaded

Conversation

@edsiper
Copy link
Copy Markdown
Member

@edsiper edsiper commented May 16, 2026


Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • Bug Fixes

    • Fixed HTTP response header handling in the internal HTTP server
  • Improvements

    • Improved thread-safety and synchronization for HTTP server cached data and health metrics
    • Safer acquisition and lifecycle management of metrics and storage payloads to prevent races
    • Enhanced stability for concurrent metrics, storage and health-check requests
  • Tests

    • Added integration test validating HTTP server responsiveness under concurrent load

Review Change Stack

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 16, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 78abe450-8e4d-4bcc-b304-4520de10435c

📥 Commits

Reviewing files that changed from the base of the PR and between 7e48b19 and 1a91745.

📒 Files selected for processing (2)
  • include/fluent-bit/http_server/flb_hs.h
  • src/http_server/flb_hs.c
🚧 Files skipped from review as they are similar to previous changes (2)
  • include/fluent-bit/http_server/flb_hs.h
  • src/http_server/flb_hs.c

📝 Walkthrough

Walkthrough

Adds POSIX mutexes and a new flb_hs_buf_acquire API to make HTTP server cached buffers and health metrics thread-safe; refactors ingestion to use locked buffer set/acquire/release, updates endpoints to rely on acquisition instead of manual refcounts, and adds an integration test exercising concurrent requests.

Changes

HTTP Server Thread-Safety Implementation

Layer / File(s) Summary
Public API contracts and struct extensions
include/fluent-bit/http_server/flb_hs.h
Adds fluent-bit/flb_pthread.h, pthread_mutex_t lock to struct flb_hs_buf, pthread_mutex_t health_metrics_lock to struct flb_hs, and declares flb_hs_buf_acquire.
Mutex-protected buffer lifecycle operations
src/http_server/flb_hs.c
Implements flb_hs_buf_init and refactors flb_hs_buf_destroy, flb_hs_buf_set, flb_hs_buf_acquire, flb_hs_buf_release to coordinate users/pending_free under a buffer mutex.
State initialization and centralized cleanup
src/http_server/flb_hs.c
Adds flb_hs_state_init / flb_hs_state_destroy to initialize/destroy buffer mutexes and centralize health_metrics cleanup; integrates these into flb_hs_create/flb_hs_destroy and adjusts HTTP server options.
Health metrics thread-safety
src/http_server/api/v1/health.c, src/http_server/flb_hs.c
Guards health counter and health_metrics list with hs->health_metrics_lock in both read and write paths.
Metrics ingestion via unified buffer API
src/http_server/flb_hs.c
Refactors pipeline, metrics-v2, and storage metrics ingestion to call flb_hs_buf_set and handle allocation / decoded-state cleanup on failure.
Endpoint consumers using synchronized buffer acquisition
src/http_server/api/v1/metrics.c, src/http_server/api/v1/storage.c, src/http_server/api/v2/metrics.c
Endpoints obtain buffers with flb_hs_buf_acquire(...), remove manual buf->users++ increments, and continue to release buffers with flb_hs_buf_release.
HTTP response buffer assignment fix
src/http_server/flb_http_server_http1.c
Assigns response_buffer to the SDS result of cfl_sds_cat(...) so later concatenations use the updated buffer.
Integration test for concurrent HTTP server responsiveness
tests/integration/scenarios/internal_http_server/config/internal_http_server_exec_deadlock.yaml, tests/integration/scenarios/internal_http_server/tests/test_internal_http_server_exec_deadlock_001.py
Adds scenario config and an integration test that polls multiple internal endpoints serially and concurrently to validate the server remains responsive.

Sequence Diagram

sequenceDiagram
  participant Client as HTTP Client
  participant Endpoint as Metrics Endpoint
  participant Acquire as flb_hs_buf_acquire
  participant BufLock as buffer mutex
  participant Buffer as flb_hs_buf
  participant Release as flb_hs_buf_release
  
  Client->>Endpoint: GET /api/v1/metrics
  Endpoint->>Acquire: call flb_hs_buf_acquire(buf, require_data, require_raw_data)
  Acquire->>BufLock: pthread_mutex_lock(buffer->lock)
  BufLock->>Buffer: validate data/raw_data and increment users
  BufLock->>Acquire: pthread_mutex_unlock(buffer->lock)
  Acquire-->>Endpoint: return buffer or NULL
  Endpoint->>Endpoint: encode response using buffer->data / raw_data
  Endpoint->>Release: flb_hs_buf_release(buffer, NULL)
  Release->>BufLock: pthread_mutex_lock(buffer->lock)
  BufLock->>Buffer: decrement users and perform deferred cleanup if needed
  BufLock->>Release: pthread_mutex_unlock(buffer->lock)
  Endpoint-->>Client: 200 OK with payload
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested reviewers

  • cosmo0920
  • patrick-stephens
  • niedbalski

Poem

A rabbit hops through mutexes three,
Guarding metrics for all to see,
Buffers locked and threads set free,
Acquire the data, hold it tight,
Release at dusk and sleep tonight. 🐇🔐

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 56.25% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and concisely describes the main change: running the internal HTTP server in threaded (worker thread) mode, which aligns with the PR's primary objective to spawn the internal server in threaded mode.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch http-internal-threaded

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 11dc0c62b9

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread src/http_server/flb_hs.c
Comment on lines +179 to +182
if (buffer->users > 0) {
buffer->pending_free = FLB_TRUE;
pthread_mutex_unlock(&buffer->lock);
return -1;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve metrics snapshots during concurrent scrapes

With the monitoring server now running in its own thread, this branch is hit whenever a metrics/storage push overlaps a request that has acquired the current buffer. The new snapshot is rejected and freed, while pending_free causes the reader's later release to clean the old buffer and leave data/raw_data NULL, so subsequent /api/v1/metrics, /api/v2/metrics, or storage requests return 404 until a later push wins the race. Instead, keep serving the old snapshot while queueing or swapping in the new one after active users drain.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
tests/integration/scenarios/internal_http_server/config/internal_http_server_exec_deadlock.yaml (1)

15-17: ⚡ Quick win

Add a request timeout to the exec curl command.

Line 15 uses curl -s without connect/overall time limits; if the endpoint stalls, this scenario can block longer than necessary and make the test flaky.

Suggested patch
-      command: curl -s http://127.0.0.1:${FLUENT_BIT_HTTP_MONITORING_PORT}/api/v1/metrics/prometheus
+      command: curl -s --connect-timeout 1 --max-time 2 http://127.0.0.1:${FLUENT_BIT_HTTP_MONITORING_PORT}/api/v1/metrics/prometheus
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@tests/integration/scenarios/internal_http_server/config/internal_http_server_exec_deadlock.yaml`
around lines 15 - 17, The curl command used in the exec step (the string
beginning with "curl -s
http://127.0.0.1:${FLUENT_BIT_HTTP_MONITORING_PORT}/api/v1/metrics/prometheus")
has no timeouts; update it to include connection and overall time limits (e.g.,
add --connect-timeout and --max-time or short flags -m) so the exec won't hang
indefinitely if the endpoint stalls, ensuring the test scenario fails fast
instead of blocking.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@tests/integration/scenarios/internal_http_server/tests/test_internal_http_server_exec_deadlock_001.py`:
- Around line 65-75: The lambda passed to service.service.wait_for_condition
closes over loop variables path and pattern causing late-binding (Ruff B023);
fix by binding them as default parameters in the lambda signature so each
iteration captures the current values (i.e., change the callback to use lambda
path=path, pattern=pattern: ...), keeping the same inner logic that calls
service.request(path) and checks response["status_code"] and pattern in
response["body"].

---

Nitpick comments:
In
`@tests/integration/scenarios/internal_http_server/config/internal_http_server_exec_deadlock.yaml`:
- Around line 15-17: The curl command used in the exec step (the string
beginning with "curl -s
http://127.0.0.1:${FLUENT_BIT_HTTP_MONITORING_PORT}/api/v1/metrics/prometheus")
has no timeouts; update it to include connection and overall time limits (e.g.,
add --connect-timeout and --max-time or short flags -m) so the exec won't hang
indefinitely if the endpoint stalls, ensuring the test scenario fails fast
instead of blocking.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 4bf0a9f2-f6ff-49e6-a5a6-aa531bce241c

📥 Commits

Reviewing files that changed from the base of the PR and between 9ee8687 and 11dc0c6.

📒 Files selected for processing (9)
  • include/fluent-bit/http_server/flb_hs.h
  • src/http_server/api/v1/health.c
  • src/http_server/api/v1/metrics.c
  • src/http_server/api/v1/storage.c
  • src/http_server/api/v2/metrics.c
  • src/http_server/flb_hs.c
  • src/http_server/flb_http_server_http1.c
  • tests/integration/scenarios/internal_http_server/config/internal_http_server_exec_deadlock.yaml
  • tests/integration/scenarios/internal_http_server/tests/test_internal_http_server_exec_deadlock_001.py

Comment on lines +65 to +75
for path, pattern in endpoints:
result = service.service.wait_for_condition(
lambda: (
response
if response["status_code"] == 200 and pattern in response["body"]
else None
) if (response := service.request(path)) else None,
timeout=10,
interval=0.5,
description=f"internal endpoint {path}",
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

# Check if file exists and read the relevant section
file_path="tests/integration/scenarios/internal_http_server/tests/test_internal_http_server_exec_deadlock_001.py"
if [ -f "$file_path" ]; then
  echo "=== File contents (lines 60-80) ==="
  sed -n '60,80p' "$file_path" | cat -n
else
  echo "File not found: $file_path"
  # Try to find similar files
  find . -name "*test_internal_http_server*" -type f 2>/dev/null | head -10
fi

Repository: fluent/fluent-bit

Length of output: 1040


🏁 Script executed:

# Also check the repository structure around this file
ls -la tests/integration/scenarios/internal_http_server/tests/ 2>/dev/null || echo "Directory not found, listing integration tests:"
find tests/integration -name "*.py" -type f 2>/dev/null | head -20

Repository: fluent/fluent-bit

Length of output: 1782


🏁 Script executed:

# Run Ruff linting on the file to check for B023 warnings
python_file="tests/integration/scenarios/internal_http_server/tests/test_internal_http_server_exec_deadlock_001.py"
echo "=== Running Ruff on the file ==="
python -m pip install ruff -q 2>/dev/null
python -m ruff check "$python_file" --select B023 2>&1 || true

Repository: fluent/fluent-bit

Length of output: 1447


🏁 Script executed:

# Get the full context around the lambda to understand the code better
echo "=== Full file content ==="
wc -l tests/integration/scenarios/internal_http_server/tests/test_internal_http_server_exec_deadlock_001.py

Repository: fluent/fluent-bit

Length of output: 192


🏁 Script executed:

# Read the full file
cat -n tests/integration/scenarios/internal_http_server/tests/test_internal_http_server_exec_deadlock_001.py

Repository: fluent/fluent-bit

Length of output: 3715


Bind loop variables in the lambda to avoid late-binding issues flagged by Ruff B023.

Lines 69 and 71 reference path and pattern from the loop without binding them in the lambda signature. Add them as default arguments to ensure each callback iteration captures the correct loop values.

Suggested fix
         for path, pattern in endpoints:
             result = service.service.wait_for_condition(
-                lambda: (
+                lambda path=path, pattern=pattern: (
                     response
                     if response["status_code"] == 200 and pattern in response["body"]
                     else None
                 ) if (response := service.request(path)) else None,
🧰 Tools
🪛 Ruff (0.15.12)

[warning] 69-69: Function definition does not bind loop variable pattern

(B023)


[warning] 71-71: Function definition does not bind loop variable path

(B023)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@tests/integration/scenarios/internal_http_server/tests/test_internal_http_server_exec_deadlock_001.py`
around lines 65 - 75, The lambda passed to service.service.wait_for_condition
closes over loop variables path and pattern causing late-binding (Ruff B023);
fix by binding them as default parameters in the lambda signature so each
iteration captures the current values (i.e., change the callback to use lambda
path=path, pattern=pattern: ...), keeping the same inner logic that calls
service.request(path) and checks response["status_code"] and pattern in
response["body"].

edsiper added 2 commits May 16, 2026 06:26
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (1)
tests/integration/scenarios/internal_http_server/tests/test_internal_http_server_exec_deadlock_001.py (1)

65-75: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Bind loop variables in the lambda to avoid late-binding capture.

Line 67 closes over path/pattern from the for loop; this is the Ruff B023 warning and can mis-bind values if callback execution is deferred. Bind both as default args.

Proposed fix
         for path, pattern in endpoints:
             result = service.service.wait_for_condition(
-                lambda: (
+                lambda path=path, pattern=pattern: (
                     response
                     if response["status_code"] == 200 and pattern in response["body"]
                     else None
                 ) if (response := service.request(path)) else None,
#!/bin/bash
# Verify Ruff B023 on this file
python -m pip install -q ruff
python -m ruff check tests/integration/scenarios/internal_http_server/tests/test_internal_http_server_exec_deadlock_001.py --select B023
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@tests/integration/scenarios/internal_http_server/tests/test_internal_http_server_exec_deadlock_001.py`
around lines 65 - 75, The lambda passed to service.service.wait_for_condition
closes over the loop variables path and pattern causing late-binding (Ruff
B023); fix by binding them as default arguments in the lambda (e.g., lambda
path=path, pattern=pattern: ...) so each callback captures the current loop
values, keeping the existing condition logic that calls service.request(path)
and checks response["status_code"] and pattern in response["body"] intact.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Duplicate comments:
In
`@tests/integration/scenarios/internal_http_server/tests/test_internal_http_server_exec_deadlock_001.py`:
- Around line 65-75: The lambda passed to service.service.wait_for_condition
closes over the loop variables path and pattern causing late-binding (Ruff
B023); fix by binding them as default arguments in the lambda (e.g., lambda
path=path, pattern=pattern: ...) so each callback captures the current loop
values, keeping the existing condition logic that calls service.request(path)
and checks response["status_code"] and pattern in response["body"] intact.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2bd8f604-663b-476e-89a5-af79256ddeaf

📥 Commits

Reviewing files that changed from the base of the PR and between 11dc0c6 and 7e48b19.

📒 Files selected for processing (9)
  • include/fluent-bit/http_server/flb_hs.h
  • src/http_server/api/v1/health.c
  • src/http_server/api/v1/metrics.c
  • src/http_server/api/v1/storage.c
  • src/http_server/api/v2/metrics.c
  • src/http_server/flb_hs.c
  • src/http_server/flb_http_server_http1.c
  • tests/integration/scenarios/internal_http_server/config/internal_http_server_exec_deadlock.yaml
  • tests/integration/scenarios/internal_http_server/tests/test_internal_http_server_exec_deadlock_001.py
🚧 Files skipped from review as they are similar to previous changes (8)
  • src/http_server/flb_http_server_http1.c
  • include/fluent-bit/http_server/flb_hs.h
  • tests/integration/scenarios/internal_http_server/config/internal_http_server_exec_deadlock.yaml
  • src/http_server/api/v1/metrics.c
  • src/http_server/api/v2/metrics.c
  • src/http_server/api/v1/storage.c
  • src/http_server/api/v1/health.c
  • src/http_server/flb_hs.c

Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant