Subscriptions
A server's catalog is not fixed. Tools get registered at runtime, resources change behind their URIs. The client side of that story is a subscription: on the 2026-07-28 protocol, a client that wants to hear about changes sends one subscriptions/listen request, and the response to that request is the stream — it stays open, carrying exactly the notification kinds the client asked for.
Your side of it is one line: publish the change.
from mcp.server.mcpserver import Context, MCPServer
mcp = MCPServer("Notebook")
NOTES = {"todo": "buy milk", "journal": "day one"}
@mcp.resource("note://{name}")
def note(name: str) -> str:
return NOTES[name]
@mcp.tool()
async def edit_note(name: str, text: str, ctx: Context) -> str:
NOTES[name] = text
await ctx.notify_resource_updated(f"note://{name}")
return "saved"
def search(query: str) -> list[str]:
return [name for name, text in NOTES.items() if query in text]
@mcp.tool()
async def enable_search(ctx: Context) -> str:
mcp.add_tool(search)
await ctx.notify_tools_changed()
return "search is live"
await ctx.notify_resource_updated("note://todo")deliversnotifications/resources/updatedto every open listen stream that subscribed to that URI. Not to anyone else.await ctx.notify_tools_changed()deliversnotifications/tools/list_changedto every stream that asked for tool-list changes. A client that receives it callstools/listagain — and now seessearch.- The siblings are
notify_prompts_changed()andnotify_resources_changed(), for the other two list-changed kinds. - No subscribers, no work: publishing to an idle server is a no-op. You don't check whether anyone is listening; you state what changed.
The SDK serves subscriptions/listen for you — MCPServer registers the handler at construction, and the wire obligations (the acknowledgment as the first frame, the per-stream filtering, the subscription id tagged onto every frame) are its job, not yours.
Check
On the wire, a stream whose filter named note://todo looks like this after edit_note runs:
{"method": "notifications/subscriptions/acknowledged",
"params": {"notifications": {"resourceSubscriptions": ["note://todo"]}, "_meta": {"io.modelcontextprotocol/subscriptionId": 7}}}
{"method": "notifications/resources/updated",
"params": {"uri": "note://todo", "_meta": {"io.modelcontextprotocol/subscriptionId": 7}}}
The acknowledgment echoes the filter the server agreed to honor, and every frame carries the
listen request's JSON-RPC id under _meta — that id is the subscription id.
Only what was asked for
The filter is a contract. A stream that requested tool-list changes and one resource URI receives those two kinds and nothing else — publish a prompt change and that stream stays silent. Resource URIs are matched as exact strings: note://todo does not cover note://todo/draft.
Warning
Filters are honored without per-client authorization: any client may name any URI —
including one it cannot read — and will receive update notifications for it (resource
existence and change timing, never content). On a multi-tenant server, don't publish
sensitive per-user URIs through notify_resource_updated, or serve the method with
your own handler on the low-level Server and narrow the filter there before acking —
the honored subset exists in the protocol precisely so servers can do this.
Two more things the stream is not:
- It is not a replay log. A dropped stream is gone; events published while nobody was connected are not queued. The client's contract is to re-listen and re-fetch what it cares about.
- It is not the 2025 path. Clients on earlier protocol versions that called
resources/subscribeare served byctx.session.send_resource_updated(uri)— thenotify_*methods reachsubscriptions/listenstreams only.
Streamable HTTP only, for now
subscriptions/listen is served on the streamable-HTTP transport. Over stdio (and other
stream-pair transports) a 2026-07-28 connection rejects it with METHOD_NOT_FOUND — the
open-stream semantics haven't been built for that transport yet, even though
server/discover still advertises the subscription capabilities there.
One process is the default. More takes a bus
Publishes travel from your handler to the open streams over a SubscriptionBus. The default is in-memory: one process, every stream in it. That is the right answer until you run replicas behind a load balancer — then a client's stream is pinned to one replica, and a publish on another replica has to reach it.
That seam is yours to implement: two methods over your pub/sub backend.
class RedisSubscriptionBus:
async def publish(self, event: ServerEvent) -> None:
await self.redis.publish("mcp-events", encode(event)) # to every replica
def subscribe(self, listener: Callable[[ServerEvent], None]) -> Callable[[], None]:
... # register the local listener; a reader task calls it for arriving events
mcp = MCPServer("Notebook", subscriptions=RedisSubscriptionBus(...))
The bus carries typed ServerEvent values — four small dataclasses — never JSON-RPC. Stamping, filtering, and stream lifecycles stay in the SDK, so a bus implementation cannot break the protocol; it can only move events between processes. To publish from outside a request, keep a reference to the bus you constructed and await bus.publish(ToolsListChanged()) — the server holds the same instance.
The low-level composition
Down on the low-level Server there is no pre-wired anything — and the same parts assemble in three lines:
from typing import Any
import mcp_types as types
from mcp.server.context import ServerRequestContext
from mcp.server.lowlevel import Server
from mcp.server.subscriptions import InMemorySubscriptionBus, ListenHandler, ResourceUpdated
bus = InMemorySubscriptionBus()
NOTES = {"todo": "buy milk"}
EDIT_NOTE_SCHEMA: dict[str, Any] = {
"type": "object",
"properties": {"name": {"type": "string"}, "text": {"type": "string"}},
"required": ["name", "text"],
}
async def list_tools(
ctx: ServerRequestContext[Any], params: types.PaginatedRequestParams | None
) -> types.ListToolsResult:
return types.ListToolsResult(
tools=[types.Tool(name="edit_note", description="Replace a note's text.", input_schema=EDIT_NOTE_SCHEMA)]
)
async def call_tool(ctx: ServerRequestContext[Any], params: types.CallToolRequestParams) -> types.CallToolResult:
args = params.arguments or {}
NOTES[args["name"]] = args["text"]
await bus.publish(ResourceUpdated(uri=f"note://{args['name']}"))
return types.CallToolResult(content=[types.TextContent(type="text", text="saved")])
server = Server(
"notebook",
on_list_tools=list_tools,
on_call_tool=call_tool,
on_subscriptions_listen=ListenHandler(bus),
)
- You own the bus, so you publish to it directly:
await bus.publish(ResourceUpdated(uri=...)). Put it wherever your handlers can reach it — module scope here, the lifespan in a bigger app. ListenHandler(bus)is the same handlerMCPServerregisters;on_subscriptions_listen=is an ordinary handler slot. Don't want the SDK's semantics? Write your own handler for the slot — the spec obligations come with it.ListenHandler.close()gracefully ends every open stream: each one receives the listen request's result as its final frame, the spec's signal that the server ended the subscription deliberately — a clean end, as opposed to the abrupt drop a client may treat as a cue to reconnect. Without it, streams end when the client disconnects.
The client side
Consuming a subscription is one context manager:
from mcp import Client
from mcp.client.subscriptions import ResourceUpdated
from .tutorial001 import mcp
async def watch_todo() -> str:
"""Wait for the todo note to change once, then stop listening."""
async with Client(mcp) as client:
async with client.listen(resource_subscriptions=["note://todo"]) as sub:
async for event in sub:
assert isinstance(event, ResourceUpdated)
return f"changed: {event.uri}"
return "the server closed the stream before any change"
client.listen(...)takes the filter as keyword arguments — they mirror the wireSubscriptionFilterfield for field. Entering sends the request and returns once the server's acknowledgment arrives, sosub.honored(the subset the server agreed to deliver) is always there before the first event.- Iteration yields the same four typed events the server publishes:
ToolsListChanged,PromptsListChanged,ResourcesListChanged, andResourceUpdated(uri=...)— where the URI may be a sub-resource of one you subscribed to, at the server's discretion. An event is a cue to refetch — it carries no payload beyond identity, and duplicates pending consumption collapse into one. - Leaving the block ends the subscription, with the transport's own spelling: over streamable HTTP the request's response stream is closed (that is the 2026 cancellation signal), on stream transports
notifications/cancelledis sent. - The stream's two endings are control flow. The server closing gracefully simply ends the
async for; an abrupt drop raisesSubscriptionLost. The distinction is diagnostic — a clean end versus a connection worth suspecting — not a difference in what to do next: either way the stream is gone, nothing is replayed, and a watcher that still cares re-listens and refetches. Servers close streams gracefully for their own reasons — shutdown, or shedding a subscriber whose backlog grew past bounds, as this SDK'sListenHandlerdoes — so a graceful close is not a signal to stop watching:
import anyio
from mcp import Client
from mcp.client.subscriptions import SubscriptionLost
async def watch(client: Client, uri: str) -> None:
"""Keep one resource fresh for as long as the client lives."""
while True:
try:
async with client.listen(resource_subscriptions=[uri]) as sub:
await client.read_resource(uri) # refetch: no replay across streams
async for _event in sub:
await client.read_resource(uri)
except SubscriptionLost:
pass
# Graceful close or abrupt drop, the stream is gone either way. Back
# off before re-listening - a graceful close may be the server
# shedding load, and reconnecting instantly recreates the pressure.
await anyio.sleep(1)
- Checking the acknowledgment (the spec's client SHOULD) is reading
sub.honored— the kinds this stream will actually receive. A server may narrow the filter it agrees to honor (a multi-tenant server declining a URI, say), andsub.honoredis that delivery contract — it says nothing about what exists in the catalog. Multiple subscriptions may be open concurrently; each demultiplexes by its own subscription id. - Tool calls and other requests run freely beside an open stream — from the same task between events, or from sibling tasks sharing the client. A watcher task that refetches inside its event loop is the intended pattern, not a re-entrancy hazard.
listen()requires a 2026-07-28 connection and raisesListenNotSupportedErroron older ones, steering to the deprecatedsubscribe_resourceandmessage_handlerspelling those wires use.
Recap
- A client opts in with one
subscriptions/listenrequest; the response is the stream. There is nothing to configure server-side — serving it is built in. - You publish:
await ctx.notify_resource_updated(uri),notify_tools_changed(),notify_prompts_changed(),notify_resources_changed(). Idle servers make these free. - Streams receive only what their filter requested; URIs match exactly; nothing is replayed.
- Scaling out means implementing
SubscriptionBus— two methods — over your own pub/sub, and passing it asMCPServer(subscriptions=...). - The low-level spelling is the same machinery held in your hands: a bus,
ListenHandler(bus), one constructor argument. - Consuming is
async with client.listen(...)andasync for event in sub— typed events, honored filter on the handle, clean end vsSubscriptionLost.