Subscriptions
Subscriptions allow you to stream real-time data from your server to clients using Server-Sent Events (SSE). With tRPC v11, subscriptions use async generators — a native JavaScript pattern for producing a sequence of values over time.
If you are new to tRPC subscriptions, the official tRPC documentation covers the concept in depth.
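As a rough illustration of the idea, the sketch below pairs a plain async generator with generic SSE framing. It uses no tRPC or NestJS APIs. The names `Tick`, `ticks`, `toSseFrame`, and `collectFrames` are hypothetical, and the `data:` framing is the generic SSE shape rather than tRPC's exact wire format, which may differ.

```typescript
// Hypothetical event shape, used only for this sketch.
interface Tick {
  count: number;
}

// An async generator: each `yield` produces the next value in the sequence.
async function* ticks(limit: number, intervalMs: number): AsyncGenerator<Tick> {
  for (let count = 0; count < limit; count++) {
    await new Promise<void>((resolve) => setTimeout(() => resolve(), intervalMs));
    yield { count };
  }
}

// Generic SSE framing: a `data:` line followed by a blank line.
// Illustration only; tRPC's actual encoding may differ.
function toSseFrame(value: unknown): string {
  return `data: ${JSON.stringify(value)}\n\n`;
}

// Drain an async iterable into SSE frames, roughly as a transport layer might.
async function collectFrames(source: AsyncIterable<unknown>): Promise<string[]> {
  const frames: string[] = [];
  for await (const value of source) {
    frames.push(toSseFrame(value));
  }
  return frames;
}
```

Calling `collectFrames(ticks(3, 1000))` would resolve after about three seconds with one frame per yielded value. In a real subscription the frames are written to the response as they are produced rather than collected.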
Writing Subscriptions
Subscriptions work just like queries and mutations but use the @Subscription() decorator and an async generator method.
The generator yields values that are streamed to the client as SSE events.
import { Router, Subscription, Input, Options } from 'nestjs-trpc';
import { Inject } from '@nestjs/common';
import { z } from 'zod';
import { EventService } from './event.service';

@Router({ alias: 'events' })
export class EventRouter {
  constructor(@Inject(EventService) private eventService: EventService) {}

  @Subscription({
    input: z.object({ channelId: z.string() }),
  })
  async *onMessage(
    @Input('channelId') channelId: string,
    @Options() opts: { signal?: AbortSignal },
  ) {
    for await (const event of this.eventService.listen(channelId, opts.signal)) {
      yield event;
    }
  }
}
The @Subscription() decorator works identically to @Query() and @Mutation(): it accepts input and output schemas for validation and type inference.
Subscriptions require tRPC v11 or later. The existing Express and Fastify adapters handle SSE transport automatically — no additional setup or WebSocket configuration is needed.
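The router example relies on `eventService.listen(channelId, signal)` returning an async iterable, but the service itself is not shown. One possible shape is sketched below as an in-memory pub/sub. The class name `InMemoryEventService` and the `publish` and `listenerCount` methods are assumptions made for illustration, not part of nestjs-trpc. A production service would more likely wrap a message broker or database change feed.

```typescript
// Hypothetical in-memory event service; names and shapes are assumptions.
type Listener<T> = (event: T) => void;

class InMemoryEventService<T> {
  private listeners = new Map<string, Set<Listener<T>>>();

  // Deliver an event to every active listener on the channel.
  publish(channelId: string, event: T): void {
    this.listeners.get(channelId)?.forEach((listener) => listener(event));
  }

  // Number of active listeners, exposed so cleanup can be observed.
  listenerCount(channelId: string): number {
    return this.listeners.get(channelId)?.size ?? 0;
  }

  // Yield events for one channel until the signal aborts.
  async *listen(channelId: string, signal?: AbortSignal): AsyncGenerator<T> {
    const queue: T[] = [];
    let wake: (() => void) | undefined;
    const notify = (): void => {
      wake?.();
      wake = undefined;
    };
    const listener: Listener<T> = (event) => {
      queue.push(event);
      notify();
    };

    const channel = this.listeners.get(channelId) ?? new Set<Listener<T>>();
    this.listeners.set(channelId, channel);
    channel.add(listener);
    signal?.addEventListener('abort', notify);

    try {
      while (!signal?.aborted) {
        if (queue.length > 0) {
          yield queue.shift() as T;
        } else {
          // Sleep until a new event arrives or the signal aborts.
          await new Promise<void>((resolve) => {
            wake = () => resolve();
          });
        }
      }
    } finally {
      // Runs on abort and when the consumer stops iterating early.
      channel.delete(listener);
      signal?.removeEventListener('abort', notify);
    }
  }
}
```

The `finally` block is the key design choice here: it runs both when the signal aborts and when the consumer ends iteration early, so the listener is removed in either case.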
Cleanup with AbortSignal
When a client disconnects, tRPC signals the generator to stop via opts.signal. Use this to clean up resources:
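@Subscription({
  input: z.object({ channelId: z.string() }),
})
async *onMessage(
  @Input('channelId') channelId: string,
  @Options() opts: { signal?: AbortSignal },
) {
  let count = 0;
  // Stop producing events once the client has disconnected.
  while (!opts.signal?.aborted) {
    await new Promise((resolve) => setTimeout(resolve, 1000));
    // The client may have disconnected during the wait, so check again before yielding.
    if (opts.signal?.aborted) break;
    yield { message: `Event ${count++}`, timestamp: Date.now() };
  }
}
Input Validation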
Subscription input is validated the same way as queries and mutations — pass a Zod schema to the input option:
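@Subscription({
  input: z.object({
    channelId: z.string(),
    // z.date() expects a Date instance, so the client needs a data transformer
    // such as superjson. Without one, the value arrives as a string, and
    // z.coerce.date() can be used to parse it instead.
    since: z.date().optional(),
  }),
})
async *onMessage(
  @Input('channelId') channelId: string,
  @Input('since') since: Date | undefined,
) {
  // ...
}
Client Usage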
On the client side, subscriptions are consumed through your tRPC client: the subscribe method on the vanilla client, or the useSubscription hook in React. For SSE, the client's link chain also needs httpSubscriptionLink, typically routed through splitLink. Refer to the tRPC client subscription docs for framework-specific examples.
// React example with @trpc/react-query
const { data } = trpc.events.onMessage.useSubscription(
  { channelId: 'general' },
  {
    onData(event) {
      console.log('Received:', event);
    },
  },
);
Middlewares
Subscriptions support the same @UseMiddlewares() decorator as queries and mutations. Middlewares execute before the generator starts:
@UseMiddlewares(AuthMiddleware)
@Subscription({
  input: z.object({ channelId: z.string() }),
})
async *onMessage(@Input('channelId') channelId: string) {
  // Only authenticated users reach this point
}
Dependency Injection
Subscription routers fully support Dependency Injection, just like query and mutation routers. Inject services through the constructor to access your business logic.