Extract WaitAckCache to deduplicate OpenSearch bulk response handling #1869

Merged
rzo1 merged 8 commits into main from refactor/opensearch-waitack-cache
Apr 10, 2026

@jnioche jnioche commented Apr 4, 2026

Summary

  • Extracts duplicated waitAck cache and bulk response processing logic from DeletionBolt, IndexerBolt, and StatusUpdaterBolt into a new shared WaitAckCache class
  • Reduces ~595 lines of duplicated code across the three bolts into a single reusable component with bolt-specific callbacks
  • Adds 11 unit tests for WaitAckCache covering success, failure, conflicts, eviction, duplicate doc IDs (see #832, "Elasticsearch IndexerBolt: tuples with canonical URL may not get acked"), and edge cases; none of them needs an OpenSearch container
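
To make the shape of the extraction concrete, here is a minimal sketch of a shared wait-ack cache with bolt-specific callbacks. It is not the WaitAckCache from this PR: the class name `WaitAckSketch`, its methods, and the plain `HashMap` (no eviction, no Storm types) are all assumptions chosen to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Hypothetical, simplified stand-in for a shared wait-ack cache.
 * T is whatever the bolt needs to ack or fail later (a Storm Tuple in real code).
 */
class WaitAckSketch<T> {
    // doc ID -> every pending item waiting on that ID; duplicate IDs are allowed
    private final Map<String, List<T>> pending = new HashMap<>();

    /** Remember an item until the bulk response for its doc ID arrives. */
    synchronized void register(String docId, T item) {
        pending.computeIfAbsent(docId, k -> new ArrayList<>()).add(item);
    }

    /**
     * Release every item waiting on docId and hand each one to the
     * bolt-specific callback matching the outcome.
     * Returns the number of items released (0 if the ID is unknown).
     */
    synchronized int resolve(
            String docId,
            boolean failed,
            BiConsumer<String, T> onSuccess,
            BiConsumer<String, T> onFailure) {
        List<T> items = pending.remove(docId);
        if (items == null) {
            return 0;
        }
        BiConsumer<String, T> callback = failed ? onFailure : onSuccess;
        for (T item : items) {
            callback.accept(docId, item);
        }
        return items.size();
    }

    /** Number of doc IDs still waiting for a response. */
    synchronized int size() {
        return pending.size();
    }
}
```

In this sketch each bolt would keep its own ack/fail behaviour in the two callbacks, while the bookkeeping for duplicate doc IDs lives in one place.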

Test plan

  • mvn test -pl external/opensearch -Dtest=WaitAckCacheTest — all 11 new unit tests pass
  • mvn compile -pl external/opensearch -am — clean compile
  • Existing integration tests (IndexerBoltTest, StatusBoltTest) pass against OpenSearch container

🤖 Generated with Claude Code

…rch bolts

The waitAck cache logic and bulk response processing were duplicated across
DeletionBolt, IndexerBolt, and StatusUpdaterBolt. This extracts the shared
logic into a new WaitAckCache class and adds unit tests covering the core
scenarios (success, failure, conflicts, eviction, duplicate doc IDs).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@jnioche jnioche added this to the 3.6.0 milestone Apr 4, 2026
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
jnioche added 2 commits April 4, 2026 15:21
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
… working in my spare time)

Signed-off-by: Julien Nioche <julien@digitalpebble.com>
@jnioche jnioche requested a review from rzo1 April 6, 2026 17:00

@rzo1 rzo1 left a comment

Some minor comments.

            best = item;
        }
    }
    if (failedCount > 0 && failedCount < items.size()) {

If all items failed, best stays as items.get(0) (a failed item), which is correct behaviour, but the warning only fires when there is a mix of failed and successful items. If there are multiple failed items for the same ID, no warning is logged and the first one is silently used. Not a bug per se, but worth a code comment.
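
The behaviour described in this comment can be sketched as follows. Only the `failedCount > 0 && failedCount < items.size()` condition comes from the reviewed line; the `Item` and `Selection` types, the `select` method, and the rule that a successful item replaces a failed default are assumptions made for illustration.

```java
import java.util.List;

class DuplicateSelectionSketch {
    /** Hypothetical stand-in for one bulk response item plus its failed flag. */
    static final class Item {
        final String label;
        final boolean failed;

        Item(String label, boolean failed) {
            this.label = label;
            this.failed = failed;
        }
    }

    /** The chosen item, and whether failed and successful items were mixed. */
    static final class Selection {
        final Item best;
        final boolean mixed;

        Selection(Item best, boolean mixed) {
            this.best = best;
            this.mixed = mixed;
        }
    }

    static Selection select(List<Item> items) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("items must not be empty");
        }
        Item best = items.get(0); // default, even if it failed
        int failedCount = 0;
        for (Item item : items) {
            if (item.failed) {
                failedCount++;
            } else if (best.failed) {
                best = item; // assumption: a successful item wins over a failed one
            }
        }
        // Same condition as the reviewed line: true only for a mix,
        // so an all-failed list produces no warning.
        boolean mixed = failedCount > 0 && failedCount < items.size();
        return new Selection(best, mixed);
    }
}
```

With an all-failed list, `best` is the first item and `mixed` is false, which is the silent case the comment asks to document.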


@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    var idsToBulkItemsWithFailedFlag =

In the old DeletionBolt, a conflict was silently ignored (just LOG.debug). The new shared path in processBulkResponse also only logs at debug, which is fine. But the IndexerBolt path previously incremented eventCounter.scope("doc_conflicts"); that counter now comes from the conflictCounter parameter, which DeletionBolt passes as null. So deletion conflicts are no longer counted anywhere. If that's intentional it should be documented; if not, it's a regression.
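
A small sketch of the concern, assuming an optional counter argument and a conflict reported as HTTP status 409. The `Counter` interface and `handleConflict` method are hypothetical and stand in for whatever metrics objects the bolts actually use.

```java
class ConflictCountingSketch {
    /** Hypothetical minimal counter; the real bolts use their own metrics objects. */
    interface Counter {
        void inc();
    }

    // Assumption for this sketch: a version conflict is reported as HTTP 409.
    static final int HTTP_CONFLICT = 409;

    /**
     * Returns true when the item status is a conflict. The counter is optional:
     * passing null means the conflict is recognised but never counted.
     */
    static boolean handleConflict(int status, Counter conflictCounter) {
        if (status != HTTP_CONFLICT) {
            return false;
        }
        if (conflictCounter != null) {
            conflictCounter.inc();
        }
        return true;
    }
}
```

A caller that passes `null` gets the same control flow but no metric, which is why the choice deserves either a comment or a real counter.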

@jnioche jnioche marked this pull request as draft April 6, 2026 17:43
Signed-off-by: Julien Nioche <julien@digitalpebble.com>

jnioche commented Apr 9, 2026

@rzo1 I pushed a commit addressing some of the comments in your review. Thanks for this!
The only remaining one is the DeletionBolt conflict counter. I will first merge the changes to the counters from main so that we use the same objects.


rzo1 commented Apr 9, 2026

Works for me. We need to resolve the conflict and move on as you suggested.

jnioche added 3 commits April 10, 2026 10:29
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
@jnioche jnioche marked this pull request as ready for review April 10, 2026 09:40
@jnioche jnioche requested a review from rzo1 April 10, 2026 09:52

@rzo1 rzo1 left a comment

lgtm

@rzo1 rzo1 merged commit 2d306c4 into main Apr 10, 2026
2 checks passed
@rzo1 rzo1 deleted the refactor/opensearch-waitack-cache branch April 10, 2026 09:57
dpol1 added a commit to dpol1/stormcrawler that referenced this pull request Apr 10, 2026
This commit aligns the opensearch-java module with recent legacy updates,
completes the migration to HC5/API 3.x, and cleans up duplicated resources.

Refactors and Alignment:
- Ported DelegateRefresher for dynamic config reloading (apache#1870).
- Adopted Storm V2 metrics bridge via CrawlerMetrics (apache#1846).
- Aligned log messages and metric scopes to OpenSearch (apache#1871).
- Ported WaitAckCache extraction to centralize bulk-ack logic (apache#1869).
- Fixed a race condition in IndexerBolt by inverting the execution order,
  ensuring tuples are registered in waitAck before bulk dispatch.
- Refactored BulkItemResponseToFailedFlag to a Java record with a compact
  constructor for strict null-safety.
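
For readers unfamiliar with the last item, this is roughly what a record with a compact constructor looks like. The record name `ItemWithFailedFlag` and its components are hypothetical; the actual components of BulkItemResponseToFailedFlag are not shown in this thread.

```java
import java.util.Objects;

/**
 * Hypothetical pair of a bulk item payload and its failed flag.
 * Requires Java 16 or later (records).
 */
record ItemWithFailedFlag<R>(R response, boolean failed) {
    // Compact constructor: runs before the fields are assigned, so a null
    // payload is rejected at construction time rather than at first use.
    ItemWithFailedFlag {
        Objects.requireNonNull(response, "response must not be null");
    }
}
```

The record also provides value-based `equals`, `hashCode`, and accessors (`response()`, `failed()`) without extra code.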

Maintenance and Cleanup:
- Removed duplicated archetype, dashboards, and opensearch-conf.yaml
  to prevent maintenance overhead.
- Updated README with a migration guide pointing to legacy resources.
- Removed dead rat-exclude in root pom.xml.
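
The race-condition item in the commit message above (register in waitAck before bulk dispatch) can be illustrated with a toy model. Everything here is an assumption for illustration: the class, its methods, and the dispatcher that answers synchronously to simulate a response arriving before the caller has finished.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

class RegisterBeforeDispatchSketch {
    private final Set<String> waitAck = new HashSet<>();
    private final List<String> unmatched = new ArrayList<>();

    /** Stand-in for the bulk listener: looks up the doc ID that was answered. */
    synchronized void onResponse(String docId) {
        if (!waitAck.remove(docId)) {
            unmatched.add(docId); // response arrived for an ID nobody registered
        }
    }

    /** Safe order: the ID is registered before the request can be answered. */
    void indexRegisterFirst(String docId, Consumer<String> dispatcher) {
        synchronized (this) {
            waitAck.add(docId);
        }
        dispatcher.accept(docId);
    }

    /** Racy order: a fast response finds nothing to match, then the ID lingers. */
    void indexDispatchFirst(String docId, Consumer<String> dispatcher) {
        dispatcher.accept(docId);
        synchronized (this) {
            waitAck.add(docId);
        }
    }

    synchronized int pendingCount() {
        return waitAck.size();
    }

    synchronized List<String> unmatchedResponses() {
        return new ArrayList<>(unmatched);
    }
}
```

Passing `sketch::onResponse` as the dispatcher models the worst case: with the racy order the response is recorded as unmatched and the ID stays pending, while the register-first order leaves nothing behind.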