Extract WaitAckCache to deduplicate OpenSearch bulk response handling#1869
…rch bolts The waitAck cache logic and bulk response processing were duplicated across DeletionBolt, IndexerBolt, and StatusUpdaterBolt. This extracts the shared logic into a new WaitAckCache class and adds unit tests covering the core scenarios (success, failure, conflicts, eviction, duplicate doc IDs). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
… working in my spare time) Signed-off-by: Julien Nioche <julien@digitalpebble.com>
            best = item;
        }
    }
    if (failedCount > 0 && failedCount < items.size()) {
If every item failed, best stays as items.get(0) (a failed item), which is the correct behaviour, but the warning only fires when there is a mix of failed and successful items. If there are multiple failed items for the same ID, no warning is logged and the first one is silently used. Not a bug per se, but worth a comment.
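To make the cases in this comment concrete, here is a minimal sketch of the selection logic as described. ItemResult and BestItemSelector are hypothetical stand-ins invented for illustration, not the classes in this PR, and the extra message for the all-failed case is only a suggestion of what "worth a comment" could become.

```java
import java.util.List;

// Hypothetical stand-in for a bulk item paired with its failed flag (not the PR's class).
record ItemResult(String id, String label, boolean failed) {}

final class BestItemSelector {
    private BestItemSelector() {}

    /**
     * Returns the first successful item if there is one, otherwise the first item.
     * Assumes items is non-empty and that all items share the same document ID.
     */
    static ItemResult selectBest(List<ItemResult> items) {
        ItemResult best = items.get(0);
        int failedCount = 0;
        for (ItemResult item : items) {
            if (item.failed()) {
                failedCount++;
            } else if (best.failed()) {
                // The first successful item replaces a failed default.
                best = item;
            }
        }
        if (failedCount > 0 && failedCount < items.size()) {
            // Mixed outcome: the case the existing warning covers.
            System.err.printf("Mixed results for id %s: %d of %d items failed%n",
                    best.id(), failedCount, items.size());
        } else if (failedCount > 1) {
            // Every item failed and there were several: the silent case noted in the review.
            System.err.printf("All %d items failed for id %s; using the first one%n",
                    failedCount, best.id());
        }
        return best;
    }
}
```

With this shape, a list of one failed and one successful item returns the successful one, and a list where everything failed returns the first failed item.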
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    var idsToBulkItemsWithFailedFlag =
In the old DeletionBolt, a conflict was silently ignored (just LOG.debug). The new shared path in processBulkResponse also only logs at debug, which is fine. But the IndexerBolt path previously incremented eventCounter.scope("doc_conflicts"); that counter now comes from the conflictCounter parameter, which DeletionBolt passes as null. So deletion conflicts are no longer counted anywhere. If that's intentional it should be documented; if not, it's a regression.
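One way to make the null case explicit is sketched below. This is an illustration only: ConflictAccounting is a hypothetical helper, and AtomicLong stands in for whatever metric type conflictCounter actually has in the PR.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ConflictAccounting {
    private ConflictAccounting() {}

    /**
     * Records a version conflict for docId.
     * A null conflictCounter means the caller has deliberately opted out of counting
     * (assumed here to be DeletionBolt's case); the conflict is then not counted at all.
     * Returns true if the conflict was counted.
     */
    static boolean recordConflict(String docId, AtomicLong conflictCounter) {
        if (conflictCounter == null) {
            // Intentional: no metric for this caller; a debug log would go here.
            return false;
        }
        conflictCounter.incrementAndGet();
        return true;
    }
}
```

If deletion conflicts should be counted after all, the alternative is for DeletionBolt to pass its own counter instead of null, which needs no change to the shared path.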
external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java
external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
@rzo1 I pushed a commit addressing some of the comments in your review. Thanks for this!
Works for me. We need to resolve the conflict and move on as you suggested.
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
This commit aligns the opensearch-java module with recent legacy updates, completes the migration to HC5/API 3.x, and cleans up duplicated resources.

Refactors and Alignment:
- Ported DelegateRefresher for dynamic config reloading (apache#1870).
- Adopted Storm V2 metrics bridge via CrawlerMetrics (apache#1846).
- Aligned log messages and metric scopes to OpenSearch (apache#1871).
- Ported WaitAckCache extraction to centralize bulk-ack logic (apache#1869).
- Fixed a race condition in IndexerBolt by inverting the execution order, ensuring tuples are registered in waitAck before bulk dispatch.
- Refactored BulkItemResponseToFailedFlag to a Java record with a compact constructor for strict null-safety.

Maintenance and Cleanup:
- Removed duplicated archetype, dashboards, and opensearch-conf.yaml to prevent maintenance overhead.
- Updated README with a migration guide pointing to legacy resources.
- Removed dead rat-exclude in root pom.xml.
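For readers unfamiliar with the "record with a compact constructor" pattern mentioned in the commit message, a minimal sketch follows. ResponseWithFailedFlag and its components are assumptions made for illustration; the actual shape of BulkItemResponseToFailedFlag may differ. Records require Java 16 or later.

```java
import java.util.Objects;

// Hypothetical record; the name and component types are illustrative, not the PR's.
record ResponseWithFailedFlag<T>(T response, boolean failed) {
    // Compact constructor: runs before the fields are assigned, so a null
    // response is rejected at construction time instead of surfacing later.
    ResponseWithFailedFlag {
        Objects.requireNonNull(response, "response must not be null");
    }
}
```

Constructing the record with a null response throws a NullPointerException immediately, so downstream code can rely on response() never returning null.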
Summary
- Extracts the duplicated waitAck cache logic and bulk response processing from DeletionBolt, IndexerBolt, and StatusUpdaterBolt into a new shared WaitAckCache class
- Adds unit tests for WaitAckCache covering success, failure, conflicts, eviction, duplicate doc IDs (Elasticsearch IndexerBolt: tuples with canonical URL may not get acked #832), and edge cases; no OpenSearch container needed

Test plan
- mvn test -pl external/opensearch -Dtest=WaitAckCacheTest: all 11 new unit tests pass
- mvn compile -pl external/opensearch -am: clean compile
- Existing tests (IndexerBoltTest, StatusBoltTest) pass against OpenSearch container

🤖 Generated with Claude Code
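As a rough illustration of the idea behind a wait-ack cache, the sketch below keeps tuples keyed by document ID until the bulk response arrives, then acks or fails all of them together. SimpleWaitAck is a hypothetical, simplified class: it uses a plain HashMap with no eviction and generic callbacks, and it does not reflect the actual WaitAckCache API in this PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Simplified illustration only; the real class also handles eviction and metrics.
final class SimpleWaitAck<T> {
    private final Map<String, List<T>> waiting = new HashMap<>();
    private final Consumer<T> onAck;
    private final Consumer<T> onFail;

    SimpleWaitAck(Consumer<T> onAck, Consumer<T> onFail) {
        this.onAck = onAck;
        this.onFail = onFail;
    }

    /** Registers a tuple under its doc ID; several tuples may share one ID. */
    synchronized void register(String docId, T tuple) {
        waiting.computeIfAbsent(docId, k -> new ArrayList<>()).add(tuple);
    }

    /**
     * Resolves every tuple waiting on docId with the outcome of the bulk item.
     * Returns the number of tuples resolved (0 if the ID was unknown).
     */
    synchronized int resolve(String docId, boolean failed) {
        List<T> tuples = waiting.remove(docId);
        if (tuples == null) {
            return 0;
        }
        tuples.forEach(failed ? onFail : onAck);
        return tuples.size();
    }

    /** Number of doc IDs still waiting for a response. */
    synchronized int pending() {
        return waiting.size();
    }
}
```

Keeping a list per ID is what lets two tuples that map to the same document (the duplicate doc ID scenario in the tests) both be acked by a single bulk item response.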