Add opensearch-java module using java client#1868
Conversation
- Cloned external/opensearch to external/opensearch-java to introduce the new client as a drop-in replacement. - Updated Maven artifactId and names in the new local POMs (including the archetype). - Registered the new module in the root POM. This commit isolates the pure file duplication. The actual migration to the opensearch-java client will be done in the next commit to ensure a clean, readable Git diff for reviewers.
Introduces the external/opensearch-java module, replacing the deprecated RestHighLevelClient with the official opensearch-java client. Designed as a drop-in replacement for `external/opensearch` with identical configurations. Key improvements: - Implemented AsyncBulkProcessor (Semaphore + dedicated ThreadPool) to ensure strict backpressure and replace the legacy BulkProcessor. - Fixed historical tuple-ack race conditions in IndexerBolt and DeletionBolt. - Maintained RestClientTransport to seamlessly support the Sniffer and bypass the 100MB response buffer limit. - Synced recent upstream bugfixes, adapting resource cleanup to the new async architecture.
|
thanks @dpol1 |
Will do - I focused on the core logic first (
They are identical to the existing module - removing them in this PR.
Correct - the only change is the artifactId from |
Would be good to fix that in the existing module as a separate PR. Am about to push a refactoring of that module though, so maybe best to do after that if still applies? see #1869 |
|
Thanks by all means, I'll keep the race-condition fix as-is in this module for now. Once your PR on the legacy module lands, I'll double-check the legacy module afterwards and if needed apply to this module as well. |
Think there is only one PR left ( #1869 ) and we can move on. |
|
@dpol1 Think the blocker is gone. |
This commit aligns the opensearch-java module with recent legacy updates, completes the migration to HC5/API 3.x, and cleans up duplicated resources. Refactors and Alignment: - Ported DelegateRefresher for dynamic config reloading (apache#1870). - Adopted Storm V2 metrics bridge via CrawlerMetrics (apache#1846). - Aligned log messages and metric scopes to OpenSearch (apache#1871). - Ported WaitAckCache extraction to centralize bulk-ack logic (apache#1869). - Fixed a race condition in IndexerBolt by inverting the execution order, ensuring tuples are registered in waitAck before bulk dispatch. - Refactored BulkItemResponseToFailedFlag to a Java record with a compact constructor for strict null-safety. Maintenance and Cleanup: - Removed duplicated archetype, dashboards, and opensearch-conf.yaml to prevent maintenance overhead. - Updated README with a migration guide pointing to legacy resources. - Removed dead rat-exclude in root pom.xml.
de005ac to
d530818
Compare
|
Thanks @dpol1 Looks good, here are a couple of possible issues flagged by Claude 1. Timestamp serialization format change (MAJOR — data-compat risk) The new module writes timestamps as ISO-8601 strings where the legacy module writes epoch-millis longs:
Impact depends on index mapping:
Recommendation: either match legacy (toInstant().toEpochMilli()) or document the format change in the module README and make sure the example mappings under src/test/resources/{status,metrics,indexer}.mapping declare a format that 2. responseBufferSize config key silently dropped (MINOR regression) Legacy external/opensearch/.../OpenSearchConnection.java:283 reads opensearch.*.responseBufferSize (default 100 MB) and sets it on the HTTP client. The new OpenSearchConnection.java does not reference it. Users who tuned this for large Recommendation: either port the setting to the HC5 client builder or list it as a removed key in the new module's README. |
I tried switching to toEpochMilli(), but it immediately broke AggregationSpout (which expects a String) and caused the tests to fail against the date_optional_time mapping. I thought an hybrid approch which covers both but I don't think is solid - Maybe I forgot something but I've reverted to .toInstant().toString().
Yes properly documented now and for the sniffer as well! |
|
@dpol1 did you run a crawl with this new module? |
|
Yep, run a crawl locally - Injection worked fine, but the crawler blew up with a ClassCastException in AggregationSpout - Think I'll investigate in this way. Suggestions are welcome :-) |
|
Can you Post the full stacktrace? |
Summary
This PR brings
external/opensearch-java(the new module targeting OpenSearch Java Client 3.x + Apache HttpClient 5) up to date with the recent refactors landed inexternal/opensearch, and completes the initial scaffolding by removing duplicated resources that only make sense in the legacy module.This module migrates StormCrawler from the deprecated
RestHighLevelClientto the officialopensearch-javaclient (v3.8.0). Following the community suggestion, this is built as a separate module to act as a drop-in replacement. Users can migrate seamlessly by simply updating theirpom.xmlartifactId, with zero changes required for Flux topologies or YAML configuration keys.The legacy
external/opensearchmodule remains untouched in this PR to allow a gradual phase-out.Architectural Decisions & Engineering
Since the new
opensearch-javaclient introduces a completely different paradigm (fluent builders, strict JSON mappers) and removes several legacy utility classes, the following architectural decisions were made:1.
AsyncBulkProcessor& BackpressureThe legacy
BulkProcessorwas removed in the new client. To preventOutOfMemoryErrors and preserve Storm's backpressure, I implemented a customAsyncBulkProcessor:BulkOperations and flushes based on action count or aScheduledExecutorServicetimer.Semaphoreto limit concurrent in-flight HTTP requests.ThreadPoolExecutorwithCallerRunsPolicyto process async callbacks without starving the JVM'sForkJoinPool.commonPool().2. Transport & Modernization (HC5)
I evaluated the transport layer and decided to fully adopt the Apache HttpClient 5 (HC5) via
ApacheHttpClient5TransportBuilder.opensearch-rest-client-snifferis tightly coupled with the legacy HC4RestClient. Given the goal of modernizing the module, this was deemed an acceptable trade-off for this new 3.x-based implementation.3. Concurrency & Race Condition Fixes
During the migration, I identified and fixed a race condition in
IndexerBoltandDeletionBoltwhere tuples were added to the processor before being safely locked in thewaitAckmap. The locking order has been inverted to guarantee zero tuple loss during high-throughput flushes. [implemented in #1869 and aligned in this module as well]4. Upstream Bugfixes Sync
This module is perfectly aligned with
main. It incorporates the recent bugfixes applied to the legacy module, adapted for the new asynchronous paradigm:AbstractSpout.OpenSearchConnectionduringSnifferinitialization failures.TimerandOpenSearchClientmemory leaks inJSONResourceWrapperandJSONURLFilterWrapperby properly implementingcleanup().OpenSearch_queries), and fixed SLF4J placeholders.CrawlerMetricsbridge across all bolts and spouts to ensure compatibility with Storm 2.x metrics.OpenSearchClient.5. Maintenance Cleanup
To keep the codebase DRY, I removed duplicated resources that are identical to the legacy module (
archetype/,dashboards/, andopensearch-conf.yaml). The moduleREADME.mdhas been rewritten to guide users on how to reference the compatible legacy resources.Test plan
IndexerBolt,StatusUpdaterBolt,DeletionBolt, etc.) against a real OpenSearch instance using Testcontainers.AsyncBulkProcessorunder load to ensure it correctly flushes based on size/time thresholds and strictly respects theSemaphoreconcurrency limits without dropping tuples.nextFetchDateandtimestampfields conform to ISO-8601 format to prevent OpenSearch mapping errors.opensearch-javadependency and its associated configurations.Closes #1515