Sending realtime updates
Up until now, all the changes you've made in the backend — adding todos, uploading documents, generating reports — have been one-way or limited to the client that made them. For some features, and for every other connected client, you have to refresh the page manually to see the updates.
Wouldn't it be nice if new documents just appeared as soon as someone uploaded them? Or if todos synced instantly across open tabs?
In this section, we'll make the app truly live.
Add broadcast workload
A broadcast workload acts as the realtime hub for your app — a WebSocket service that clients can subscribe to for updates.
Generate one with:
npx monolayer add broadcast
Define channels
Channels are logical topics clients can subscribe to and publish events through. You can think of them as rooms in a chat app — each one handles a separate stream of updates.
Change the contents of workloads/broadcast.ts to:
import type { Todos } from "@/lib/prisma/generated/client";
import { Broadcast, ChannelData } from "@monolayer/sdk";
/** Payloads sent on the "/todos" channel: a todo was added, or the todo with `todoId` was deleted. */
export type TodosChannelData =
| { action: "add"; todo: Todos }
| { action: "delete"; todoId: string };
/** Payloads sent on the "/documents" channel; each event identifies the document by its `key`. */
export type DocumentsChannelData =
| { action: "add"; document: { key: string } }
| { action: "delete"; document: { key: string } };
/**
 * Broadcast workload declaring the two channels used in this section.
 * Each channel's `ChannelData<T>` fixes the payload type for that channel.
 */
const broadcast = new Broadcast({
channels: {
"/todos": {
data: new ChannelData<TodosChannelData>(),
},
"/documents": {
data: new ChannelData<DocumentsChannelData>(),
},
},
});
export default broadcast;
export type Channels = typeof broadcast._channelDataType;
Publish and subscribe to channels (Documents)
Let's make the Documents tab update live when a new file is uploaded or deleted.
"use server";
import { publisher } from "@/lib/broadcast/publisher";
import { s3Client } from "@/lib/bucket/client";
import documents from "@/workloads/documents";
import { DeleteObjectCommand } from "@aws-sdk/client-s3";
import { revalidatePath } from "next/cache";
/**
 * Deletes the object stored under `key` from the documents bucket, then
 * publishes a "delete" event on the "/documents" channel and revalidates "/".
 *
 * @param key - Key of the object to delete from the bucket.
 */
export async function deleteDocument(key: string) {
  await s3Client.send(
    new DeleteObjectCommand({
      Bucket: documents.name,
      Key: key,
    }),
  );

  // Tell every subscriber that the document is gone.
  await publisher.publishTo("/documents", {}, [
    { action: "delete", document: { key } },
  ]);

  revalidatePath("/");
}
/**
 * Publishes an "add" event for the document `key` on the "/documents" channel.
 * It only publishes; the upload component calls it after the file has been uploaded.
 */
export async function publishAddDocument(key: string) {
await publisher.publishTo("/documents", {}, [
{ action: "add", document: { key } },
]);
}
"use client";
import { useChannelSubscription } from "@/lib/broadcast/client";
import type { DocumentsChannelData } from "@/workloads/broadcast";
import { createContext, use, useOptimistic } from "react";
/** A document entry; `optimistic` is set to true on entries added through the optimistic reducer. */
export type BucketItem = { key: string; optimistic?: boolean };
/** Actions accepted by the optimistic reducer in DocumentsProvider. */
type OptimisticAction =
| { action: "add"; document: BucketItem }
| { action: "delete"; document: BucketItem };
/**
 * Context carrying the (optimistic) document list, the optimistic dispatcher
 * and the events received from the "/documents" channel subscription.
 * The value is `null` when no provider is mounted.
 */
export const DocumentsContext = createContext<{
documents: BucketItem[];
addOptimistic: (action: OptimisticAction) => void;
subscriptionDocuments: DocumentsChannelData[];
} | null>(null);
/**
 * Provides the documents state to its subtree.
 *
 * Subscribes to the "/documents" channel, resolves the `items` promise with
 * `use`, layers optimistic add/delete updates on top of it with
 * `useOptimistic`, and exposes all of it through `DocumentsContext`.
 */
export default function DocumentsProvider({
  children,
  items: initial,
}: React.PropsWithChildren & { items: Promise<BucketItem[]> }) {
  const { all: subscriptionDocuments } = useChannelSubscription(
    "/documents",
    {},
  );

  const resolvedItems = use(initial);
  const [documents, addOptimistic] = useOptimistic(
    resolvedItems,
    (currentState: BucketItem[], action: OptimisticAction) => {
      switch (action.action) {
        case "add":
          // Flag the entry so consumers can tell it is not confirmed yet.
          return [...currentState, { ...action.document, optimistic: true }];
        case "delete":
          return currentState.filter(
            (item) => item.key !== action.document.key,
          );
      }
    },
  );

  return (
    <DocumentsContext
      value={{ documents, addOptimistic, subscriptionDocuments }}
    >
      {children}
    </DocumentsContext>
  );
}
import { use } from "react";
import { DocumentsContext, type BucketItem } from "./provider";
import type { DocumentsChannelData } from "@/workloads/broadcast";
/**
 * Returns the documents to render plus the optimistic-update dispatcher.
 *
 * `documents` is the provider's (optimistic) list merged with the events
 * received from the "/documents" channel subscription.
 *
 * @throws Error when called outside of a `<DocumentsProvider />`.
 */
export function useDocuments() {
  const context = use(DocumentsContext);
  if (!context) {
    throw new Error("useDocuments must be used within a <DocumentsProvider />");
  }
  return {
    documents: combineDocuments(
      context.documents,
      context.subscriptionDocuments,
    ),
    addOptimistic: context.addOptimistic,
  };
}
/**
 * Merges the provider's document list with events received on the
 * "/documents" channel.
 *
 * Only the most recent event per key is considered. Any item whose key has an
 * event is dropped from `items`; keys whose latest event is "add" are then
 * appended, ordered by the first event seen for each key.
 *
 * @param items - Documents currently known to the provider.
 * @param subscriptionDocuments - Channel events, oldest first.
 * @returns The merged document list.
 */
function combineDocuments(
  items: BucketItem[],
  subscriptionDocuments: DocumentsChannelData[],
) {
  // A Map (instead of a plain object) keeps first-seen order for every key,
  // including integer-like ones, and does not lose keys such as "__proto__".
  const latestByKey = new Map<string, DocumentsChannelData>();
  for (const event of subscriptionDocuments) {
    latestByKey.set(event.document.key, event);
  }

  const added: BucketItem[] = [];
  for (const event of latestByKey.values()) {
    if (event.action === "add") {
      added.push(event.document);
    }
  }

  // Map lookups keep this linear in the number of items.
  return items.filter((item) => !latestByKey.has(item.key)).concat(added);
}
"use client";
import { Dropzone, DropzoneEmptyState } from "@/components/kibo-ui/dropzone";
import { useDocuments } from "./hooks";
import { startTransition } from "react";
import { uploadToBucket } from "@/lib/bucket/upload";
import documents from "@/workloads/documents";
import { revalidate } from "@/actions/revalidate";
import { publishAddDocument } from "@/actions/documents";
/**
 * Dropzone that uploads the dropped files to the documents bucket.
 *
 * For each file it adds an optimistic entry, uploads the file under its name,
 * and publishes an "add" event so other subscribers see it. Once all files
 * are done it calls `revalidate`. Errors are logged to the console.
 */
export function Upload() {
  const { addOptimistic } = useDocuments();

  const upload = async (files: File[]) => {
    startTransition(async () => {
      try {
        // Register every optimistic entry before the first `await`: React only
        // treats updates made before an `await` as part of this transition, so
        // adding them inside the upload loop would leave the second and later
        // files outside of it.
        for (const file of files) {
          addOptimistic({ action: "add", document: { key: file.name } });
        }

        for (const file of files) {
          await uploadToBucket({
            bucketName: documents.name,
            file,
            key: file.name,
          });
          await publishAddDocument(file.name);
        }
        await revalidate();
      } catch (e) {
        console.error(e);
      }
    });
  };

  return (
    <div className="py-5">
      <Dropzone onDrop={upload} onError={console.error}>
        <DropzoneEmptyState />
      </Dropzone>
    </div>
  );
}
Publish and Subscribe to channels (Todos)
We can do the same for the Todos list.
"use server";
import { publisher } from "@/lib/broadcast/publisher";
import prisma from "@/lib/prisma/prisma";
import { revalidatePath } from "next/cache";
/** Form state returned by `addTodo`: the text to keep in the input and an optional error. */
export type AddTodoState = {
  input: string;
  error?: string;
};

/**
 * Creates a todo from the "todo" form field, revalidates "/" and publishes an
 * "add" event on the "/todos" channel.
 *
 * @param _ - Previous action state (unused).
 * @param formData - Form data; reads "todo" and the optional "optimisticId",
 *   which is used as the new row's id when present.
 * @returns An empty input on success, or the submitted text with an error
 *   when the text is missing or blank.
 */
export async function addTodo(
  _: unknown,
  formData: FormData,
): Promise<AddTodoState> {
  // FormData.get returns string | File | null; only a string is a valid todo,
  // so narrow at runtime instead of asserting the type.
  const entry = formData.get("todo");
  const text = typeof entry === "string" ? entry : undefined;

  if (text?.trim()) {
    const todo = await prisma.todos.create({
      data: { text, id: formData.get("optimisticId")?.toString() ?? undefined },
    });
    revalidatePath("/");
    await publisher.publishTo("/todos", {}, [{ action: "add", todo }]);
    return { input: "", error: undefined };
  }

  return {
    error: "Todo cannot be empty",
    input: text ?? "",
  };
}
/** Result of `deleteTodo`: a success flag and an optional error message. */
export type DeleteTodoState = {
  error?: string;
  success?: boolean;
};

/**
 * Deletes the todo whose id is in the "todoId" form field, revalidates "/"
 * and publishes a "delete" event on the "/todos" channel.
 *
 * @param formData - Form data; reads "todoId".
 * @returns `{ success: true }`, or a failure with an error message when the
 *   id is missing or any step throws.
 */
export async function deleteTodo(formData: FormData): Promise<DeleteTodoState> {
  const todoId = formData.get("todoId") as string | undefined;

  // Nothing to delete without an id.
  if (!todoId) {
    return { success: false, error: "Could not delete todo" };
  }

  try {
    await prisma.todos.delete({ where: { id: todoId } });
    revalidatePath("/");
    await publisher.publishTo("/todos", {}, [
      { action: "delete", todoId: todoId },
    ]);
    return { success: true };
  } catch {
    return { success: false, error: "Could not delete todo" };
  }
}
"use client";
import { useChannelSubscription } from "@/lib/broadcast/client";
import type { Todos } from "@/lib/prisma/generated/client";
import type { TodosChannelData } from "@/workloads/broadcast";
import { createContext, use, useOptimistic } from "react";
/** Actions accepted by the optimistic reducer in TodosProvider. */
type OptimisticAction =
| { kind: "add"; todo: Todos }
| { kind: "remove"; todoId: Todos["id"] };
/**
 * Context carrying the (optimistic) todo list, the optimistic dispatcher and
 * the events received from the "/todos" channel subscription.
 * The value is `null` when no provider is mounted.
 */
export const TodosContext = createContext<{
todos: Todos[];
addOptimistic: (action: OptimisticAction) => void;
subscriptionTodos: TodosChannelData[];
} | null>(null);
/**
 * Provides the todos state to its subtree.
 *
 * Subscribes to the "/todos" channel, resolves the `todos` promise with
 * `use`, layers optimistic add/remove updates on top of it with
 * `useOptimistic`, and exposes all of it through `TodosContext`.
 */
export default function TodosProvider({
  children,
  todos: initial,
}: React.PropsWithChildren & { todos: Promise<Todos[]> }) {
  const { all: subscriptionTodos } = useChannelSubscription("/todos", {});

  const resolvedTodos = use(initial);
  const [todos, addOptimistic] = useOptimistic(
    resolvedTodos,
    (currentState: Todos[], action: OptimisticAction) => {
      switch (action.kind) {
        case "add":
          return [...currentState, action.todo];
        case "remove":
          return currentState.filter((item) => item.id !== action.todoId);
      }
    },
  );

  return (
    <TodosContext value={{ todos, addOptimistic, subscriptionTodos }}>
      {children}
    </TodosContext>
  );
}
import { TodosContext } from "./provider";
import { toast } from "sonner";
import { addTodo, deleteTodo, type AddTodoState } from "@/actions/todos";
import { use, useActionState } from "react";
import { v4 } from "uuid";
import type { Todos } from "@/lib/prisma/generated/client";
import type { TodosChannelData } from "@/workloads/broadcast";
/**
 * Returns the todos to render plus the optimistic-update dispatcher.
 *
 * `todos` is the provider's (optimistic) list merged with the events received
 * from the "/todos" channel subscription.
 *
 * @throws Error when called outside of a `<TodosProvider />`.
 */
export function useTodos() {
  const context = use(TodosContext);
  if (!context) {
    throw new Error("useTodos must be used within a <TodosProvider />");
  }
  return {
    todos: combineTodos(context.todos, context.subscriptionTodos),
    addOptimistic: context.addOptimistic,
  };
}
/**
 * Wraps the `addTodo` server action in `useActionState`.
 *
 * Before calling the server it generates an id, adds an optimistic todo with
 * that id when the "todo" field is non-empty, and passes the same id along as
 * "optimisticId" in the form data.
 */
export function useAddTodoActionState() {
  const { addOptimistic } = useTodos();

  const add = async (_: unknown, formData: FormData): Promise<AddTodoState> => {
    const text = formData.get("todo")?.toString();
    const optimisticId = v4();

    if (text) {
      const timestamp = new Date();
      addOptimistic({
        kind: "add",
        todo: {
          id: optimisticId,
          text,
          createdAt: timestamp,
          updatedAt: timestamp,
        },
      });
    }

    // The id is sent even when no optimistic entry was added.
    formData.set("optimisticId", optimisticId);
    return addTodo(_, formData);
  };

  return useActionState(add, { input: "" });
}
/**
 * Builds a form action that optimistically removes `todo`, calls the
 * `deleteTodo` server action and shows a toast when it reports an error.
 *
 * @param todo - The todo this action deletes.
 */
export function useDeleteTodoAction(todo: Todos) {
  const { addOptimistic } = useTodos();

  return async (form: FormData) => {
    addOptimistic({ kind: "remove", todoId: todo.id });
    const { error } = await deleteTodo(form);
    if (error) toast.error(error);
  };
}
/**
 * Merges the provider's todo list with events received on the "/todos"
 * channel and sorts the result by `createdAt`, oldest first.
 *
 * Only the most recent event per todo id is considered. Any todo whose id has
 * an event is dropped from `todos`; ids whose latest event is "add" are then
 * appended before sorting.
 *
 * @param todos - Todos currently known to the provider.
 * @param subscriptionItems - Channel events, oldest first.
 * @returns The merged, sorted todo list.
 */
function combineTodos(todos: Todos[], subscriptionItems: TodosChannelData[]) {
  // A Map (instead of a plain object) cannot lose ids such as "__proto__" and
  // gives constant-time lookups for the filter below.
  const latestById = new Map<string, TodosChannelData>();
  for (const event of subscriptionItems) {
    const id = event.action === "add" ? event.todo.id : event.todoId;
    latestById.set(id, event);
  }

  const added: Todos[] = [];
  for (const event of latestById.values()) {
    // The explicit check narrows the union, so `event.todo` is safe to read.
    if (event.action === "add") {
      added.push(event.todo);
    }
  }

  return todos
    .filter((todo) => !latestById.has(todo.id))
    .concat(added)
    .sort(
      // `new Date(...)` tolerates createdAt arriving as a string
      // (presumably the case for events received over the channel — verify).
      (a, b) =>
        new Date(a.createdAt).getTime() - new Date(b.createdAt).getTime(),
    );
}
import { PutObjectCommand } from "@aws-sdk/client-s3";
import { Task } from "@monolayer/sdk";
import documents from "./documents";
import { publisher } from "@/lib/broadcast/publisher";
import { s3Client } from "@/lib/bucket/client";
/** Payload of the "generate-report" task: the report text to store. */
export type UploadReportData = {
  report: { message: string };
};

/**
 * Task that writes the report message to the documents bucket under a
 * timestamped `Report-<ISO date>` key and then publishes an "add" event for
 * that key on the "/documents" channel.
 */
const uploadReport = new Task<UploadReportData>(
  "generate-report",
  async ({ data }) => {
    const key = `Report-${new Date().toISOString()}`;

    await s3Client.send(
      new PutObjectCommand({
        Bucket: documents.name,
        Key: key,
        Body: Buffer.from(data.report.message),
      }),
    );

    // Let subscribed clients know a new document exists.
    await publisher.publishTo("/documents", {}, [
      { action: "add", document: { key } },
    ]);
  },
);
export default uploadReport;
import type { Metadata } from "next";
import { Geist, Geist_Mono } from "next/font/google";
import "./globals.css";
import { Toaster } from "@/components/ui/sonner";
import { BroadcastProvider } from "@monolayer/sdk";
// Geist Sans, exposed through the --font-geist-sans CSS variable.
const geistSans = Geist({
variable: "--font-geist-sans",
subsets: ["latin"],
});
// Geist Mono, exposed through the --font-geist-mono CSS variable.
const geistMono = Geist_Mono({
variable: "--font-geist-mono",
subsets: ["latin"],
});
/** Page metadata exported by the root layout module. */
export const metadata: Metadata = {
title: "Create Next App",
description: "Generated by create next app",
};
/**
 * Root layout: applies the Geist font variables to `<body>`, wraps the page
 * content in `BroadcastProvider` and mounts the toast container.
 *
 * @param children - The page content to render inside the layout.
 */
export default function RootLayout({
  children,
}: Readonly<{
  children: React.ReactNode;
}>) {
  return (
    <html lang="en">
      <body
        className={`${geistSans.variable} ${geistMono.variable} antialiased`}
      >
        {/* Wrapping the children presumably makes channel subscriptions available to them — confirm against the SDK. */}
        <BroadcastProvider>{children}</BroadcastProvider>
        <Toaster theme="dark" position="top-right" />
      </body>
    </html>
  );
}
Start the development environment for broadcast
Finally, start your local development environment so the broadcast system runs:
npx monolayer start dev
Open two browser windows side by side. Open your app in both and try uploading a document or adding a todo.
You'll see updates appear instantly in both — no reloads needed.