Building AI Tools with Claude: SSE, MCP, and TypeScript

This tutorial guides experienced web developers through implementing tool calling over Server-Sent Events (SSE) with Anthropic's Claude 3.5 Sonnet. You'll learn how to build a Model Context Protocol (MCP) server that exposes your tools and how to connect those tools to the Claude API in real time.
The Claude Messages API and Tool Calling
The Anthropic Messages API provides a stateless interface for interacting with Claude models. Claude 3.5 Sonnet supports tool calling (also called tool use), which lets the model request execution of client-defined functions.
The basic API flow works like this:
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY
});

async function main() {
  const message = await client.messages.create({
    model: "claude-3-5-sonnet-latest",
    max_tokens: 1024,
    messages: [{
      role: "user",
      content: "Hello, Claude"
    }]
  });

  console.log(message.content);
}

// Run the example and surface any request errors
main().catch(console.error);
When using tools, you define them in your API request and Claude can decide to use them:
const weatherTool = {
  name: "get_weather",
  description: "Get current weather for a location",
  input_schema: {
    type: "object",
    properties: {
      location: {
        type: "string",
        description: "City and state/country"
      },
      unit: {
        type: "string",
        enum: ["celsius", "fahrenheit"],
        description: "Temperature unit"
      }
    },
    required: ["location"]
  }
};

// Claude might respond with a tool_use event
// Then you execute the tool and send results back
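The round trip described in the comments above can be sketched as follows. This is a minimal illustration, not the SDK's own helper: `runTools` and the `handlers` registry are hypothetical names, the handlers are assumed to be synchronous for brevity, and only the `tool_use` and `tool_result` block shapes (including the `tool_use_id` field) come from the Messages API.

```typescript
// Content blocks involved in a tool round trip
interface TextBlock {
  type: "text";
  text: string;
}
interface ToolUseBlock {
  type: "tool_use";
  id: string;
  name: string;
  input: Record<string, unknown>;
}
interface ToolResultBlock {
  type: "tool_result";
  tool_use_id: string;
  content: string;
  is_error?: boolean;
}
type ContentBlock = TextBlock | ToolUseBlock;

// Hypothetical registry: tool name -> local (synchronous) implementation
type ToolHandlers = Record<string, (input: Record<string, unknown>) => unknown>;

// Builds the user message that answers every tool_use block in a response
function runTools(
  content: ContentBlock[],
  handlers: ToolHandlers
): { role: "user"; content: ToolResultBlock[] } {
  const results: ToolResultBlock[] = [];
  for (const block of content) {
    if (block.type !== "tool_use") continue; // text blocks need no reply
    const handler = handlers[block.name];
    if (!handler) {
      results.push({
        type: "tool_result",
        tool_use_id: block.id,
        content: `Unknown tool: ${block.name}`,
        is_error: true,
      });
      continue;
    }
    try {
      results.push({
        type: "tool_result",
        tool_use_id: block.id,
        content: JSON.stringify(handler(block.input)),
      });
    } catch (error) {
      results.push({
        type: "tool_result",
        tool_use_id: block.id,
        content: error instanceof Error ? error.message : String(error),
        is_error: true,
      });
    }
  }
  return { role: "user", content: results };
}
```

The returned message is appended to the conversation after the assistant message that contained the `tool_use` blocks, and the whole history is sent back in the next request.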
Understanding Server-Sent Events (SSE)
Server-Sent Events is a standard for pushing updates from a server to clients over HTTP. Unlike WebSockets, SSE provides one-way communication from server to client, making it ideal for streaming responses.
Anthropic's API supports streaming via SSE, which is particularly valuable for tool calling because:
- It enables real-time updates as Claude thinks and generates content
- It allows clients to see when Claude is preparing to use a tool
- It provides a more responsive user experience for long responses
An SSE response keeps a single HTTP connection open and is sent with these headers:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
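When Claude decides to use a tool, the stream includes a content_block_start event whose content block has type tool_use, followed by deltas that carry the tool input as partial JSON. Your application can therefore react before the full response has arrived.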
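To make the wire format concrete, here is a small sketch of parsing SSE frames and spotting the start of a tool call. It is a simplification under stated assumptions: `parseSseFrames` and `toolNamesStarted` are hypothetical helpers, the input uses "\n" line endings, and each chunk ends on a frame boundary (a production client would buffer partial frames, or use the SDK's streaming support instead).

```typescript
interface SseEvent {
  event: string;
  data: string;
}

// Parses complete SSE frames from a text chunk. Frames are separated by a blank line.
function parseSseFrames(chunk: string): SseEvent[] {
  const events: SseEvent[] = [];
  for (const frame of chunk.split("\n\n")) {
    let event = "message"; // default event name when no "event:" field is present
    const dataLines: string[] = [];
    for (const line of frame.split("\n")) {
      if (line === "" || line.startsWith(":")) continue; // blank line or comment
      const colon = line.indexOf(":");
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? "" : line.slice(colon + 1);
      if (value.startsWith(" ")) value = value.slice(1); // strip one leading space
      if (field === "event") event = value;
      else if (field === "data") dataLines.push(value);
    }
    // Frames without data (for example keepalive comments) are not dispatched
    if (dataLines.length > 0) events.push({ event, data: dataLines.join("\n") });
  }
  return events;
}

// Returns the names of tools whose tool_use blocks start in this chunk
function toolNamesStarted(chunk: string): string[] {
  const names: string[] = [];
  for (const { event, data } of parseSseFrames(chunk)) {
    if (event !== "content_block_start") continue;
    const payload = JSON.parse(data);
    if (payload.content_block?.type === "tool_use") {
      names.push(payload.content_block.name);
    }
  }
  return names;
}

// Example chunk: one tool_use block start followed by a ping
const sampleChunk =
  'event: content_block_start\n' +
  'data: {"type":"content_block_start","index":1,"content_block":{"type":"tool_use","id":"toolu_01","name":"get_weather","input":{}}}\n\n' +
  'event: ping\n' +
  'data: {"type":"ping"}\n\n';

console.log(toolNamesStarted(sampleChunk));
```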
The Model Context Protocol (MCP)
The Model Context Protocol (MCP) is Anthropic's open standard for connecting AI assistants to external tools and data sources. While tool calling in the API lets you define ad-hoc tools for a single request, MCP provides a more persistent and standardized approach.
MCP follows a client-server architecture:
- MCP Host: Applications like Claude Desktop or IDE plugins
- MCP Client: Maintains connections with MCP servers
- MCP Server: Exposes capabilities (tools, resources) via a standardized API
Important Note: According to the official MCP TypeScript SDK, the SSE transport is deprecated in favor of Streamable HTTP. New implementations should use Streamable HTTP; this tutorial also covers the SSE transport for cases where it is still required.
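Whichever transport is used, the client and server exchange JSON-RPC 2.0 messages. The sketch below shows roughly what a tool invocation looks like on the wire. The helper names (`toolsCallRequest`, `textOf`) are illustrative and not part of the SDK; the `tools/call` method, the `name` and `arguments` parameters, and the `content` / `isError` result fields follow the MCP specification.

```typescript
interface JsonRpcRequest {
  jsonrpc: "2.0";
  id: number;
  method: string;
  params?: Record<string, unknown>;
}

// Result payload of a tools/call response, reduced to the fields used here
interface ToolCallResult {
  content: { type: string; text?: string }[];
  isError?: boolean;
}

// Builds the request an MCP client sends to invoke a tool
function toolsCallRequest(
  id: number,
  name: string,
  args: Record<string, unknown>
): JsonRpcRequest {
  return {
    jsonrpc: "2.0",
    id,
    method: "tools/call",
    params: { name, arguments: args },
  };
}

// Concatenates the text blocks of a tool result
function textOf(result: ToolCallResult): string {
  return result.content
    .filter((block) => block.type === "text" && block.text !== undefined)
    .map((block) => block.text)
    .join("");
}

const exampleRequest = toolsCallRequest(1, "calculate", { expression: "5 * 3 + 2" });
console.log(JSON.stringify(exampleRequest));
```

In practice the SDK's client and server classes build and parse these messages for you; the sketch is only meant to show what travels over the transport.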
Project Setup
Let's set up our TypeScript project with the necessary dependencies:
# Create project directory
mkdir claude-tools-sse
cd claude-tools-sse

# Initialize npm project
npm init -y

# Install server dependencies
npm install @modelcontextprotocol/sdk @anthropic-ai/sdk express cors zod

# Install client dependencies
npm install eventsource-parser

# Install dev dependencies
npm install -D typescript @types/node @types/express @types/cors ts-node
Create a tsconfig.json file:
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "NodeNext",
    "moduleResolution": "NodeNext",
    "outDir": "./dist",
    "rootDir": "./src",
    "esModuleInterop": true,
    "strict": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true
  },
  "include": ["src/**/*"],
  "exclude": ["node_modules"]
}

And update package.json:

{
  "type": "module",
  "scripts": {
    "build": "tsc",
    "start": "node dist/server.js",
    "dev": "ts-node --esm src/server.ts",
    "client": "ts-node --esm src/client.ts"
  }
}
Server-side Implementation
Basic MCP Server Structure
Create a src/server.ts file with a basic MCP server implementation:
import express from 'express';
import cors from 'cors';
import { randomUUID } from 'crypto';
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
// InMemoryEventStore ships with the SDK's example code (inMemory.js exports InMemoryTransport instead).
// Adjust this path if your SDK version lays the examples out differently.
import { InMemoryEventStore } from "@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js";
import { z } from "zod";

// Initialize Express
const app = express();

// Configure CORS and JSON middleware
app.use(cors());
app.use(express.json());

// Create an MCP server
const server = new McpServer({
  name: "Claude Tools Server",
  version: "1.0.0",
  description: "MCP server for Claude 3.5 Sonnet tool calling"
});

// Map to store transports by session ID
const transports: Record<string, StreamableHTTPServerTransport> = {};

// Add a simple example tool
server.tool(
  "calculate",
  "Perform basic arithmetic calculations",
  {
    expression: z.string().describe("The mathematical expression to evaluate (e.g., '5 * 3 + 2')"),
  },
  async ({ expression }) => {
    try {
      // Use Function constructor to evaluate the expression (use with caution in production)
      const result = new Function(`return ${expression}`)();
      return {
        content: [{
          type: "text",
          text: `Result: ${result}`
        }]
      };
    } catch (error) {
      return {
        content: [{
          type: "text",
          text: `Error evaluating expression: ${error instanceof Error ? error.message : String(error)}`
        }],
        isError: true
      };
    }
  }
);

// Handle POST requests for client-to-server communication
app.post('/mcp', async (req, res) => {
  // Check for existing session ID
  const sessionId = req.headers['mcp-session-id'] as string | undefined;
  let transport: StreamableHTTPServerTransport;

  if (sessionId && transports[sessionId]) {
    // Reuse existing transport
    transport = transports[sessionId];
  } else {
    // New session - create a transport with event store for resumability
    const eventStore = new InMemoryEventStore();
    transport = new StreamableHTTPServerTransport({
      sessionIdGenerator: () => randomUUID(),
      eventStore,
      onsessioninitialized: (sid) => {
        // Store the transport by session ID
        transports[sid] = transport;
      }
    });

    // Clean up transport when closed
    transport.onclose = () => {
      if (transport.sessionId) {
        delete transports[transport.sessionId];
        console.log(`Session ${transport.sessionId} closed`);
      }
    };

    // Connect the server to the transport
    await server.connect(transport);
  }

  // Handle the request
  try {
    await transport.handleRequest(req, res, req.body);
  } catch (error) {
    console.error('Error handling request:', error);
    if (!res.headersSent) {
      res.status(500).json({
        jsonrpc: '2.0',
        error: {
          code: -32603,
          message: 'Internal server error',
        },
        id: null,
      });
    }
  }
});

// Reusable handler for GET and DELETE requests
const handleSessionRequest = async (req: express.Request, res: express.Response) => {
  const sessionId = req.headers['mcp-session-id'] as string | undefined;
  if (!sessionId || !transports[sessionId]) {
    res.status(400).send('Invalid or missing session ID');
    return;
  }

  const transport = transports[sessionId];
  try {
    await transport.handleRequest(req, res);
  } catch (error) {
    console.error('Error handling session request:', error);
    if (!res.headersSent) {
      res.status(500).send('Internal server error');
    }
  }
};

// Handle GET requests for server-to-client notifications
app.get('/mcp', handleSessionRequest);

// Handle DELETE requests for session termination
app.delete('/mcp', handleSessionRequest);

// Start the server
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`MCP server running on http://localhost:${PORT}`);
});
Implementing Useful Tools
Expand your server to implement more useful tools that Claude can call:
// Add a web search tool
server.tool(
  "web_search",
  "Search the web for information",
  {
    query: z.string().describe("The search query"),
    maxResults: z.number().optional().describe("Maximum number of results to return")
  },
  async ({ query, maxResults = 5 }) => {
    try {
      // In a real implementation, you would call a search API here
      // For demonstration, we'll return a mock response
      const mockResults = [
        { title: "Search Result 1", url: "https://example.com/1", snippet: "This is a mock search result" },
        { title: "Search Result 2", url: "https://example.com/2", snippet: "Another mock search result" },
        // Add more mock results as needed
      ].slice(0, maxResults);

      return {
        content: [{
          type: "text",
          text: JSON.stringify(mockResults, null, 2)
        }]
      };
    } catch (error) {
      return {
        content: [{
          type: "text",
          text: `Error performing search: ${error instanceof Error ? error.message : String(error)}`
        }],
        isError: true
      };
    }
  }
);

// Add a tool for fetching content from a URL
server.tool(
  "fetch_url",
  "Fetch content from a specified URL",
  {
    url: z.string().url().describe("The URL to fetch content from")
  },
  async ({ url }) => {
    try {
      const response = await fetch(url);
      if (!response.ok) {
        throw new Error(`HTTP error! Status: ${response.status}`);
      }
      const text = await response.text();

      // For safety, limit the size of the response
      const truncatedText = text.length > 10000 ? text.substring(0, 10000) + "... (truncated)" : text;

      return {
        content: [{
          type: "text",
          text: truncatedText
        }]
      };
    } catch (error) {
      return {
        content: [{
          type: "text",
          text: `Error fetching URL: ${error instanceof Error ? error.message : String(error)}`
        }],
        isError: true
      };
    }
  }
);
Creating a Wrapper for Anthropic's API
To directly handle Claude's tool calling, create an API wrapper in src/anthropic.ts:
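// src/anthropic.ts
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { InMemoryTransport } from "@modelcontextprotocol/sdk/inMemory.js";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";

interface ToolCallInput {
  name: string;
  input: Record<string, unknown>;
}

interface ToolCallResult {
  content: string | { type: string; text: string }[];
  isError?: boolean;
}

// McpServer registers tools but does not expose listTools()/callTool() itself;
// those methods belong to the MCP Client. This sketch links an in-process
// Client to the server over an in-memory transport and reuses it across calls.
// Assumption: the server passed here is dedicated to in-process use, because
// connecting one server instance to several transports can misroute replies.
let clientPromise: Promise<Client> | undefined;

function getInProcessClient(server: McpServer): Promise<Client> {
  if (!clientPromise) {
    clientPromise = (async () => {
      const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair();
      await server.connect(serverTransport);
      const client = new Client({ name: "in-process-client", version: "1.0.0" });
      await client.connect(clientTransport);
      return client;
    })();
  }
  return clientPromise;
}

export async function handleClaudeToolCall(
  server: McpServer,
  toolCall: ToolCallInput
): Promise<ToolCallResult> {
  const { name, input } = toolCall;

  try {
    const client = await getInProcessClient(server);

    // Check that the tool is registered before calling it
    const { tools } = await client.listTools();
    if (!tools.some(t => t.name === name)) {
      return {
        content: [{ type: "text", text: `Error: Tool '${name}' not found` }],
        isError: true
      };
    }

    // Call the tool with the provided input
    const result = await client.callTool({ name, arguments: input });
    return result as unknown as ToolCallResult;
  } catch (error) {
    return {
      content: [{
        type: "text",
        text: `Error executing tool '${name}': ${error instanceof Error ? error.message : String(error)}`
      }],
      isError: true
    };
  }
}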
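When the tool list comes from an MCP server rather than being written by hand, it has to be reshaped before it can go into a Messages API request: MCP describes a tool's parameters under `inputSchema`, while the Messages API expects `input_schema`. A minimal sketch of that mapping follows; `toAnthropicTools` and the two interfaces are illustrative names, and the input is assumed to be the `tools` array from an MCP `tools/list` result.

```typescript
// Subset of an MCP tool descriptor, as returned by tools/list
interface McpToolDescriptor {
  name: string;
  description?: string;
  inputSchema: Record<string, unknown>;
}

// Tool definition in the shape the Messages API expects
interface AnthropicToolParam {
  name: string;
  description: string;
  input_schema: Record<string, unknown>;
}

// Converts MCP tool descriptors into Messages API tool definitions
function toAnthropicTools(tools: McpToolDescriptor[]): AnthropicToolParam[] {
  return tools.map((tool) => ({
    name: tool.name,
    // MCP descriptions are optional; fall back to an empty string
    description: tool.description ?? "",
    input_schema: tool.inputSchema,
  }));
}

const converted = toAnthropicTools([
  {
    name: "calculate",
    description: "Perform basic arithmetic calculations",
    inputSchema: {
      type: "object",
      properties: { expression: { type: "string" } },
      required: ["expression"],
    },
  },
]);
console.log(converted[0].name, Object.keys(converted[0]));
```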
Adding a Direct API Route for Claude
Add an API endpoint for direct tool calling from Claude:
import { handleClaudeToolCall } from './anthropic.js';

// Add route for Claude tool calls
app.post('/api/claude/tool-call', async (req, res) => {
  const { name, input } = req.body;

  if (!name || !input) {
    // Send the error and stop; the handler itself returns nothing
    res.status(400).json({
      error: 'Missing required fields: name and input'
    });
    return;
  }

  try {
    const result = await handleClaudeToolCall(server, { name, input });
    res.json(result);
  } catch (error) {
    console.error('Error handling tool call:', error);
    res.status(500).json({
      error: 'Internal server error',
      message: error instanceof Error ? error.message : String(error)
    });
  }
});
SSE Implementation (When Specifically Needed)
If you need to use SSE transport instead of Streamable HTTP, here's how to implement it:
// src/sse_server.ts
import express from 'express';
import cors from 'cors';
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { z } from "zod";

// Initialize Express
const app = express();

// Configure CORS and JSON middleware
app.use(cors());
app.use(express.json());

// Create an MCP server
const server = new McpServer({
  name: "Claude SSE Tools Server",
  version: "1.0.0",
  description: "SSE MCP server for Claude 3.5 Sonnet tool calling"
});

// Add a simple example tool
server.tool(
  "echo",
  "Echo back the provided message",
  {
    message: z.string().describe("The message to echo back")
  },
  async ({ message }) => {
    return {
      content: [{
        type: "text",
        text: `Echo: ${message}`
      }]
    };
  }
);

// Map to store transports by session ID
const sseTransports = new Map<string, SSEServerTransport>();

// SSE endpoint
app.get('/sse', async (req, res) => {
  // Create a new SSE transport. The transport writes the SSE response
  // headers itself when it starts, so do not call res.writeHead() here.
  const transport = new SSEServerTransport('/messages', res);
  sseTransports.set(transport.sessionId, transport);

  // Clean up on connection close
  req.on('close', () => {
    sseTransports.delete(transport.sessionId);
    console.log(`SSE connection closed: ${transport.sessionId}`);
  });

  // Connect to the MCP server
  await server.connect(transport);
});

// Message endpoint for SSE
app.post('/messages', async (req, res) => {
  const sessionId = req.query.sessionId as string;
  const transport = sseTransports.get(sessionId);

  if (!transport) {
    res.status(400).json({ error: 'Invalid session ID' });
    return;
  }

  // express.json() has already consumed the request stream,
  // so pass the parsed body along explicitly
  await transport.handlePostMessage(req, res, req.body);
});

// Start the server
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`SSE MCP server running on http://localhost:${PORT}`);
  console.log(`SSE endpoint: http://localhost:${PORT}/sse`);
});
Client-side Implementation
SSE Client Implementation
Create an SSE client to handle server events in src/sse/sseClient.ts:
// src/sse/sseClient.ts
// Note: EventSource is a browser API. In Node.js, provide it through a
// polyfill such as the `eventsource` package if your runtime lacks it.
export interface SSEOptions {
  headers?: Record<string, string>;
  withCredentials?: boolean;
  onOpen?: (event: Event) => void;
  onMessage?: (event: MessageEvent) => void;
  onError?: (event: Event) => void;
}

export class SSEClient {
  private eventSource: EventSource | null = null;
  private url: string;
  private options: SSEOptions;

  constructor(url: string, options: SSEOptions = {}) {
    this.url = url;
    this.options = options;
  }

  connect(): void {
    if (this.eventSource) {
      this.close();
    }

    this.eventSource = new EventSource(this.url, {
      withCredentials: this.options.withCredentials
    });

    this.eventSource.onopen = (event) => {
      if (this.options.onOpen) {
        this.options.onOpen(event);
      }
    };

    this.eventSource.onmessage = (event) => {
      if (this.options.onMessage) {
        this.options.onMessage(event);
      }
    };

    this.eventSource.onerror = (event) => {
      if (this.options.onError) {
        this.options.onError(event);
      }

      // EventSource retries on its own while readyState is CONNECTING.
      // Once it reaches CLOSED it will not retry, so connect() must be called again.
      if (this.eventSource?.readyState === EventSource.CLOSED) {
        console.log('Connection closed. Call connect() to reconnect.');
      }
    };
  }

  addEventListener(eventName: string, callback: (event: MessageEvent) => void): void {
    if (!this.eventSource) {
      throw new Error('Event source not initialized. Call connect() first.');
    }

    this.eventSource.addEventListener(eventName, callback as EventListener);
  }

  close(): void {
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }
}
Anthropic API Client
Create a client for interacting with Anthropic's API:
// src/anthropic/anthropicClient.ts
import Anthropic from '@anthropic-ai/sdk';

export interface AnthropicClientOptions {
  apiKey: string;
  model?: string;
  maxTokens?: number;
}

export class AnthropicClient {
  private client: Anthropic;
  private options: AnthropicClientOptions;

  constructor(options: AnthropicClientOptions) {
    this.options = {
      model: 'claude-3-5-sonnet-20240620',
      maxTokens: 1024,
      ...options,
    };

    this.client = new Anthropic({
      apiKey: this.options.apiKey,
    });
  }

  async sendMessage(
    messages: Array<{ role: 'user' | 'assistant'; content: string | Array<any> }>,
    tools?: Array<any>,
    stream = true
  ) {
    try {
      // Await inside the try block so that request errors reach the catch below
      if (stream) {
        return await this.client.messages.create({
          model: this.options.model!,
          max_tokens: this.options.maxTokens!,
          messages,
          tools,
          stream: true,
        });
      } else {
        return await this.client.messages.create({
          model: this.options.model!,
          max_tokens: this.options.maxTokens!,
          messages,
          tools,
        });
      }
    } catch (error) {
      console.error('Error sending message to Anthropic:', error);
      throw error;
    }
  }
}
Handling Streaming Responses
Create a handler for processing streamed responses:
// src/anthropic/streamHandler.ts
export interface StreamingContentBlock {
  type: string;
  text?: string;
  index: number;
  // Populated for tool_use blocks
  id?: string;
  name?: string;
  input?: unknown;
  partialJson?: string;
}

export interface StreamingMessage {
  id: string;
  role: string;
  content: StreamingContentBlock[];
  stopReason: string | null;
  usage: {
    inputTokens: number;
    outputTokens: number;
  };
}

export class StreamHandler {
  private currentMessage: StreamingMessage | null = null;
  private contentBlocks: Map<number, StreamingContentBlock> = new Map();
  private onContentUpdate: (content: string, isDone: boolean) => void;

  constructor(onContentUpdate: (content: string, isDone: boolean) => void) {
    this.onContentUpdate = onContentUpdate;
  }

  // Events from the Anthropic SDK stream are flat objects: fields such as
  // `message`, `index`, `content_block`, and `delta` sit next to `type`.
  handleEvent(event: any): void {
    switch (event.type) {
      case 'message_start':
        this.handleMessageStart(event.message);
        break;
      case 'content_block_start':
        this.handleContentBlockStart(event.index, event.content_block);
        break;
      case 'content_block_delta':
        this.handleContentBlockDelta(event.index, event.delta);
        break;
      case 'content_block_stop':
        this.handleContentBlockStop(event.index);
        break;
      case 'message_delta':
        // The stop reason arrives here, not in message_stop
        this.handleMessageDelta(event.delta);
        break;
      case 'message_stop':
        break;
      default:
        // Handle other event types (for example ping) if needed
        break;
    }

    // Update UI with current content
    this.updateContent(event.type === 'message_stop');
  }

  private handleMessageStart(message: any): void {
    this.currentMessage = {
      id: message.id,
      role: message.role,
      content: [],
      stopReason: null,
      usage: {
        inputTokens: message.usage?.input_tokens ?? 0,
        outputTokens: message.usage?.output_tokens ?? 0,
      },
    };
    this.contentBlocks.clear();
  }

  private handleContentBlockStart(index: number, contentBlock: any): void {
    const block: StreamingContentBlock = { type: contentBlock.type, index };
    if (contentBlock.type === 'text') {
      block.text = contentBlock.text || '';
    } else if (contentBlock.type === 'tool_use') {
      block.id = contentBlock.id;
      block.name = contentBlock.name;
      block.input = contentBlock.input;
      block.partialJson = '';
    }
    this.contentBlocks.set(index, block);
  }

  private handleContentBlockDelta(index: number, delta: any): void {
    const block = this.contentBlocks.get(index);
    if (!block) return;

    if (delta.type === 'text_delta' && block.type === 'text') {
      block.text = (block.text || '') + (delta.text || '');
    } else if (delta.type === 'input_json_delta' && block.type === 'tool_use') {
      // Tool input streams as JSON fragments; accumulate until the block stops
      block.partialJson = (block.partialJson || '') + (delta.partial_json || '');
    }
  }

  private handleContentBlockStop(index: number): void {
    // Block is complete: parse the accumulated tool input, if any
    const block = this.contentBlocks.get(index);
    if (block && block.type === 'tool_use' && block.partialJson) {
      try {
        block.input = JSON.parse(block.partialJson);
      } catch (error) {
        console.error('Could not parse tool input JSON:', error);
      }
    }
  }

  private handleMessageDelta(delta: any): void {
    if (this.currentMessage && delta?.stop_reason) {
      this.currentMessage.stopReason = delta.stop_reason;
    }
  }

  private updateContent(isDone: boolean): void {
    if (!this.currentMessage) return;

    // Sort blocks by index and concatenate text
    const sortedBlocks = Array.from(this.contentBlocks.values())
      .sort((a, b) => a.index - b.index);

    // Extract text content
    const textContent = sortedBlocks
      .filter(block => block.type === 'text' && block.text)
      .map(block => block.text)
      .join('');

    // Update UI via callback
    this.onContentUpdate(textContent, isDone);
  }

  public getToolCalls(): any[] {
    if (!this.currentMessage) return [];

    // Extract tool_use blocks in the shape the Messages API uses
    return Array.from(this.contentBlocks.values())
      .filter(block => block.type === 'tool_use')
      .map(block => ({
        type: 'tool_use',
        id: block.id,
        name: block.name,
        input: block.input ?? {},
      }));
  }
}
Tool Calling Implementation
Create a handler for processing tool calls:
// src/anthropic/toolsHandler.ts
import Anthropic from '@anthropic-ai/sdk';

export interface Tool {
  name: string;
  description: string;
  inputSchema: Record<string, any>;
  handler: (params: any) => Promise<any>;
}

export class ToolsHandler {
  private tools: Map<string, Tool> = new Map();
  private anthropicClient: Anthropic;

  constructor(anthropicClient: Anthropic) {
    this.anthropicClient = anthropicClient;
  }

  registerTool(tool: Tool): void {
    this.tools.set(tool.name, tool);
  }

  getToolsForApiRequest(): any[] {
    return Array.from(this.tools.values()).map(tool => ({
      name: tool.name,
      description: tool.description,
      input_schema: tool.inputSchema,
    }));
  }

  async handleToolCall(toolCall: any): Promise<any> {
    const { name, input } = toolCall;
    const tool = this.tools.get(name);

    if (!tool) {
      throw new Error(`Tool "${name}" not found`);
    }

    try {
      return await tool.handler(input);
    } catch (error) {
      console.error(`Error executing tool "${name}":`, error);
      throw error;
    }
  }

  async createToolResultMessage(
    messageId: string,
    toolCallId: string,
    result: any
  ): Promise<any> {
    return {
      role: 'user',
      content: [
        {
          type: 'tool_result',
          // The Messages API matches results to calls through tool_use_id
          tool_use_id: toolCallId,
          content: JSON.stringify(result),
        },
      ],
    };
  }
}
Complete Client Implementation
Create a complete client implementation that ties everything together:
// src/client.ts
import { AnthropicClient } from './anthropic/anthropicClient.js';
import { StreamHandler } from './anthropic/streamHandler.js';
import { ToolsHandler, type Tool } from './anthropic/toolsHandler.js';
import { SSEClient } from './sse/sseClient.js';
import Anthropic from '@anthropic-ai/sdk';

// Configuration (in a real app, get from environment variables)
const ANTHROPIC_API_KEY = process.env.ANTHROPIC_API_KEY || 'your-api-key-here';
const SSE_SERVER_URL = 'http://localhost:3000/sse';

// Initialize Anthropic client
const anthropicClient = new AnthropicClient({
  apiKey: ANTHROPIC_API_KEY,
});

// Initialize tools handler
const toolsHandler = new ToolsHandler(new Anthropic({ apiKey: ANTHROPIC_API_KEY }));

// Register a sample weather tool
const weatherTool: Tool = {
  name: 'get_weather',
  description: 'Get the current weather for a location',
  inputSchema: {
    type: 'object',
    properties: {
      location: {
        type: 'string',
        description: 'The city and state, e.g., San Francisco, CA',
      },
      unit: {
        type: 'string',
        enum: ['celsius', 'fahrenheit'],
        description: 'The unit of temperature',
      },
    },
    required: ['location'],
  },
  handler: async (params) => {
    // In a real app, this would call a weather API
    console.log('Getting weather for', params.location);
    return {
      temperature: 72,
      unit: params.unit || 'fahrenheit',
      description: 'Sunny',
      location: params.location,
    };
  },
};

toolsHandler.registerTool(weatherTool);

// Function to handle user messages
async function handleUserMessage(messageText: string) {
  console.log(`User message: ${messageText}`);
  console.log('Sending to Claude...');

  try {
    // Create stream handler to process streaming response
    const streamHandler = new StreamHandler((content, isDone) => {
      // In a real app, update UI with content
      process.stdout.write('\r\x1b[K'); // Clear line (ANSI escape)
      process.stdout.write(`Claude: ${content}`);

      if (isDone) {
        console.log('\n\nResponse complete');
        // Check for tool calls
        const toolCalls = streamHandler.getToolCalls();
        if (toolCalls.length > 0) {
          console.log('\nDetected tool calls:', toolCalls.length);
          handleToolCalls(toolCalls, messageText);
        }
      }
    });

    // Send message to Anthropic
    const stream = await anthropicClient.sendMessage(
      [{ role: 'user', content: messageText }],
      toolsHandler.getToolsForApiRequest(),
      true // stream
    );

    // Process streaming response
    for await (const event of stream as AsyncIterable<any>) {
      streamHandler.handleEvent(event);
    }
  } catch (error) {
    console.error('Error handling message:', error);
  }
}

// Function to handle tool calls
async function handleToolCalls(toolCalls: any[], userMessage: string) {
  if (!toolCalls.length) return;

  try {
    // Process each tool call
    for (const toolCall of toolCalls) {
      console.log(`\nExecuting tool: ${toolCall.name}`);
      console.log(`Tool input:`, toolCall.input);

      const result = await toolsHandler.handleToolCall({
        name: toolCall.name,
        input: toolCall.input,
      });

      console.log(`Tool result:`, result);

      // Send tool result back to Anthropic
      const toolResultMessage = await toolsHandler.createToolResultMessage(
        toolCall.id,
        toolCall.id,
        result
      );

      console.log('\nSending tool result back to Claude...');

      // Replay the assistant turn with only the fields the API accepts
      const toolUseBlock = {
        type: 'tool_use',
        id: toolCall.id,
        name: toolCall.name,
        input: toolCall.input ?? {},
      };

      // Continue the conversation with the tool result
      const response = await anthropicClient.sendMessage(
        [
          { role: 'user', content: userMessage },
          { role: 'assistant', content: [toolUseBlock] },
          toolResultMessage,
        ],
        toolsHandler.getToolsForApiRequest(),
        false // Don't stream the follow-up response
      );

      // Display final response
      console.log('\nFinal response:', (response as Anthropic.Message).content);
    }
  } catch (error) {
    console.error('Error handling tool calls:', error);
  }
}

// Connect to SSE server
function connectToSSEServer() {
  const sseClient = new SSEClient(SSE_SERVER_URL, {
    onOpen: () => {
      console.log('Connected to SSE server');
    },
    onMessage: (event) => {
      try {
        const data = JSON.parse(event.data);
        console.log('Received SSE message:', data);

        // Handle different types of messages from the SSE server.
        // The 'tool_request' message type is application-specific.
        if (data.type === 'tool_request') {
          handleSSEToolRequest(data);
        }
      } catch (e) {
        console.error('Error parsing SSE message:', e);
      }
    },
    onError: (error) => {
      console.error('SSE connection error:', error);
    }
  });

  sseClient.connect();
  return sseClient;
}

// Handle tool requests from SSE server
async function handleSSEToolRequest(data: any) {
  try {
    console.log(`Executing tool from SSE: ${data.tool}`);

    // Execute tool and get result
    const result = await toolsHandler.handleToolCall({
      name: data.tool,
      input: data.input,
    });

    console.log('Tool result:', result);

    // Send result back to SSE server.
    // The /tool-result endpoint is application-specific; the servers above do not define it.
    await fetch(`http://localhost:3000/tool-result`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        requestId: data.requestId,
        result,
      }),
    });
  } catch (error) {
    console.error('Error handling SSE tool request:', error);
  }
}

// Main function to run the client
async function main() {
  console.log('Starting Claude Tools Client');

  // Connect to SSE server
  const sseClient = connectToSSEServer();

  // Example usage
  await handleUserMessage('What\'s the weather like in San Francisco?');
}

// Run the client
main().catch(console.error);
Best Practices and Potential Pitfalls
Best Practices
Structuring TypeScript Code
Follow a Modular Architecture
- Separate concerns: connection handling, message parsing, tool execution, error handling
- Use TypeScript interfaces for clear contracts between modules
- Implement dependency injection for testability
SSE Server Configuration
// Set proper SSE headers
res.writeHead(200, {
  'Content-Type': 'text/event-stream; charset=utf-8',
  'Cache-Control': 'no-cache, no-transform',
  'Connection': 'keep-alive'
});
Security Considerations
API Key Management
- Never expose Anthropic API keys in client-side code
- Use environment variables or secure secret management
- Implement key rotation policies
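One way to apply the environment-variable advice is to read required secrets once at startup and fail fast when they are missing, rather than falling back to a placeholder value. The helper below is a hypothetical sketch (`requireEnv` is not part of any SDK); it takes the environment as a parameter so it can be exercised without touching the real process environment.

```typescript
// Returns the named variable or throws if it is missing or blank
function requireEnv(
  name: string,
  env: Record<string, string | undefined>
): string {
  const value = env[name];
  if (value === undefined || value.trim() === "") {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

// Typical server-side usage (Node.js):
//   const apiKey = requireEnv("ANTHROPIC_API_KEY", process.env);
```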
Authentication for MCP
// Example JWT authentication
import { expressjwt } from 'express-jwt';

// Fail fast instead of falling back to a hard-coded secret
const jwtSecret = process.env.JWT_SECRET;
if (!jwtSecret) {
  throw new Error('JWT_SECRET environment variable is required');
}

const authenticate = expressjwt({
  secret: jwtSecret,
  algorithms: ['HS256']
});

// Apply to routes that need protection
app.post('/api/claude/tool-call', authenticate, async (req, res) => {
  // Route implementation
});
Input Validation
// Always validate inputs with Zod
server.tool(
  "example_tool",
  "An example tool",
  {
    param1: z.string().min(1).max(100).describe("Parameter description"),
    param2: z.number().min(0).max(100).optional().describe("Optional parameter")
  },
  async ({ param1, param2 }) => {
    // Implementation goes here; a tool must return a content array
    return {
      content: [{ type: "text", text: `Received ${param1} and ${param2 ?? "no param2"}` }]
    };
  }
);
Performance Optimization
Connection Management
- Use HTTP/2 to overcome browser connection limits (6 per domain)
- Close inactive connections to free resources
- Implement timeouts for idle connections
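Idle-connection cleanup can be sketched as a small tracker that records when each connection was last active and periodically sweeps out the stale ones. Everything here is illustrative: `IdleTracker`, its method names, and the injectable clock are assumptions made for the example, not part of Express or the MCP SDK.

```typescript
class IdleTracker {
  private lastSeen = new Map<string, number>();
  private idleMs: number;
  private now: () => number;

  // `now` is injectable so the tracker can be tested with a fake clock
  constructor(idleMs: number, now: () => number = () => Date.now()) {
    this.idleMs = idleMs;
    this.now = now;
  }

  // Record activity on a connection (call on connect and on every message)
  touch(id: string): void {
    this.lastSeen.set(id, this.now());
  }

  // Forget a connection that closed on its own
  remove(id: string): void {
    this.lastSeen.delete(id);
  }

  // Removes and returns the ids that have been idle for at least idleMs
  sweep(): string[] {
    const cutoff = this.now() - this.idleMs;
    const expired: string[] = [];
    for (const [id, seen] of this.lastSeen) {
      if (seen <= cutoff) expired.push(id);
    }
    for (const id of expired) this.lastSeen.delete(id);
    return expired;
  }
}
```

A server would typically call `sweep()` from a `setInterval` timer and close the transport or response associated with each returned id.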
Memory Efficiency
- Use streams instead of buffering entire responses
- Clear references to large objects when no longer needed
- Be cautious with closures that might retain references
Common Pitfalls
SSE Implementation Issues
Browser Connection Limits
- Browsers limit SSE connections to 6 per domain without HTTP/2
- Use HTTP/2 or implement connection pooling strategies
- Consider using WebSockets for extremely high-connection scenarios
Memory Leaks
// Always clean up SSE connections
req.on('close', () => {
  clearInterval(heartbeatInterval);
  delete connections[connectionId];
});
Proxying Issues
- Some proxies buffer responses, breaking the SSE stream
- Set appropriate headers to prevent buffering
res.setHeader('X-Accel-Buffering', 'no'); // For NGINX
Claude API Limitations
Tool Call Timeouts
- Long-running tools may time out
- Consider background processing with task queues
- Return interim status updates for long operations
Sequential Tool Calls
- Multiple dependent tool calls are executed sequentially
- Design tools to minimize interdependencies
- Consider combining related operations into single tools
Edge Cases
Network Disruptions
- Implement reconnection logic with exponential backoff
- Store state to allow resuming conversations
- Provide user feedback during reconnection attempts
Browser Compatibility
- Add EventSource polyfills for older browsers
- Test across multiple browser environments
- Provide fallback mechanisms for browsers without SSE
Error Handling Approaches
Robust API Call Error Handling
try {
  const response = await anthropic.messages.create({
    model: "claude-3-5-sonnet-latest",
    max_tokens: 1024,
    messages: [{ role: "user", content: "Hello, Claude" }],
    stream: true,
  });
} catch (error) {
  if (error instanceof Anthropic.APIError) {
    // Handle API-specific errors
    console.error(`API Error: ${error.status} ${error.message}`);

    // Handle specific status codes
    if (error.status === 429) {
      // Rate limiting - implement backoff and retry
    } else if (error.status === 401) {
      // Authentication issue
    }
  } else {
    // Handle unexpected errors (a caught value is not guaranteed to be an Error)
    console.error(`Unexpected error: ${error instanceof Error ? error.message : String(error)}`);
  }
}
Implementing Retry Logic
interface RetryOptions {
  maxRetries?: number;
  baseDelay?: number;
  maxDelay?: number;
}

async function fetchWithRetry<T>(fn: () => Promise<T>, options: RetryOptions = {}): Promise<T> {
  const { maxRetries = 3, baseDelay = 300, maxDelay = 10000 } = options;

  let lastError: unknown;
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;

      // Skip retry for certain errors
      const status = (err as { status?: number } | null)?.status;
      if (status === 400 || status === 401) {
        throw err;
      }

      // Do not wait after the final attempt
      if (attempt === maxRetries - 1) {
        break;
      }

      // Exponential backoff with jitter
      const delay = Math.min(
        maxDelay,
        baseDelay * Math.pow(2, attempt) * (0.8 + Math.random() * 0.4)
      );

      console.log(`Retrying after ${Math.round(delay)}ms (attempt ${attempt + 1}/${maxRetries})`);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }

  throw lastError;
}
SSE Reconnection Handling
// Server-side: Handle reconnections with last-event-id
// (getEventsSince and sendEvent are placeholders for your own event store and writer)
const lastEventId = req.headers['last-event-id'];
if (lastEventId) {
  // Resume from the last event
  const missedEvents = getEventsSince(lastEventId);
  for (const event of missedEvents) {
    sendEvent(res, event);
  }
}

// Client-side: EventSource handles reconnection automatically
// But you can implement custom logic for complex scenarios
// (url is a placeholder for your SSE endpoint)
let reconnectAttempts = 0;

function connectSSE() {
  const es = new EventSource(url);

  es.onopen = () => {
    reconnectAttempts = 0;
  };

  es.onerror = (error) => {
    if (es.readyState === EventSource.CLOSED) {
      // Custom reconnection logic
      reconnectAttempts++;
      const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 30000);

      console.log(`Connection closed. Reconnecting in ${delay}ms...`);
      setTimeout(connectSSE, delay);
    }
  };

  return es;
}
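The server-side snippet above relies on an event store that is not shown. One possible shape for it is a bounded in-memory buffer that assigns increasing ids and can replay everything after a given id. This is a sketch under assumptions: `EventBuffer`, `getEventsSince`, and `formatEvent` are hypothetical names, ids are numeric, and events that have already been evicted from the buffer cannot be replayed.

```typescript
interface StoredEvent {
  id: number;
  event: string;
  data: unknown;
}

class EventBuffer {
  private events: StoredEvent[] = [];
  private nextId = 1;
  private capacity: number;

  constructor(capacity = 100) {
    this.capacity = capacity;
  }

  // Stores an event, evicting the oldest one when the buffer is full
  append(event: string, data: unknown): StoredEvent {
    const stored: StoredEvent = { id: this.nextId++, event, data };
    this.events.push(stored);
    if (this.events.length > this.capacity) this.events.shift();
    return stored;
  }

  // Returns buffered events newer than the Last-Event-ID header value
  getEventsSince(lastEventId: string | undefined): StoredEvent[] {
    if (lastEventId === undefined || lastEventId === "") return [];
    const last = Number(lastEventId);
    if (!Number.isFinite(last)) return [];
    return this.events.filter((e) => e.id > last);
  }
}

// Serializes an event as an SSE frame; the id line lets the browser
// send Last-Event-ID when it reconnects
function formatEvent(e: StoredEvent): string {
  return `id: ${e.id}\nevent: ${e.event}\ndata: ${JSON.stringify(e.data)}\n\n`;
}
```

Because `JSON.stringify` never emits raw newlines, each payload fits on a single `data:` line.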
Heartbeat for Connection Maintenance
    // Send heartbeat every 30 seconds
    const heartbeatInterval = setInterval(() => {
      if (res.writableEnded) {
        clearInterval(heartbeatInterval);
        return;
      }

      // Lines starting with ":" are SSE comments and are ignored by clients
      res.write(':keepalive\n\n');
    }, 30000);

    // Clean up on connection close
    req.on('close', () => {
      clearInterval(heartbeatInterval);
    });
Testing Methodologies
Unit Testing
Testing individual components:
    // Example Jest test for SSE server
    // (assumes SSEServer, defined earlier in this tutorial, is imported into the test file)
    describe('SSE Server', () => {
      let server;
      let mockRequest;
      let mockResponse;

      beforeEach(() => {
        // The handler may register a 'close' listener on the request,
        // so the mock request needs an `on` method as well
        mockRequest = {
          headers: {},
          on: jest.fn()
        };
        mockResponse = {
          writeHead: jest.fn(),
          write: jest.fn(),
          end: jest.fn(),
          on: jest.fn()
        };
        server = new SSEServer();
      });

      test('should set correct headers', () => {
        server.handleRequest(mockRequest, mockResponse);

        expect(mockResponse.writeHead).toHaveBeenCalledWith(200, {
          'Content-Type': 'text/event-stream; charset=utf-8',
          'Cache-Control': 'no-cache, no-transform',
          'Connection': 'keep-alive'
        });
      });

      test('should send event with correct format', () => {
        server.handleRequest(mockRequest, mockResponse);
        server.sendEvent(mockResponse, 'update', { data: 'test' });

        expect(mockResponse.write).toHaveBeenCalledWith(
          'event: update\ndata: {"data":"test"}\n\n'
        );
      });
    });
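The second test above pins down the wire format of a single event: an `event:` line, one or more `data:` lines, and a blank line as terminator. A formatter that produces that format might look like the sketch below. `formatSSE` is a hypothetical function, and the `sendEvent` method in your server may be structured differently.

```typescript
// Hypothetical formatter; the tutorial's SSEServer.sendEvent may differ.
function formatSSE(event: string, data: unknown, id?: string): string {
  const lines: string[] = [];
  if (id !== undefined) lines.push(`id: ${id}`);
  lines.push(`event: ${event}`);

  // Strings are sent as-is, everything else is JSON-encoded
  const payload = typeof data === "string" ? data : JSON.stringify(data) ?? "";

  // A payload containing line breaks must be split across several data: lines
  for (const line of payload.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }

  // A blank line terminates the event
  return lines.join("\n") + "\n\n";
}

console.log(JSON.stringify(formatSSE("update", { data: "test" })));
// "event: update\ndata: {\"data\":\"test\"}\n\n"
```

Keeping the formatting in a pure function means it can be tested without mocking a response object at all.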
Integration Testing
Testing the complete flow:
    // Example integration test with supertest
    import request from 'supertest';
    import { app } from '../src/server';

    describe('MCP Server API', () => {
      test('POST /api/claude/tool-call should execute a tool and return result', async () => {
        const response = await request(app)
          .post('/api/claude/tool-call')
          .send({
            name: 'calculate',
            input: { expression: '5 + 3' }
          });

        expect(response.status).toBe(200);
        expect(response.body.content[0].text).toContain('Result: 8');
      });
    });
Mocking Anthropic's API
    // Mock the Anthropic API client
    jest.mock('@anthropic-ai/sdk', () => {
      const MockAnthropic = jest.fn().mockImplementation(() => ({
        messages: {
          create: jest.fn().mockImplementation(async ({ stream }) => {
            if (stream) {
              // Return a mock stream (abbreviated: a real stream also emits
              // message_start and message_delta events)
              return {
                [Symbol.asyncIterator]: async function* () {
                  yield { type: 'content_block_start', index: 0, content_block: { type: 'text', text: '' } };
                  yield { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'Hello' } };
                  yield { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: ' world' } };
                  yield { type: 'content_block_stop', index: 0 };
                  yield { type: 'message_stop' };
                }
              };
            }

            // Return mock non-streaming response
            return {
              id: 'msg_mock123',
              content: [{ type: 'text', text: 'Hello world' }]
            };
          })
        }
      }));

      // The SDK is imported as `import Anthropic from '@anthropic-ai/sdk'`,
      // so the mock must be exposed as the default export too
      return { __esModule: true, default: MockAnthropic, Anthropic: MockAnthropic };
    });
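Code that consumes the mocked stream usually needs to reassemble the text from the individual delta events. The helper below is a minimal sketch of that step. `collectText` and the `StreamEventLike` type are hypothetical and only model the fields used here; the SDK's own event types are richer.

```typescript
// Minimal event shape for this sketch; the real SDK types carry more fields.
interface StreamEventLike {
  type: string;
  delta?: { type?: string; text?: string };
}

// Concatenate the text carried by content_block_delta events, ignoring the rest.
async function collectText(stream: AsyncIterable<StreamEventLike>): Promise<string> {
  let text = "";
  for await (const event of stream) {
    const chunk = event.delta?.text;
    if (event.type === "content_block_delta" && typeof chunk === "string") {
      text += chunk;
    }
  }
  return text;
}

// Stand-in stream with the same event order as the mock above
async function* fakeStream(): AsyncGenerator<StreamEventLike> {
  yield { type: "content_block_start" };
  yield { type: "content_block_delta", delta: { type: "text_delta", text: "Hello" } };
  yield { type: "content_block_delta", delta: { type: "text_delta", text: " world" } };
  yield { type: "content_block_stop" };
  yield { type: "message_stop" };
}

collectText(fakeStream()).then((text) => console.log(text)); // Hello world
```

In a Jest test, the value returned by the mocked `messages.create({ stream: true })` can be passed to `collectText` in place of `fakeStream()`.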
Load Testing
    // Example k6 script for load testing SSE connections
    import http from 'k6/http';
    import { check, sleep } from 'k6';

    export const options = {
      vus: 10,         // 10 virtual users
      duration: '30s'  // Test for 30 seconds
    };

    export default function () {
      // Note: http.get waits for the complete response, so these checks assume
      // the endpoint eventually closes the stream; a connection that stays open
      // will run into the request timeout instead
      const res = http.get('http://localhost:3000/sse');

      check(res, {
        'status is 200': (r) => r.status === 200,
        'content-type is text/event-stream': (r) => r.headers['Content-Type'] &&
          r.headers['Content-Type'].includes('text/event-stream'),
      });

      sleep(10);
    }
Conclusion
This tutorial has covered implementing a tool-calling system over Server-Sent Events (SSE) with Anthropic's Claude 3.5 Sonnet. Key components include:
The Claude Messages API and tools calling functionality
Server-Sent Events (SSE) for streaming responses
The Model Context Protocol (MCP) for standardized tool integration
Server-side implementation in TypeScript
Client-side implementation for handling tool calls
Best practices for security, performance, and error handling
Testing methodologies for robust implementation
Although MCP's HTTP+SSE transport is being deprecated in favor of Streamable HTTP, the concepts and patterns in this tutorial apply to both transports. As Anthropic's API evolves, the core principles of tool calling, streaming responses, and event-based architecture will remain relevant.
By leveraging these technologies, you can build powerful AI applications that combine Claude's intelligence with custom tools and data sources, creating more capable and interactive experiences for your users.