Add client subscription and server event broadcasting #1512

@venetak

Description

Question

Hello,

I am trying to implement a pub/sub pattern for client event subscriptions, so that AI agents can communicate with each other via an MCP server. This is the message flow I'm trying to achieve between two clients:

  1. ClientA subscribe → MCP

    • ClientA registers itself as a subscriber with the MCP server
    • It's expressing interest in receiving certain types of events/messages
  2. ClientB trigger event → MCP

    • ClientB sends an event or message to the MCP server
    • ClientB acts as a publisher/producer of events
  3. MCP broadcast → subscribers(ClientA)

    • The MCP server receives the event from ClientB
    • It then broadcasts/forwards that event to all registered subscribers
    • In this case, ClientA receives the event
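
To make the three steps above concrete, here is a minimal in-memory sketch of the same flow in plain asyncio, with no MCP involved. `EventHub`, `client_a` and `demo` are hypothetical names used only for illustration; the hub stands in for the MCP server's subscriber registry and is not part of any SDK.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

# A handler receives the event name and its payload.
Handler = Callable[[str, Any], Awaitable[None]]


class EventHub:
    """Hypothetical stand-in for the server-side subscriber registry."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, event: str, handler: Handler) -> None:
        # Step 1: a client registers interest in an event.
        self._subscribers[event].append(handler)

    async def publish(self, event: str, payload: Any) -> int:
        # Steps 2 and 3: forward the event to every registered subscriber.
        handlers = list(self._subscribers.get(event, []))
        for handler in handlers:
            await handler(event, payload)
        return len(handlers)


async def demo() -> tuple[int, list[tuple[str, Any]]]:
    hub = EventHub()
    received: list[tuple[str, Any]] = []

    async def client_a(event: str, payload: Any) -> None:
        received.append((event, payload))

    hub.subscribe("event-name", client_a)  # ClientA subscribes
    delivered = await hub.publish("event-name", {"from": "ClientB"})  # ClientB triggers
    return delivered, received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The open question in this issue is how to get the same behaviour when the handlers are MCP sessions rather than local coroutines.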

I can send notifications through the context (this is pseudocode):

MCP Server:

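from mcp.server.fastmcp import Context
from mcp.types import LoggingMessageNotification

async def logging_enabled_tool(ctx: Context) -> None:
    """Send a log notification back to the current session."""

    # Build a notifications/message payload.
    log_notification = LoggingMessageNotification(
        method="notifications/message",
        params={"level": "info", "data": "Some message..."},
    )

    # Delivered on the session that issued the tool call.
    await ctx.session.send_notification(log_notification)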

MCP Client:

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from mcp.types import LoggingMessageNotificationParams

async def handle_notifications(params: LoggingMessageNotificationParams) -> None:
    """Handle logging notifications."""
    print(f"Received log message: {params}")

async def main() -> None:
    # app_url, workspace_client and DatabricksOAuthClientProvider are defined elsewhere.
    async with streamablehttp_client(
        f"{app_url}", auth=DatabricksOAuthClientProvider(workspace_client)
    ) as (read_stream, write_stream, _), ClientSession(
        read_stream, write_stream, logging_callback=handle_notifications
    ) as session:
        await session.initialize()

        result = await session.call_tool("logging_enabled_tool")
        print(f"Result from tool: {result.content}")

This example works because the notification is sent back to the client during tool execution. However, I need an option to subscribe multiple sessions to events that will be asynchronously broadcast by the server at any point in time.

ClientA:

await client_session.call_tool("subscribe", {"event": "event-name"})
# response: successfully subscribed to event...

Server:

def subscribe(server, ctx: Context) -> str:
    # keep reference to all subscribed sessions
    server.subscribers.append(ctx.session)
    return "Subscribed to notifications."

async def broadcast_message(arguments: Dict[str, Any]) -> None:
    """Broadcast a message to all connected sessions."""
    # include arguments.get("event") in the LoggingMessageNotification
    log_notification = LoggingMessageNotification(...)

    for session in server.subscribers:
        await session.send_notification(log_notification)

ClientB:

await client_session.call_tool("broadcast_message", {"event": "event-name"})
# response: broadcast message to N clients...
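
One detail the broadcast loop above leaves open is what happens when a stored session has already disconnected: a single failing `send_notification` would abort the loop for everyone else. Below is a minimal sketch of a more tolerant broadcast, assuming only that each subscriber exposes an async `send_notification` method. `FakeSession` and `broadcast` are hypothetical helpers for illustration, not MCP SDK classes.

```python
import asyncio
from typing import Any


class FakeSession:
    """Hypothetical stand-in for a stored server-side session."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.inbox: list[Any] = []

    async def send_notification(self, notification: Any) -> None:
        if self.fail:
            raise ConnectionError("session closed")
        self.inbox.append(notification)


async def broadcast(subscribers: list[Any], notification: Any) -> int:
    """Send to every subscriber and drop those whose send raised."""
    results = await asyncio.gather(
        *(s.send_notification(notification) for s in subscribers),
        return_exceptions=True,  # collect failures instead of aborting the broadcast
    )
    alive = [
        s for s, r in zip(subscribers, results) if not isinstance(r, BaseException)
    ]
    subscribers[:] = alive  # prune dead sessions in place
    return len(alive)


if __name__ == "__main__":
    a, b = FakeSession(), FakeSession(fail=True)
    subs = [a, b]
    print(asyncio.run(broadcast(subs, "event-name")), len(subs))
```

Whether a real session raises on a closed connection, and with which exception type, is an assumption here and would need checking against the SDK.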

For this to work, ClientA should await indefinitely; otherwise, the handle_notifications callback is not called:

async with streamablehttp_client(...) as (...), ClientSession(
    read_stream, write_stream, logging_callback=handle_notifications
) as session:
    # initialize...
    # subscribe...

    # and wait
    await asyncio.Future()

This approach doesn't work for my needs: I am initializing the connection to the MCP server from an AI agent, and I need the agent to stay responsive, but waiting like this blocks all further execution.
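
For reference, a minimal sketch of one way the wait could be moved off the agent's main path: run the whole listener in a background `asyncio` task and replace `await asyncio.Future()` with an `asyncio.Event` that can be set on shutdown. `listen` and `agent_main` are hypothetical names; the body of `listen` is a stand-in for the `async with ... ClientSession(...)` block, which would be entered and exited inside that same task. This has not been verified against the streamable HTTP transport.

```python
import asyncio


async def listen(stop: asyncio.Event, log: list[str]) -> None:
    """Stand-in for: open the session, initialize, subscribe, then wait."""
    # The real `async with streamablehttp_client(...)` block would live here.
    log.append("subscribed")
    await stop.wait()  # replaces `await asyncio.Future()`; returns once stop is set
    log.append("closed")


async def agent_main() -> list[str]:
    stop = asyncio.Event()
    log: list[str] = []

    # Start the listener without awaiting it, so the agent keeps running.
    listener = asyncio.create_task(listen(stop, log))

    for step in range(3):
        await asyncio.sleep(0)  # stand-in for the agent's own async work
        log.append(f"agent step {step}")

    stop.set()      # ask the listener to leave its wait
    await listener  # let it exit its context managers cleanly
    return log


if __name__ == "__main__":
    print(asyncio.run(agent_main()))
```

This only helps if the agent itself runs on an asyncio event loop and yields to it regularly; a synchronous agent would need the listener in a separate thread with its own loop instead.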

I'm not sure the streamablehttp transport is the best approach; I'm trying to follow the documentation as closely as possible, and other suggestions are more than welcome. The core functionality I need is async communication between agents. We already need the MCP server for other important tools, so we thought we could also use it for communication between agents.

A little more context about the environment:

Additional Context

No response


Labels: question
