diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 69a036961..25b8091fa 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -108,7 +108,7 @@ This section describes the major functional components that make up LCore. Each - **Lifecycle Management**: - **Startup**: Load configuration, initialize Llama Stack client, load MCP server configuration and register all defined servers with Llama Stack to build the tools list, establish database connections - **Shutdown**: Clean up A2A storage resources (database connections and other resources are cleaned up automatically by Python's context managers) -- **Router Registration**: Mount all endpoint routers (query, conversation, model info, auth, metrics, A2A, feedback, admin, mcp_auth) +- **Router Registration**: Mount all endpoint routers (query, conversation, model info, auth, metrics, A2A, feedback, admin, mcp_auth, mcp_servers) **Note:** All configured MCP servers must be running and accessible at startup time for LCore to initialize successfully. @@ -207,6 +207,9 @@ The system defines 30+ actions that can be authorized. Examples (see `docs/auth. **Agent-to-Agent Protocol:** - `A2A_JSONRPC` - A2A protocol access +**MCP Server Management:** +- `REGISTER_MCP_SERVER`, `LIST_MCP_SERVERS`, `DELETE_MCP_SERVER` + **Metadata Operations:** - `LIST_MODELS`, `LIST_SHIELDS`, `LIST_TOOLS`, `LIST_PROVIDERS` @@ -325,7 +328,7 @@ MCP servers are remote HTTP services that expose tools/capabilities to LLMs (e.g **How It Works:** -1. **Configuration:** MCP servers are defined in the config file with name, URL, and authorization headers +1. **Configuration:** MCP servers are defined in the config file with name, URL, and authorization headers. Servers can also be registered dynamically at runtime via `POST /v1/mcp-servers`. 2. **Registration at Startup:** LCore tells Llama Stack about each MCP server by calling `toolgroups.register()` - this makes the MCP server's tools available in Llama Stack's tool registry 3. **Query Processing:** When processing a query, LCore determines which tools to make available to the LLM and finalizes authorization headers (e.g., merging client-provided tokens with configured headers) 4. **Tool Execution:** When the LLM calls a tool, Llama Stack routes the request to the appropriate MCP server URL with the finalized authorization headers @@ -489,6 +492,11 @@ This section documents the REST API endpoints exposed by LCore for client intera - Returns MCP servers that accept client-provided authentication tokens - Includes header names that need to be provided via MCP-HEADERS +**MCP Server Management:** +- `POST /v1/mcp-servers` - Register a new MCP server at runtime +- `GET /v1/mcp-servers` - List all registered MCP servers (static and dynamic) +- `DELETE /v1/mcp-servers/{name}` - Unregister a dynamically registered MCP server + **List Shields:** `GET /shields` - Returns available guardrails diff --git a/docs/config.md b/docs/config.md index c1807a8b1..183176336 100644 --- a/docs/config.md +++ b/docs/config.md @@ -159,7 +159,7 @@ Global service configuration. | llama_stack | | This section contains Llama Stack configuration. Lightspeed Core Stack service can call Llama Stack in library mode or in server mode. | | user_data_collection | | This section contains configuration for subsystem that collects user data(transcription history and feedbacks). | | database | | Configuration for database to store conversation IDs and other runtime data | -| mcp_servers | array | MCP (Model Context Protocol) servers provide tools and capabilities to the AI agents. These are configured in this section. Only MCP servers defined in the lightspeed-stack.yaml configuration are available to the agents. Tools configured in the llama-stack run.yaml are not accessible to lightspeed-core agents. | +| mcp_servers | array | MCP (Model Context Protocol) servers provide tools and capabilities to the AI agents. These are configured in this section. Servers can also be registered dynamically at runtime via the `POST /v1/mcp-servers` API endpoint. Only MCP servers defined in lightspeed-stack.yaml or registered via the API are available to the agents. Tools configured in the llama-stack run.yaml are not accessible to lightspeed-core agents. | | authentication | | Authentication configuration | | authorization | | Lightspeed Core Stack implements a modular authentication and authorization system with multiple authentication methods. Authorization is configurable through role-based access control. Authentication is handled through selectable modules configured via the module field in the authentication configuration. | | customization | | It is possible to customize Lightspeed Core Stack via this section. System prompt can be customized and also different parts of the service can be replaced by custom Python modules. | @@ -359,10 +359,10 @@ Useful resources: Model context protocol server configuration. MCP (Model Context Protocol) servers provide tools and capabilities to the -AI agents. These are configured by this structure. Only MCP servers -defined in the lightspeed-stack.yaml configuration are available to the -agents. Tools configured in the llama-stack run.yaml are not accessible to -lightspeed-core agents. +AI agents. These are configured by this structure. MCP servers defined in +lightspeed-stack.yaml and servers registered at runtime via the +`POST /v1/mcp-servers` API are available to agents. Tools configured in +the llama-stack run.yaml are not accessible to lightspeed-core agents. Useful resources: diff --git a/docs/getting_started.md b/docs/getting_started.md index 5431c3a3e..ea21c3923 100644 --- a/docs/getting_started.md +++ b/docs/getting_started.md @@ -240,7 +240,7 @@ mcp_servers: url: "http://localhost:3002" ``` -**Important**: Only MCP servers defined in the `lightspeed-stack.yaml` configuration are available to the AI agents. Tools configured in the llama-stack `run.yaml` are not accessible to LCS agents. +**Important**: MCP servers defined in `lightspeed-stack.yaml` or registered dynamically via the API (see [Dynamic MCP Server Management](#dynamic-mcp-server-management-via-api)) are available to the AI agents. Tools configured in the llama-stack `run.yaml` are not accessible to LCS agents. #### Step 3: Pass authentication or metadata via MCP headers (optional) @@ -255,4 +255,84 @@ curl -X POST "http://localhost:8080/v1/query" \ ``` #### Step 4: Verify connectivity -After starting the MCP servers and updating `lightspeed-stack.yaml`, test by sending a prompt to the AI agent. LCS evaluates the prompt against available tools’ metadata, selects the appropriate tool, calls the corresponding MCP server, and uses the result to generate more accurate agent response. +After starting the MCP servers and updating `lightspeed-stack.yaml`, test by sending a prompt to the AI agent. LCS evaluates the prompt against available tools' metadata, selects the appropriate tool, calls the corresponding MCP server, and uses the result to generate more accurate agent response. + +### Dynamic MCP Server Management via API + +In addition to static configuration in `lightspeed-stack.yaml`, MCP servers can be registered, listed, and removed at runtime through the REST API. This is useful for development, testing, or scenarios where MCP servers are provisioned dynamically. + +When authorization is enabled, callers need the following permissions: +- `REGISTER_MCP_SERVER` for `POST /v1/mcp-servers` +- `LIST_MCP_SERVERS` for `GET /v1/mcp-servers` +- `DELETE_MCP_SERVER` for `DELETE /v1/mcp-servers/{name}` + +#### Register an MCP server + +```bash +curl -X POST "http://localhost:8080/v1/mcp-servers" \ + -H "Content-Type: application/json" \ + -d '{ + "name": "my-dynamic-tools", + "url": "http://localhost:9000/mcp", + "provider_id": "model-context-protocol" + }' +``` + +Response (201 Created): +```json +{ + "name": "my-dynamic-tools", + "url": "http://localhost:9000/mcp", + "provider_id": "model-context-protocol", + "message": "MCP server 'my-dynamic-tools' registered successfully" +} +``` + +Optional fields in the request body: +- `authorization_headers` (object): Headers to send to the MCP server (e.g., `{"Authorization": "Bearer token123"}`). +- `headers` (array): List of HTTP header names to forward from incoming requests to this MCP server. +- `timeout` (integer): Request timeout in seconds. + +#### List all MCP servers + +```bash +curl "http://localhost:8080/v1/mcp-servers" +``` + +Response (200 OK): +```json +{ + "servers": [ + { + "name": "filesystem-tools", + "url": "http://localhost:3000", + "provider_id": "model-context-protocol", + "source": "config" + }, + { + "name": "my-dynamic-tools", + "url": "http://localhost:9000/mcp", + "provider_id": "model-context-protocol", + "source": "api" + } + ] +} +``` + +Each server has a `source` field indicating how it was registered: `"config"` for servers defined in `lightspeed-stack.yaml`, or `"api"` for servers registered via the REST API. + +#### Delete a dynamically registered MCP server + +```bash +curl -X DELETE "http://localhost:8080/v1/mcp-servers/my-dynamic-tools" +``` + +Response (200 OK): +```json +{ + "name": "my-dynamic-tools", + "message": "MCP server 'my-dynamic-tools' unregistered successfully" +} +``` + +**Note:** Only dynamically registered servers (source `"api"`) can be deleted via the API. Attempting to delete a statically configured server returns 403 Forbidden. Dynamically registered servers do not persist across service restarts. diff --git a/tests/e2e/configuration/library-mode/lightspeed-stack-mcp-auth.yaml b/tests/e2e/configuration/library-mode/lightspeed-stack-mcp-auth.yaml new file mode 100644 index 000000000..99b2df6d2 --- /dev/null +++ b/tests/e2e/configuration/library-mode/lightspeed-stack-mcp-auth.yaml @@ -0,0 +1,25 @@ +name: Lightspeed Core Service (LCS) +service: + host: 0.0.0.0 + port: 8080 + auth_enabled: false + workers: 1 + color_log: true + access_log: true +llama_stack: + # Library mode - embeds llama-stack as library + use_as_library_client: true + library_client_config_path: run.yaml +user_data_collection: + feedback_enabled: true + feedback_storage: "/tmp/data/feedback" + transcripts_enabled: true + transcripts_storage: "/tmp/data/transcripts" +authentication: + module: "noop-with-token" +mcp_servers: + - name: "mcp-oauth" + provider_id: "model-context-protocol" + url: "http://mock-mcp:3001" + authorization_headers: + Authorization: "oauth" diff --git a/tests/e2e/configuration/library-mode/lightspeed-stack-no-mcp.yaml b/tests/e2e/configuration/library-mode/lightspeed-stack-no-mcp.yaml new file mode 100644 index 000000000..47257bfb1 --- /dev/null +++ b/tests/e2e/configuration/library-mode/lightspeed-stack-no-mcp.yaml @@ -0,0 +1,19 @@ +name: Lightspeed Core Service (LCS) +service: + host: 0.0.0.0 + port: 8080 + auth_enabled: false + workers: 1 + color_log: true + access_log: true +llama_stack: + # Library mode - embeds llama-stack as library + use_as_library_client: true + library_client_config_path: run.yaml +user_data_collection: + feedback_enabled: true + feedback_storage: "/tmp/data/feedback" + transcripts_enabled: true + transcripts_storage: "/tmp/data/transcripts" +authentication: + module: "noop" diff --git a/tests/e2e/configuration/server-mode/lightspeed-stack-mcp-auth.yaml b/tests/e2e/configuration/server-mode/lightspeed-stack-mcp-auth.yaml new file mode 100644 index 000000000..ed6251fcc --- /dev/null +++ b/tests/e2e/configuration/server-mode/lightspeed-stack-mcp-auth.yaml @@ -0,0 +1,26 @@ +name: Lightspeed Core Service (LCS) +service: + host: 0.0.0.0 + port: 8080 + auth_enabled: false + workers: 1 + color_log: true + access_log: true +llama_stack: + # Server mode - connects to separate llama-stack service + use_as_library_client: false + url: http://llama-stack:8321 + api_key: xyzzy +user_data_collection: + feedback_enabled: true + feedback_storage: "/tmp/data/feedback" + transcripts_enabled: true + transcripts_storage: "/tmp/data/transcripts" +authentication: + module: "noop-with-token" +mcp_servers: + - name: "mcp-oauth" + provider_id: "model-context-protocol" + url: "http://mock-mcp:3001" + authorization_headers: + Authorization: "oauth" diff --git a/tests/e2e/configuration/server-mode/lightspeed-stack-no-mcp.yaml b/tests/e2e/configuration/server-mode/lightspeed-stack-no-mcp.yaml new file mode 100644 index 000000000..cc699ba89 --- /dev/null +++ b/tests/e2e/configuration/server-mode/lightspeed-stack-no-mcp.yaml @@ -0,0 +1,20 @@ +name: Lightspeed Core Service (LCS) +service: + host: 0.0.0.0 + port: 8080 + auth_enabled: false + workers: 1 + color_log: true + access_log: true +llama_stack: + # Server mode - connects to separate llama-stack service + use_as_library_client: false + url: http://llama-stack:8321 + api_key: xyzzy +user_data_collection: + feedback_enabled: true + feedback_storage: "/tmp/data/feedback" + transcripts_enabled: true + transcripts_storage: "/tmp/data/transcripts" +authentication: + module: "noop" diff --git a/tests/e2e/features/environment.py b/tests/e2e/features/environment.py index 70f294f26..14204d45b 100644 --- a/tests/e2e/features/environment.py +++ b/tests/e2e/features/environment.py @@ -76,6 +76,14 @@ "tests/e2e/configuration/{mode_dir}/lightspeed-stack-mcp-oauth-auth.yaml", "tests/e2e-prow/rhoai/configs/lightspeed-stack-mcp-oauth-auth.yaml", ), + "mcp-auth": ( + "tests/e2e/configuration/{mode_dir}/lightspeed-stack-mcp-auth.yaml", + "tests/e2e-prow/rhoai/configs/lightspeed-stack-mcp-auth.yaml", + ), + "no-mcp": ( + "tests/e2e/configuration/{mode_dir}/lightspeed-stack-no-mcp.yaml", + "tests/e2e-prow/rhoai/configs/lightspeed-stack-no-mcp.yaml", + ), } @@ -435,6 +443,24 @@ def before_feature(context: Context, feature: Feature) -> None: switch_config(context.feature_config) restart_container("lightspeed-stack") + if "MCPFileAuth" in feature.tags: + context.feature_config = _get_config_path("mcp-file-auth", mode_dir) + context.default_config_backup = create_config_backup("lightspeed-stack.yaml") + switch_config(context.feature_config) + restart_container("lightspeed-stack") + + if "MCPServerAPIAuth" in feature.tags: + context.feature_config = _get_config_path("mcp-auth", mode_dir) + context.default_config_backup = create_config_backup("lightspeed-stack.yaml") + switch_config(context.feature_config) + restart_container("lightspeed-stack") + + if "MCPNoConfig" in feature.tags: + context.feature_config = _get_config_path("no-mcp", mode_dir) + context.default_config_backup = create_config_backup("lightspeed-stack.yaml") + switch_config(context.feature_config) + restart_container("lightspeed-stack") + def after_feature(context: Context, feature: Feature) -> None: """Run after each feature file is exercised. @@ -467,3 +493,18 @@ def after_feature(context: Context, feature: Feature) -> None: switch_config(context.default_config_backup) restart_container("lightspeed-stack") remove_config_backup(context.default_config_backup) + + if "MCPFileAuth" in feature.tags: + switch_config(context.default_config_backup) + restart_container("lightspeed-stack") + remove_config_backup(context.default_config_backup) + + if "MCPServerAPIAuth" in feature.tags: + switch_config(context.default_config_backup) + restart_container("lightspeed-stack") + remove_config_backup(context.default_config_backup) + + if "MCPNoConfig" in feature.tags: + switch_config(context.default_config_backup) + restart_container("lightspeed-stack") + remove_config_backup(context.default_config_backup) diff --git a/tests/e2e/features/mcp_servers_api.feature b/tests/e2e/features/mcp_servers_api.feature new file mode 100644 index 000000000..ed9289788 --- /dev/null +++ b/tests/e2e/features/mcp_servers_api.feature @@ -0,0 +1,133 @@ +@MCP +Feature: MCP Server Management API tests + + Tests for the dynamic MCP server management endpoints: + POST /v1/mcp-servers, GET /v1/mcp-servers, DELETE /v1/mcp-servers/{name} + + Background: + Given The service is started locally + And REST API service prefix is /v1 + + Scenario: List MCP servers returns pre-configured servers + Given The system is in default state + When I access REST API endpoint "mcp-servers" using HTTP GET method + Then The status code of the response is 200 + And The body of the response contains mcp-oauth + And The body of the response contains config + + Scenario: List MCP servers response has expected structure + Given The system is in default state + When I access REST API endpoint "mcp-servers" using HTTP GET method + Then The status code of the response is 200 + And the body of the response has the following structure + """ + { + "servers": [ + { + "name": "mcp-oauth", + "url": "http://mock-mcp:3001", + "provider_id": "model-context-protocol", + "source": "config" + } + ] + } + """ + + Scenario: Register duplicate MCP server returns 409 + Given The system is in default state + When I access REST API endpoint "mcp-servers" using HTTP POST method + """ + {"name": "mcp-oauth", "url": "http://mock-mcp:3001", "provider_id": "model-context-protocol"} + """ + Then The status code of the response is 409 + And The body of the response contains already exists + + Scenario: Delete statically configured MCP server returns 403 + Given The system is in default state + When I access REST API endpoint "mcp-servers/mcp-oauth" using HTTP DELETE method + Then The status code of the response is 403 + And The body of the response contains statically configured + + Scenario: Delete non-existent MCP server returns 404 + Given The system is in default state + When I access REST API endpoint "mcp-servers/non-existent-server" using HTTP DELETE method + Then The status code of the response is 404 + And The body of the response contains Mcp Server not found + + Scenario: Register MCP server with missing required fields returns 422 + Given The system is in default state + When I access REST API endpoint "mcp-servers" using HTTP POST method + """ + {"url": "http://mock-mcp:3001"} + """ + Then The status code of the response is 422 + And The body of the response contains name + + Scenario: Register MCP server with invalid URL scheme returns 422 + Given The system is in default state + When I access REST API endpoint "mcp-servers" using HTTP POST method + """ + {"name": "bad-url-server", "url": "ftp://mock-mcp:3001", "provider_id": "model-context-protocol"} + """ + Then The status code of the response is 422 + And The body of the response contains scheme + + @skip-in-library-mode + Scenario: Register MCP server returns 503 when Llama Stack is unreachable + Given The system is in default state + And The llama-stack connection is disrupted + When I access REST API endpoint "mcp-servers" using HTTP POST method + """ + {"name": "unreachable-server", "url": "http://mock-mcp:3001", "provider_id": "model-context-protocol"} + """ + Then The status code of the response is 503 + And The body of the response contains Llama Stack + + @skip-in-library-mode + Scenario: Register and delete MCP server lifecycle + Given The system is in default state + When I access REST API endpoint "mcp-servers" using HTTP POST method + """ + {"name": "e2e-lifecycle-server", "url": "http://mock-mcp:3001", "provider_id": "model-context-protocol"} + """ + Then The status code of the response is 201 + And The body of the response contains e2e-lifecycle-server + And The body of the response contains registered successfully + When I access REST API endpoint "mcp-servers" using HTTP GET method + Then The status code of the response is 200 + And the body of the response has the following structure + """ + { + "servers": [ + { + "name": "mcp-oauth", + "source": "config" + }, + { + "name": "e2e-lifecycle-server", + "url": "http://mock-mcp:3001", + "provider_id": "model-context-protocol", + "source": "api" + } + ] + } + """ + When I access REST API endpoint "mcp-servers/e2e-lifecycle-server" using HTTP DELETE method + Then The status code of the response is 200 + And The body of the response contains e2e-lifecycle-server + And The body of the response contains unregistered successfully + When I access REST API endpoint "mcp-servers/e2e-lifecycle-server" using HTTP DELETE method + Then The status code of the response is 404 + When I access REST API endpoint "mcp-servers" using HTTP GET method + Then The status code of the response is 200 + And the body of the response has the following structure + """ + { + "servers": [ + { + "name": "mcp-oauth", + "source": "config" + } + ] + } + """ diff --git a/tests/e2e/features/mcp_servers_api_auth.feature b/tests/e2e/features/mcp_servers_api_auth.feature new file mode 100644 index 000000000..327cc87f6 --- /dev/null +++ b/tests/e2e/features/mcp_servers_api_auth.feature @@ -0,0 +1,30 @@ +@MCPServerAPIAuth +Feature: MCP Server Management API authentication tests + + Tests that the MCP server management endpoints enforce authentication + when authentication is enabled (noop-with-token module). + + Background: + Given The service is started locally + And REST API service prefix is /v1 + + Scenario: List MCP servers returns 401 without auth token + Given The system is in default state + And I remove the auth header + When I access REST API endpoint "mcp-servers" using HTTP GET method + Then The status code of the response is 401 + + Scenario: Register MCP server returns 401 without auth token + Given The system is in default state + And I remove the auth header + When I access REST API endpoint "mcp-servers" using HTTP POST method + """ + {"name": "auth-test-server", "url": "http://mock-mcp:3001", "provider_id": "model-context-protocol"} + """ + Then The status code of the response is 401 + + Scenario: Delete MCP server returns 401 without auth token + Given The system is in default state + And I remove the auth header + When I access REST API endpoint "mcp-servers/mcp-oauth" using HTTP DELETE method + Then The status code of the response is 401 diff --git a/tests/e2e/features/mcp_servers_api_no_config.feature b/tests/e2e/features/mcp_servers_api_no_config.feature new file mode 100644 index 000000000..49e5651a8 --- /dev/null +++ b/tests/e2e/features/mcp_servers_api_no_config.feature @@ -0,0 +1,18 @@ +@MCPNoConfig +Feature: MCP Server API tests without configured MCP servers + + Tests that the MCP server management endpoints work correctly + when no MCP servers are configured in lightspeed-stack.yaml. + + Background: + Given The service is started locally + And REST API service prefix is /v1 + + Scenario: List MCP servers returns empty list when none configured + Given The system is in default state + When I access REST API endpoint "mcp-servers" using HTTP GET method + Then The status code of the response is 200 + And The body of the response is the following + """ + {"servers": []} + """ diff --git a/tests/e2e/features/steps/common_http.py b/tests/e2e/features/steps/common_http.py index 07cb61940..5729edd6a 100644 --- a/tests/e2e/features/steps/common_http.py +++ b/tests/e2e/features/steps/common_http.py @@ -338,6 +338,19 @@ def access_rest_api_endpoint_post(context: Context, endpoint: str) -> None: ) +@when("I access REST API endpoint {endpoint} using HTTP DELETE method") +def access_rest_api_endpoint_delete(context: Context, endpoint: str) -> None: + """Send DELETE HTTP request to tested service.""" + endpoint = normalize_endpoint(endpoint) + base = f"http://{context.hostname}:{context.port}" + path = f"{context.api_prefix}/{endpoint}".replace("//", "/") + url = base + path + headers = context.auth_headers if hasattr(context, "auth_headers") else {} + context.response = None + + context.response = requests.delete(url, headers=headers, timeout=DEFAULT_TIMEOUT) + + @when("I access REST API endpoint {endpoint} using HTTP PUT method") def access_rest_api_endpoint_put(context: Context, endpoint: str) -> None: """Send PUT HTTP request with JSON payload to tested service. diff --git a/tests/e2e/test_list.txt b/tests/e2e/test_list.txt index 0da5cae41..988232bfa 100644 --- a/tests/e2e/test_list.txt +++ b/tests/e2e/test_list.txt @@ -16,4 +16,7 @@ features/rlsapi_v1_errors.feature features/streaming_query.feature features/rest_api.feature features/mcp.feature +features/mcp_servers_api.feature +features/mcp_servers_api_auth.feature +features/mcp_servers_api_no_config.feature features/models.feature diff --git a/tests/integration/test_openapi_json.py b/tests/integration/test_openapi_json.py index 05ccc83f8..46193eff6 100644 --- a/tests/integration/test_openapi_json.py +++ b/tests/integration/test_openapi_json.py @@ -220,6 +220,17 @@ def test_servers_section_present_from_url(spec_from_url: dict[str, Any]) -> None "get", {"200", "401", "403", "404", "500", "503"}, ), + ( + "/v1/mcp-servers", + "post", + {"201", "401", "403", "409", "500", "503"}, + ), + ("/v1/mcp-servers", "get", {"200", "401", "403", "500"}), + ( + "/v1/mcp-servers/{name}", + "delete", + {"200", "401", "403", "404", "500", "503"}, + ), ("/v1/query", "post", {"200", "401", "403", "404", "422", "429", "500", "503"}), ( "/v1/streaming_query", @@ -312,6 +323,17 @@ def test_paths_and_responses_exist_from_file( "get", {"200", "401", "403", "404", "500", "503"}, ), + ( + "/v1/mcp-servers", + "post", + {"201", "401", "403", "409", "500", "503"}, + ), + ("/v1/mcp-servers", "get", {"200", "401", "403", "500"}), + ( + "/v1/mcp-servers/{name}", + "delete", + {"200", "401", "403", "404", "500", "503"}, + ), ("/v1/query", "post", {"200", "401", "403", "404", "422", "429", "500", "503"}), ( "/v1/streaming_query",