Skip to content

subscriptions

Client-side subscriptions/listen driver (2026-07-28, SEP-2575).

listen() opens the stream as an async context manager: entering waits for the server's acknowledgment, iteration yields typed change events, a graceful server close ends the loop, and an abrupt drop raises SubscriptionLost. There is no replay and no automatic re-listen: a client that re-opens a subscription refetches what it depends on.

PromptsListChanged dataclass

The server's prompt list changed.

Source code in src/mcp/shared/subscriptions.py
44
45
46
@dataclass(frozen=True)
class PromptsListChanged:
    """The server's prompt list changed."""

ResourcesListChanged dataclass

The server's resource list changed.

Source code in src/mcp/shared/subscriptions.py
49
50
51
@dataclass(frozen=True)
class ResourcesListChanged:
    """The server's resource list changed."""

ResourceUpdated dataclass

The resource at uri changed and may need to be read again.

Source code in src/mcp/shared/subscriptions.py
54
55
56
57
58
@dataclass(frozen=True)
class ResourceUpdated:
    """The resource at `uri` changed and may need to be read again."""

    uri: str

ServerEvent module-attribute

An event a server publishes for delivery to listen subscribers.

ToolsListChanged dataclass

The server's tool list changed.

Source code in src/mcp/shared/subscriptions.py
39
40
41
@dataclass(frozen=True)
class ToolsListChanged:
    """The server's tool list changed."""

ListenNotSupportedError

Bases: RuntimeError

subscriptions/listen requires a 2026-07-28 connection.

Source code in src/mcp/client/subscriptions.py
59
60
61
62
63
64
65
66
67
68
class ListenNotSupportedError(RuntimeError):
    """`subscriptions/listen` requires a 2026-07-28 connection."""

    def __init__(self, negotiated_version: str | None) -> None:
        self.negotiated_version = negotiated_version
        super().__init__(
            f"subscriptions/listen is not available at protocol version {negotiated_version!r}; it requires "
            "2026-07-28. On earlier versions use subscribe_resource() and the change notifications delivered "
            "through message_handler."
        )

SubscriptionLost

Bases: RuntimeError

The stream ended without the server's graceful close; re-listen and refetch.

Source code in src/mcp/client/subscriptions.py
71
72
class SubscriptionLost(RuntimeError):
    """The stream ended without the server's graceful close; re-listen and refetch."""

ListenRoute

Package-internal demux state for one listen stream, fed synchronously in receive order by the session.

Source code in src/mcp/client/subscriptions.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
class ListenRoute:
    """Package-internal demux state for one listen stream, fed synchronously in receive order by the session."""

    def __init__(self) -> None:
        self.honored: types.SubscriptionFilter | None = None
        self.acked = anyio.Event()
        self.error: MCPError | None = None
        self.end: _SubscriptionEnd | None = None
        self._honored_uris: frozenset[str] = frozenset()
        self._pending: dict[ServerEvent, None] = {}
        self._wake = anyio.Event()

    def set_acked(self, honored: types.SubscriptionFilter) -> None:
        """Record the acknowledged filter; the first ack wins."""
        if not self.acked.is_set():
            self.honored = honored
            self._honored_uris = frozenset(honored.resource_subscriptions or ())
            self.acked.set()

    def deliver(self, event: ServerEvent) -> None:
        """Queue an event within the honored filter, deduplicated against the backlog.

        Any `ResourceUpdated` is admitted once URI subscriptions were honored at
        all: the spec allows the stamped URI to be a sub-resource of a subscribed one.
        """
        if self.end is not None or self.honored is None:
            return
        if isinstance(event, ResourceUpdated):
            admitted = bool(self._honored_uris)
        else:
            admitted = event_matches(self.honored, self._honored_uris, event)
        if not admitted or event in self._pending:
            return
        if len(self._pending) >= _MAX_PENDING_EVENTS:
            self.settle(
                "lost",
                error=MCPError(
                    types.INTERNAL_ERROR,
                    f"subscription backlog exceeded {_MAX_PENDING_EVENTS} unconsumed events; re-listen and refetch",
                ),
            )
            return
        self._pending[event] = None
        self._wake.set()

    def settle(self, end: _SubscriptionEnd, error: MCPError | None = None) -> None:
        """Record the stream's end; the first reason wins and wakes both waiters."""
        if self.end is None:
            self.end = end
            self.error = error
            self.acked.set()
            self._wake.set()

    async def next_event(self) -> ServerEvent | _SubscriptionEnd:
        """Peek the next pending event, or the stream's end once the backlog drains.

        A "local" end short-circuits the backlog; the other endings drain it first,
        so a graceful close never swallows events that preceded it.
        """
        while True:
            # Snapshot the wake event before checking state so a deliver landing after the checks cannot be missed.
            wake = self._wake
            if self.end == "local":
                return self.end
            if self._pending:
                return next(iter(self._pending))
            if self.end is not None:
                return self.end
            await wake.wait()
            self._wake = anyio.Event()

    def consume(self, event: ServerEvent) -> None:
        """Remove a peeked event from the backlog."""
        self._pending.pop(event, None)

set_acked

set_acked(honored: SubscriptionFilter) -> None

Record the acknowledged filter; the first ack wins.

Source code in src/mcp/client/subscriptions.py
87
88
89
90
91
92
def set_acked(self, honored: types.SubscriptionFilter) -> None:
    """Record the acknowledged filter; the first ack wins."""
    if not self.acked.is_set():
        self.honored = honored
        self._honored_uris = frozenset(honored.resource_subscriptions or ())
        self.acked.set()

deliver

deliver(event: ServerEvent) -> None

Queue an event within the honored filter, deduplicated against the backlog.

Any ResourceUpdated is admitted once URI subscriptions were honored at all: the spec allows the stamped URI to be a sub-resource of a subscribed one.

Source code in src/mcp/client/subscriptions.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def deliver(self, event: ServerEvent) -> None:
    """Queue an event within the honored filter, deduplicated against the backlog.

    Any `ResourceUpdated` is admitted once URI subscriptions were honored at
    all: the spec allows the stamped URI to be a sub-resource of a subscribed one.
    """
    if self.end is not None or self.honored is None:
        return
    if isinstance(event, ResourceUpdated):
        admitted = bool(self._honored_uris)
    else:
        admitted = event_matches(self.honored, self._honored_uris, event)
    if not admitted or event in self._pending:
        return
    if len(self._pending) >= _MAX_PENDING_EVENTS:
        self.settle(
            "lost",
            error=MCPError(
                types.INTERNAL_ERROR,
                f"subscription backlog exceeded {_MAX_PENDING_EVENTS} unconsumed events; re-listen and refetch",
            ),
        )
        return
    self._pending[event] = None
    self._wake.set()

settle

settle(
    end: _SubscriptionEnd, error: MCPError | None = None
) -> None

Record the stream's end; the first reason wins and wakes both waiters.

Source code in src/mcp/client/subscriptions.py
120
121
122
123
124
125
126
def settle(self, end: _SubscriptionEnd, error: MCPError | None = None) -> None:
    """Record the stream's end; the first reason wins and wakes both waiters."""
    if self.end is None:
        self.end = end
        self.error = error
        self.acked.set()
        self._wake.set()

next_event async

next_event() -> ServerEvent | _SubscriptionEnd

Peek the next pending event, or the stream's end once the backlog drains.

A "local" end short-circuits the backlog; the other endings drain it first, so a graceful close never swallows events that preceded it.

Source code in src/mcp/client/subscriptions.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
async def next_event(self) -> ServerEvent | _SubscriptionEnd:
    """Peek the next pending event, or the stream's end once the backlog drains.

    A "local" end short-circuits the backlog; the other endings drain it first,
    so a graceful close never swallows events that preceded it.
    """
    while True:
        # Snapshot the wake event before checking state so a deliver landing after the checks cannot be missed.
        wake = self._wake
        if self.end == "local":
            return self.end
        if self._pending:
            return next(iter(self._pending))
        if self.end is not None:
            return self.end
        await wake.wait()
        self._wake = anyio.Event()

consume

consume(event: ServerEvent) -> None

Remove a peeked event from the backlog.

Source code in src/mcp/client/subscriptions.py
146
147
148
def consume(self, event: ServerEvent) -> None:
    """Remove a peeked event from the backlog."""
    self._pending.pop(event, None)

OnEvent module-attribute

OnEvent = Callable[[ServerEvent], Awaitable[None]]

Per-event barrier awaited before a Subscription returns each event to its consumer.

Subscription

One open subscriptions/listen stream: an async iterator of typed events.

Produced by listen() / Client.listen(), not constructed directly.

Source code in src/mcp/client/subscriptions.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class Subscription:
    """One open `subscriptions/listen` stream: an async iterator of typed events.

    Produced by `listen()` / `Client.listen()`, not constructed directly.
    """

    def __init__(
        self,
        route: ListenRoute,
        subscription_id: types.RequestId,
        honored: types.SubscriptionFilter,
        on_event: OnEvent | None = None,
    ):
        self._route = route
        self._on_event = on_event
        self.subscription_id = subscription_id
        """The listen request's JSON-RPC id, stamped into every frame's `_meta`."""
        self.honored = honored
        """The subset of the requested filter the server agreed to deliver."""

    def __aiter__(self) -> Subscription:
        return self

    async def __anext__(self) -> ServerEvent:
        """Yield the next change event; the loop ends when the stream does.

        Raises:
            SubscriptionLost: the stream dropped without the server's graceful close.
        """
        outcome = await self._route.next_event()
        if isinstance(outcome, str):
            if outcome == "lost":
                raise SubscriptionLost(
                    f"subscription {self.subscription_id!r} ended without the server's graceful close;"
                    " re-listen and refetch"
                ) from self._route.error
            raise StopAsyncIteration
        if self._on_event is not None:
            # The event stays pending while the barrier runs: a cancellation or a
            # raising barrier leaves it for the next anext instead of dropping it.
            await self._on_event(outcome)
        self._route.consume(outcome)
        return outcome

subscription_id instance-attribute

subscription_id = subscription_id

The listen request's JSON-RPC id, stamped into every frame's _meta.

honored instance-attribute

honored = honored

The subset of the requested filter the server agreed to deliver.

__anext__ async

__anext__() -> ServerEvent

Yield the next change event; the loop ends when the stream does.

Raises:

Type Description
SubscriptionLost

the stream dropped without the server's graceful close.

Source code in src/mcp/client/subscriptions.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
async def __anext__(self) -> ServerEvent:
    """Yield the next change event; the loop ends when the stream does.

    Raises:
        SubscriptionLost: the stream dropped without the server's graceful close.
    """
    outcome = await self._route.next_event()
    if isinstance(outcome, str):
        if outcome == "lost":
            raise SubscriptionLost(
                f"subscription {self.subscription_id!r} ended without the server's graceful close;"
                " re-listen and refetch"
            ) from self._route.error
        raise StopAsyncIteration
    if self._on_event is not None:
        # The event stays pending while the barrier runs: a cancellation or a
        # raising barrier leaves it for the next anext instead of dropping it.
        await self._on_event(outcome)
    self._route.consume(outcome)
    return outcome

listen async

listen(
    session: ClientSession,
    *,
    tools_list_changed: bool = False,
    prompts_list_changed: bool = False,
    resources_list_changed: bool = False,
    resource_subscriptions: Sequence[str] = (),
    on_event: OnEvent | None = None
) -> AsyncIterator[Subscription]

Open one subscriptions/listen stream on session (2026-07-28 only).

Entering sends the request and returns once the server's acknowledgment arrives; exiting ends the subscription. on_event is awaited before each event is returned - the seam Client.listen uses to finish cache eviction before the consumer can refetch.

Raises:

Type Description
ListenNotSupportedError

negotiated version predates 2026-07-28.

MCPError

the server rejected the request, or the connection failed pre-ack.

SubscriptionLost

the stream ended before it was acknowledged.

TimeoutError

the session's read timeout elapsed before the acknowledgment.

Source code in src/mcp/client/subscriptions.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
@asynccontextmanager
async def listen(
    session: ClientSession,
    *,
    tools_list_changed: bool = False,
    prompts_list_changed: bool = False,
    resources_list_changed: bool = False,
    resource_subscriptions: Sequence[str] = (),
    on_event: OnEvent | None = None,
) -> AsyncIterator[Subscription]:
    """Open one `subscriptions/listen` stream on `session` (2026-07-28 only).

    Entering sends the request and returns once the server's acknowledgment
    arrives; exiting ends the subscription. `on_event` is awaited before each
    event is returned - the seam `Client.listen` uses to finish cache eviction
    before the consumer can refetch.

    Raises:
        ListenNotSupportedError: negotiated version predates 2026-07-28.
        MCPError: the server rejected the request, or the connection failed pre-ack.
        SubscriptionLost: the stream ended before it was acknowledged.
        TimeoutError: the session's read timeout elapsed before the acknowledgment.
    """
    if session.protocol_version not in MODERN_PROTOCOL_VERSIONS:
        raise ListenNotSupportedError(session.protocol_version)
    if isinstance(resource_subscriptions, str):
        raise TypeError("resource_subscriptions takes a sequence of URIs, not a bare string")
    request = types.SubscriptionsListenRequest(
        params=types.SubscriptionsListenRequestParams(
            notifications=types.SubscriptionFilter(
                tools_list_changed=tools_list_changed or None,
                prompts_list_changed=prompts_list_changed or None,
                resources_list_changed=resources_list_changed or None,
                resource_subscriptions=list(resource_subscriptions) or None,
            )
        )
    )
    task_group = session._task_group  # pyright: ignore[reportPrivateUsage]
    if task_group is None:
        raise RuntimeError("listen() requires an entered session")
    request_id: types.RequestId = f"listen-{next(_listen_ids)}"
    data = request.model_dump(by_alias=True, mode="json", exclude_none=True)
    opts: CallOptions = {"request_id": request_id}
    session._stamp(data, opts)  # pyright: ignore[reportPrivateUsage]
    driver_scope = anyio.CancelScope()

    async def drive() -> None:
        # Deliberately no result timeout: the response arrives when the stream ends.
        with driver_scope:
            try:
                await session._dispatcher.send_raw_request(  # pyright: ignore[reportPrivateUsage]
                    data["method"], data.get("params"), opts
                )
            except MCPError as error:
                route.settle("lost", error=error)
                return
            except ValueError as error:
                # A raw request id collided with our minted listen id: fail this subscription
                # and release the route in this same slice, so it cannot consume the raw caller's ack.
                session._unregister_listen_route(request_id)  # pyright: ignore[reportPrivateUsage]
                route.settle("lost", error=MCPError(types.INTERNAL_ERROR, str(error)))
                return
            # A result, whatever its body, is the spec's graceful close; with no prior ack
            # it opens the subscription already closed.
            route.set_acked(types.SubscriptionFilter())
            route.settle("graceful")

    # Register the demux route before the request is written so the ack cannot race it.
    route = session._register_listen_route(request_id)  # pyright: ignore[reportPrivateUsage]
    try:
        task_group.start_soon(drive)
        with anyio.fail_after(session._session_read_timeout_seconds):  # pyright: ignore[reportPrivateUsage]
            await route.acked.wait()
        if route.honored is None:
            # Only reachable on failure paths: a graceful no-ack result acked an empty filter in drive().
            if route.error is not None:
                raise route.error
            raise SubscriptionLost(f"subscription {request_id!r} ended before it was acknowledged")
        yield Subscription(route, request_id, route.honored, on_event)
    finally:
        route.settle("local")
        driver_scope.cancel()
        session._unregister_listen_route(request_id)  # pyright: ignore[reportPrivateUsage]