Building AI Tools with Claude: SSE, MCP, and TypeScript

JV
Jo Vinkenroye
Building AI Tools with Claude: SSE, MCP, and TypeScript

This comprehensive tutorial guides seasoned web developers through implementing a tools calling system via Server-Sent Events (SSE) using Anthropic's Claude 3.5 Sonnet. You'll learn how to create a Message Control Protocol (MCP) server that facilitates real-time communication between your tools and the Claude API.

The Claude Messages API and Tools Calling

The Anthropic Messages API provides a stateless interface for interacting with Claude models. Claude 3.5 Sonnet supports tools calling, allowing the model to request execution of client-defined functions.

The basic API flow works like this:

1import Anthropic from '@anthropic-ai/sdk';
2
3const client = new Anthropic({
4 apiKey: process.env.ANTHROPIC_API_KEY
5});
6
7async function main() {
8 const message = await client.messages.create({
9 model: "claude-3-5-sonnet-latest",
10 max_tokens: 1024,
11 messages: [{
12 role: "user",
13 content: "Hello, Claude"
14 }]
15 });
16
17 console.log(message.content);
18}


When using tools, you define them in your API request and Claude can decide to use them:

1const weatherTool = {
2 name: "get_weather",
3 description: "Get current weather for a location",
4 input_schema: {
5 type: "object",
6 properties: {
7 location: {
8 type: "string",
9 description: "City and state/country"
10 },
11 unit: {
12 type: "string",
13 enum: ["celsius", "fahrenheit"],
14 description: "Temperature unit"
15 }
16 },
17 required: ["location"]
18 }
19};
20
21// Claude might respond with a tool_use event
22// Then you execute the tool and send results back


Understanding Server-Sent Events (SSE)

Server-Sent Events is a standard for pushing updates from a server to clients over HTTP. Unlike WebSockets, SSE provides one-way communication from server to client, making it ideal for streaming responses.

Anthropic's API supports streaming via SSE, which is particularly valuable for tools calling because:

It enables real-time updates as Claude thinks and generates content

It allows clients to see when Claude is preparing to use a tool

It provides a more responsive user experience for long responses

The SSE connection maintains a persistent HTTP connection with special headers:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

When Claude decides to use a tool, the streaming response includes specific events indicating this decision, allowing your application to respond immediately.

The Message Control Protocol (MCP)

The Message Control Protocol (MCP) is Anthropic's open standard for connecting AI assistants to external tools and data sources. While tools calling in the API allows defining ad-hoc tools for a single request, MCP provides a more persistent and standardized approach.

MCP follows a client-server architecture:

  • MCP Host: Applications like Claude Desktop or IDE plugins
  • MCP Client: Maintains connections with MCP servers
  • MCP Server: Exposes capabilities (tools, resources) via a standardized API

Important Note: According to the official MCP TypeScript SDK, the SSE transport is now deprecated in favor of Streamable HTTP. New implementations should use Streamable HTTP, but this tutorial includes SSE implementation as requested.

Project Setup

Let's set up our TypeScript project with the necessary dependencies:

1# Create project directory
2mkdir claude-tools-sse
3cd claude-tools-sse
4
5# Initialize npm project
6npm init -y
7
8# Install server dependencies
9npm install @modelcontextprotocol/sdk @anthropic-ai/sdk express cors zod
10
11# Install client dependencies
12npm install eventsource-parser
13
14# Install dev dependencies
15npm install -D typescript @types/node @types/express @types/cors ts-node
16

Create a tsconfig.json file:

1{
2 "compilerOptions": {
3 "target": "ES2022",
4 "module": "NodeNext",
5 "moduleResolution": "NodeNext",
6 "outDir": "./dist",
7 "rootDir": "./src",
8 "esModuleInterop": true,
9 "strict": true,
10 "skipLibCheck": true,
11 "forceConsistentCasingInFileNames": true
12 },
13 "include": ["src/**/*"],
14 "exclude": ["node_modules"]
15}
16
17
18And update package.json:
19
20{
21 "type": "module",
22 "scripts": {
23 "build": "tsc",
24 "start": "node dist/server.js",
25 "dev": "ts-node --esm src/server.ts",
26 "client": "ts-node --esm src/client.ts"
27 }
28}


Server-side Implementation

Basic MCP Server Structure

Create a src/server.ts file with a basic MCP server implementation:

1import express from 'express';
2import cors from 'cors';
3import { randomUUID } from 'crypto';
4import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
5import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
6import { InMemoryEventStore } from "@modelcontextprotocol/sdk/inMemory.js";
7import { z } from "zod";
8
9// Initialize Express
10const app = express();
11
12// Configure CORS and JSON middleware
13app.use(cors());
14app.use(express.json());
15
16// Create an MCP server
17const server = new McpServer({
18 name: "Claude Tools Server",
19 version: "1.0.0",
20 description: "MCP server for Claude 3.5 Sonnet tool calling"
21});
22
23// Map to store transports by session ID
24const transports: Record<string, StreamableHTTPServerTransport> = {};
25
26// Add a simple example tool
27server.tool(
28 "calculate",
29 "Perform basic arithmetic calculations",
30 {
31 expression: z.string().describe("The mathematical expression to evaluate (e.g., '5 * 3 + 2')"),
32 },
33 async ({ expression }) => {
34 try {
35 // Use Function constructor to evaluate the expression (use with caution in production)
36 const result = new Function(`return ${expression}`)();
37 return {
38 content: [{
39 type: "text",
40 text: `Result: ${result}`
41 }]
42 };
43 } catch (error) {
44 return {
45 content: [{
46 type: "text",
47 text: `Error evaluating expression: ${error instanceof Error ? error.message : String(error)}`
48 }],
49 isError: true
50 };
51 }
52 }
53);
54
55// Handle POST requests for client-to-server communication
56app.post('/mcp', async (req, res) => {
57 // Check for existing session ID
58 const sessionId = req.headers['mcp-session-id'] as string | undefined;
59 let transport: StreamableHTTPServerTransport;
60
61 if (sessionId && transports[sessionId]) {
62 // Reuse existing transport
63 transport = transports[sessionId];
64 } else {
65 // New session - create a transport with event store for resumability
66 const eventStore = new InMemoryEventStore();
67 transport = new StreamableHTTPServerTransport({
68 sessionIdGenerator: () => randomUUID(),
69 eventStore,
70 onsessioninitialized: (sid) => {
71 // Store the transport by session ID
72 transports[sid] = transport;
73 }
74 });
75
76 // Clean up transport when closed
77 transport.onclose = () => {
78 if (transport.sessionId) {
79 delete transports[transport.sessionId];
80 console.log(`Session ${transport.sessionId} closed`);
81 }
82 };
83
84 // Connect the server to the transport
85 await server.connect(transport);
86 }
87
88 // Handle the request
89 try {
90 await transport.handleRequest(req, res, req.body);
91 } catch (error) {
92 console.error('Error handling request:', error);
93 if (!res.headersSent) {
94 res.status(500).json({
95 jsonrpc: '2.0',
96 error: {
97 code: -32603,
98 message: 'Internal server error',
99 },
100 id: null,
101 });
102 }
103 }
104});
105
106// Reusable handler for GET and DELETE requests
107const handleSessionRequest = async (req: express.Request, res: express.Response) => {
108 const sessionId = req.headers['mcp-session-id'] as string | undefined;
109 if (!sessionId || !transports[sessionId]) {
110 res.status(400).send('Invalid or missing session ID');
111 return;
112 }
113
114 const transport = transports[sessionId];
115 try {
116 await transport.handleRequest(req, res);
117 } catch (error) {
118 console.error('Error handling session request:', error);
119 if (!res.headersSent) {
120 res.status(500).send('Internal server error');
121 }
122 }
123};
124
125// Handle GET requests for server-to-client notifications
126app.get('/mcp', handleSessionRequest);
127
128// Handle DELETE requests for session termination
129app.delete('/mcp', handleSessionRequest);
130
131// Start the server
132const PORT = process.env.PORT || 3000;
133app.listen(PORT, () => {
134 console.log(`MCP server running on http://localhost:${PORT}`);
135});


Implementing Useful Tools

Expand your server to implement more useful tools that Claude can call:

1// Add a web search tool
2server.tool(
3 "web_search",
4 "Search the web for information",
5 {
6 query: z.string().describe("The search query"),
7 maxResults: z.number().optional().describe("Maximum number of results to return")
8 },
9 async ({ query, maxResults = 5 }) => {
10 try {
11 // In a real implementation, you would call a search API here
12 // For demonstration, we'll return a mock response
13 const mockResults = [
14 { title: "Search Result 1", url: "https://example.com/1", snippet: "This is a mock search result" },
15 { title: "Search Result 2", url: "https://example.com/2", snippet: "Another mock search result" },
16 // Add more mock results as needed
17 ].slice(0, maxResults);
18
19 return {
20 content: [{
21 type: "text",
22 text: JSON.stringify(mockResults, null, 2)
23 }]
24 };
25 } catch (error) {
26 return {
27 content: [{
28 type: "text",
29 text: `Error performing search: ${error instanceof Error ? error.message : String(error)}`
30 }],
31 isError: true
32 };
33 }
34 }
35);
36
37// Add a tool for fetching content from a URL
38server.tool(
39 "fetch_url",
40 "Fetch content from a specified URL",
41 {
42 url: z.string().url().describe("The URL to fetch content from")
43 },
44 async ({ url }) => {
45 try {
46 const response = await fetch(url);
47 if (!response.ok) {
48 throw new Error(`HTTP error! Status: ${response.status}`);
49 }
50 const text = await response.text();
51
52 // For safety, limit the size of the response
53 const truncatedText = text.length > 10000 ? text.substring(0, 10000) + "... (truncated)" : text;
54
55 return {
56 content: [{
57 type: "text",
58 text: truncatedText
59 }]
60 };
61 } catch (error) {
62 return {
63 content: [{
64 type: "text",
65 text: `Error fetching URL: ${error instanceof Error ? error.message : String(error)}`
66 }],
67 isError: true
68 };
69 }
70 }
71);
72

Creating a Wrapper for Anthropic's API

To directly handle Claude's tool calling, create an API wrapper in src/anthropic.ts:

1// src/anthropic.ts
2import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
3
4interface ToolCallInput {
5 name: string;
6 input: Record<string, any>;
7}
8
9interface ToolCallResult {
10 content: string | { type: string; text: string }[];
11 isError?: boolean;
12}
13
14export async function handleClaudeToolCall(
15 server: McpServer,
16 toolCall: ToolCallInput
17): Promise<ToolCallResult> {
18 const { name, input } = toolCall;
19
20 // Get the tool from the server
21 const tools = await server.listTools();
22 const tool = tools.find(t => t.name === name);
23
24 if (!tool) {
25 return {
26 content: [{ type: "text", text: `Error: Tool '${name}' not found` }],
27 isError: true
28 };
29 }
30
31 try {
32 // Call the tool with the provided input
33 const result = await server.callTool({ name, arguments: input });
34 return result;
35 } catch (error) {
36 return {
37 content: [{
38 type: "text",
39 text: `Error executing tool '${name}': ${error instanceof Error ? error.message : String(error)}`
40 }],
41 isError: true
42 };
43 }
44}
45

Adding a Direct API Route for Claude

Add an API endpoint for direct tool calling from Claude:

1import { handleClaudeToolCall } from './anthropic.js';
2
3// Add route for Claude tool calls
4app.post('/api/claude/tool-call', async (req, res) => {
5 const { name, input } = req.body;
6
7 if (!name || !input) {
8 return res.status(400).json({
9 error: 'Missing required fields: name and input'
10 });
11 }
12
13 try {
14 const result = await handleClaudeToolCall(server, { name, input });
15 res.json(result);
16 } catch (error) {
17 console.error('Error handling tool call:', error);
18 res.status(500).json({
19 error: 'Internal server error',
20 message: error instanceof Error ? error.message : String(error)
21 });
22 }
23});
24

SSE Implementation (When Specifically Needed)

If you need to use SSE transport instead of Streamable HTTP, here's how to implement it:

1// src/sse_server.ts
2import express from 'express';
3import cors from 'cors';
4import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
5import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
6import { z } from "zod";
7
8// Initialize Express
9const app = express();
10
11// Configure CORS and JSON middleware
12app.use(cors());
13app.use(express.json());
14
15// Create an MCP server
16const server = new McpServer({
17 name: "Claude SSE Tools Server",
18 version: "1.0.0",
19 description: "SSE MCP server for Claude 3.5 Sonnet tool calling"
20});
21
22// Add a simple example tool
23server.tool(
24 "echo",
25 "Echo back the provided message",
26 {
27 message: z.string().describe("The message to echo back")
28 },
29 async ({ message }) => {
30 return {
31 content: [{
32 type: "text",
33 text: `Echo: ${message}`
34 }]
35 };
36 }
37);
38
39// Map to store transports by session ID
40const sseTransports = new Map();
41
42// SSE endpoint
43app.get('/sse', async (req, res) => {
44 // Set SSE headers
45 res.writeHead(200, {
46 'Content-Type': 'text/event-stream',
47 'Cache-Control': 'no-cache',
48 'Connection': 'keep-alive'
49 });
50
51 // Create a new SSE transport
52 const transport = new SSEServerTransport('/messages', res);
53 sseTransports.set(transport.sessionId, transport);
54
55 // Clean up on connection close
56 req.on('close', () => {
57 sseTransports.delete(transport.sessionId);
58 console.log(`SSE connection closed: ${transport.sessionId}`);
59 });
60
61 // Connect to the MCP server
62 await server.connect(transport);
63});
64
65// Message endpoint for SSE
66app.post('/messages', async (req, res) => {
67 const sessionId = req.query.sessionId as string;
68 const transport = sseTransports.get(sessionId);
69
70 if (!transport) {
71 return res.status(400).json({ error: 'Invalid session ID' });
72 }
73
74 await transport.handlePostMessage(req, res);
75});
76
77// Start the server
78const PORT = process.env.PORT || 3000;
79app.listen(PORT, () => {
80 console.log(`SSE MCP server running on http://localhost:${PORT}`);
81 console.log(`SSE endpoint: http://localhost:${PORT}/sse`);
82});
83

Client-side Implementation

SSE Client Implementation

Create an SSE client to handle server events in src/sse/sseClient.ts:

1// src/sse/sseClient.ts
2export interface SSEOptions {
3 headers?: Record<string, string>;
4 withCredentials?: boolean;
5 onOpen?: (event: Event) => void;
6 onMessage?: (event: MessageEvent) => void;
7 onError?: (event: Event) => void;
8}
9
10export class SSEClient {
11 private eventSource: EventSource | null = null;
12 private url: string;
13 private options: SSEOptions;
14
15 constructor(url: string, options: SSEOptions = {}) {
16 this.url = url;
17 this.options = options;
18 }
19
20 connect(): void {
21 if (this.eventSource) {
22 this.close();
23 }
24
25 this.eventSource = new EventSource(this.url, {
26 withCredentials: this.options.withCredentials
27 });
28
29 this.eventSource.onopen = (event) => {
30 if (this.options.onOpen) {
31 this.options.onOpen(event);
32 }
33 };
34
35 this.eventSource.onmessage = (event) => {
36 if (this.options.onMessage) {
37 this.options.onMessage(event);
38 }
39 };
40
41 this.eventSource.onerror = (event) => {
42 if (this.options.onError) {
43 this.options.onError(event);
44 }
45
46 // Auto-reconnect is handled by the EventSource
47 if (this.eventSource?.readyState === EventSource.CLOSED) {
48 console.log('Connection closed. Attempting to reconnect...');
49 }
50 };
51 }
52
53 addEventListener(eventName: string, callback: (event: MessageEvent) => void): void {
54 if (!this.eventSource) {
55 throw new Error('Event source not initialized. Call connect() first.');
56 }
57
58 this.eventSource.addEventListener(eventName, callback as EventListener);
59 }
60
61 close(): void {
62 if (this.eventSource) {
63 this.eventSource.close();
64 this.eventSource = null;
65 }
66 }
67}
68

Anthropic API Client

Create a client for interacting with Anthropic's API:

1// src/anthropic/anthropicClient.ts
2import Anthropic from '@anthropic-ai/sdk';
3import { createParser } from 'eventsource-parser';
4
5export interface AnthropicClientOptions {
6 apiKey: string;
7 model?: string;
8 maxTokens?: number;
9}
10
11export class AnthropicClient {
12 private client: Anthropic;
13 private options: AnthropicClientOptions;
14
15 constructor(options: AnthropicClientOptions) {
16 this.options = {
17 model: 'claude-3-5-sonnet-20240620',
18 maxTokens: 1024,
19 ...options,
20 };
21
22 this.client = new Anthropic({
23 apiKey: this.options.apiKey,
24 });
25 }
26
27 async sendMessage(
28 messages: Array<{ role: 'user' | 'assistant'; content: string | Array<any> }>,
29 tools?: Array<any>,
30 stream = true
31 ) {
32 try {
33 if (stream) {
34 return this.client.messages.create({
35 model: this.options.model!,
36 max_tokens: this.options.maxTokens,
37 messages,
38 tools,
39 stream: true,
40 });
41 } else {
42 return this.client.messages.create({
43 model: this.options.model!,
44 max_tokens: this.options.maxTokens,
45 messages,
46 tools,
47 });
48 }
49 } catch (error) {
50 console.error('Error sending message to Anthropic:', error);
51 throw error;
52 }
53 }
54}
55

Handling Streaming Responses

Create a handler for processing streamed responses:

1// src/anthropic/streamHandler.ts
2export interface StreamingContentBlock {
3 type: string;
4 text?: string;
5 index: number;
6}
7
8export interface StreamingMessage {
9 id: string;
10 role: string;
11 content: StreamingContentBlock[];
12 stopReason: string | null;
13 usage: {
14 inputTokens: number;
15 outputTokens: number;
16 };
17}
18
19export class StreamHandler {
20 private currentMessage: StreamingMessage | null = null;
21 private contentBlocks: Map<number, StreamingContentBlock> = new Map();
22 private onContentUpdate: (content: string, isDone: boolean) => void;
23
24 constructor(onContentUpdate: (content: string, isDone: boolean) => void) {
25 this.onContentUpdate = onContentUpdate;
26 }
27
28 handleEvent(event: { type: string; data: any }): void {
29 switch (event.type) {
30 case 'message_start':
31 this.handleMessageStart(event.data.message);
32 break;
33 case 'content_block_start':
34 this.handleContentBlockStart(event.data.index, event.data.content_block);
35 break;
36 case 'content_block_delta':
37 this.handleContentBlockDelta(event.data.index, event.data.delta);
38 break;
39 case 'content_block_stop':
40 this.handleContentBlockStop(event.data.index);
41 break;
42 case 'message_delta':
43 // Handle message delta if needed
44 break;
45 case 'message_stop':
46 this.handleMessageStop(event.data);
47 break;
48 default:
49 // Handle other event types if needed
50 break;
51 }
52
53 // Update UI with current content
54 this.updateContent(event.type === 'message_stop');
55 }
56
57 private handleMessageStart(message: any): void {
58 this.currentMessage = {
59 id: message.id,
60 role: message.role,
61 content: [],
62 stopReason: null,
63 usage: message.usage || { inputTokens: 0, outputTokens: 0 },
64 };
65 this.contentBlocks.clear();
66 }
67
68 private handleContentBlockStart(index: number, contentBlock: any): void {
69 this.contentBlocks.set(index, {
70 type: contentBlock.type,
71 text: contentBlock.type === 'text' ? contentBlock.text || '' : undefined,
72 index,
73 });
74 }
75
76 private handleContentBlockDelta(index: number, delta: any): void {
77 const block = this.contentBlocks.get(index);
78 if (block && delta.type === 'text_delta' && block.type === 'text') {
79 block.text = (block.text || '') + (delta.text || '');
80 this.contentBlocks.set(index, block);
81 }
82 }
83
84 private handleContentBlockStop(index: number): void {
85 // Block is complete - could trigger specific UI updates for this block
86 }
87
88 private handleMessageStop(data: any): void {
89 if (this.currentMessage) {
90 this.currentMessage.stopReason = data.stop_reason || 'end_turn';
91 }
92 }
93
94 private updateContent(isDone: boolean): void {
95 if (!this.currentMessage) return;
96
97 // Sort blocks by index and concatenate text
98 const sortedBlocks = Array.from(this.contentBlocks.values())
99 .sort((a, b) => a.index - b.index);
100
101 // Extract text content
102 const textContent = sortedBlocks
103 .filter(block => block.type === 'text' && block.text)
104 .map(block => block.text)
105 .join('');
106
107 // Update UI via callback
108 this.onContentUpdate(textContent, isDone);
109 }
110
111 public getToolCalls(): any[] {
112 if (!this.currentMessage) return [];
113
114 // Extract tool_use blocks from content
115 return Array.from(this.contentBlocks.values())
116 .filter(block => block.type === 'tool_use');
117 }
118}


Tool Calling Implementation

Create a handler for processing tool calls:

1// src/anthropic/toolsHandler.ts
2import Anthropic from '@anthropic-ai/sdk';
3
4export interface Tool {
5 name: string;
6 description: string;
7 inputSchema: Record<string, any>;
8 handler: (params: any) => Promise<any>;
9}
10
11export class ToolsHandler {
12 private tools: Map<string, Tool> = new Map();
13 private anthropicClient: Anthropic;
14
15 constructor(anthropicClient: Anthropic) {
16 this.anthropicClient = anthropicClient;
17 }
18
19 registerTool(tool: Tool): void {
20 this.tools.set(tool.name, tool);
21 }
22
23 getToolsForApiRequest(): any[] {
24 return Array.from(this.tools.values()).map(tool => ({
25 name: tool.name,
26 description: tool.description,
27 input_schema: tool.inputSchema,
28 }));
29 }
30
31 async handleToolCall(toolCall: any): Promise<any> {
32 const { name, input } = toolCall;
33 const tool = this.tools.get(name);
34
35 if (!tool) {
36 throw new Error(`Tool "${name}" not found`);
37 }
38
39 try {
40 return await tool.handler(input);
41 } catch (error) {
42 console.error(`Error executing tool "${name}":`, error);
43 throw error;
44 }
45 }
46
47 async createToolResultMessage(
48 messageId: string,
49 toolCallId: string,
50 result: any
51 ): Promise<any> {
52 return {
53 role: 'user',
54 content: [
55 {
56 type: 'tool_result',
57 tool_call_id: toolCallId,
58 content: JSON.stringify(result),
59 },
60 ],
61 };
62 }
63}
64

Complete Client Implementation

Create a complete client implementation that ties everything together:

1// src/client.ts
2import { AnthropicClient } from './anthropic/anthropicClient.js';
3import { StreamHandler } from './anthropic/streamHandler.js';
4import { ToolsHandler, Tool } from './anthropic/toolsHandler.js';
5import { SSEClient } from './sse/sseClient.js';
6import Anthropic from '@anthropic-ai/sdk';
7
8// Configuration (in a real app, get from environment variables)
9const ANTHROPIC_API_KEY = process.env.ANTHROPIC_API_KEY || 'your-api-key-here';
10const SSE_SERVER_URL = 'http://localhost:3000/sse';
11
12// Initialize Anthropic client
13const anthropicClient = new AnthropicClient({
14 apiKey: ANTHROPIC_API_KEY,
15});
16
17// Initialize tools handler
18const toolsHandler = new ToolsHandler(new Anthropic({ apiKey: ANTHROPIC_API_KEY }));
19
20// Register a sample weather tool
21const weatherTool: Tool = {
22 name: 'get_weather',
23 description: 'Get the current weather for a location',
24 inputSchema: {
25 type: 'object',
26 properties: {
27 location: {
28 type: 'string',
29 description: 'The city and state, e.g., San Francisco, CA',
30 },
31 unit: {
32 type: 'string',
33 enum: ['celsius', 'fahrenheit'],
34 description: 'The unit of temperature',
35 },
36 },
37 required: ['location'],
38 },
39 handler: async (params) => {
40 // In a real app, this would call a weather API
41 console.log('Getting weather for', params.location);
42 return {
43 temperature: 72,
44 unit: params.unit || 'fahrenheit',
45 description: 'Sunny',
46 location: params.location,
47 };
48 },
49};
50
51toolsHandler.registerTool(weatherTool);
52
53// Function to handle user messages
54async function handleUserMessage(messageText: string) {
55 console.log(`User message: ${messageText}`);
56 console.log('Sending to Claude...');
57
58 try {
59 // Create stream handler to process streaming response
60 const streamHandler = new StreamHandler((content, isDone) => {
61 // In a real app, update UI with content
62 process.stdout.write('\r\033[K'); // Clear line
63 process.stdout.write(`Claude: ${content}`);
64
65 if (isDone) {
66 console.log('\n\nResponse complete');
67 // Check for tool calls
68 const toolCalls = streamHandler.getToolCalls();
69 if (toolCalls.length > 0) {
70 console.log('\nDetected tool calls:', toolCalls.length);
71 handleToolCalls(toolCalls);
72 }
73 }
74 });
75
76 // Send message to Anthropic
77 const stream = await anthropicClient.sendMessage(
78 [{ role: 'user', content: messageText }],
79 toolsHandler.getToolsForApiRequest(),
80 true // stream
81 );
82
83 // Process streaming response
84 for await (const event of stream) {
85 streamHandler.handleEvent(event);
86 }
87 } catch (error) {
88 console.error('Error handling message:', error);
89 }
90}
91
92// Function to handle tool calls
93async function handleToolCalls(toolCalls: any[]) {
94 if (!toolCalls.length) return;
95
96 try {
97 // Process each tool call
98 for (const toolCall of toolCalls) {
99 console.log(`\nExecuting tool: ${toolCall.name}`);
100 console.log(`Tool input:`, toolCall.input);
101
102 const result = await toolsHandler.handleToolCall({
103 name: toolCall.name,
104 input: toolCall.input,
105 });
106
107 console.log(`Tool result:`, result);
108
109 // Send tool result back to Anthropic
110 const toolResultMessage = await toolsHandler.createToolResultMessage(
111 toolCall.id,
112 toolCall.id,
113 result
114 );
115
116 console.log('\nSending tool result back to Claude...');
117
118 // Continue the conversation with the tool result
119 const response = await anthropicClient.sendMessage(
120 [
121 { role: 'user', content: 'What\'s the weather like?' },
122 { role: 'assistant', content: [toolCall] },
123 toolResultMessage,
124 ],
125 toolsHandler.getToolsForApiRequest(),
126 false // Don't stream the follow-up response
127 );
128
129 // Display final response
130 console.log('\nFinal response:', response.content);
131 }
132 } catch (error) {
133 console.error('Error handling tool calls:', error);
134 }
135}
136
137// Connect to SSE server
138function connectToSSEServer() {
139 const sseClient = new SSEClient(SSE_SERVER_URL, {
140 onOpen: () => {
141 console.log('Connected to SSE server');
142 },
143 onMessage: (event) => {
144 try {
145 const data = JSON.parse(event.data);
146 console.log('Received SSE message:', data);
147
148 // Handle different types of messages from the SSE server
149 if (data.type === 'tool_request') {
150 handleSSEToolRequest(data);
151 }
152 } catch (e) {
153 console.error('Error parsing SSE message:', e);
154 }
155 },
156 onError: (error) => {
157 console.error('SSE connection error:', error);
158 }
159 });
160
161 sseClient.connect();
162 return sseClient;
163}
164
165// Handle tool requests from SSE server
166async function handleSSEToolRequest(data: any) {
167 try {
168 console.log(`Executing tool from SSE: ${data.tool}`);
169
170 // Execute tool and get result
171 const result = await toolsHandler.handleToolCall({
172 name: data.tool,
173 input: data.input,
174 });
175
176 console.log('Tool result:', result);
177
178 // Send result back to SSE server
179 fetch(`http://localhost:3000/tool-result`, {
180 method: 'POST',
181 headers: {
182 'Content-Type': 'application/json',
183 },
184 body: JSON.stringify({
185 requestId: data.requestId,
186 result,
187 }),
188 });
189 } catch (error) {
190 console.error('Error handling SSE tool request:', error);
191 }
192}
193
194// Main function to run the client
195async function main() {
196 console.log('Starting Claude Tools Client');
197
198 // Connect to SSE server
199 const sseClient = connectToSSEServer();
200
201 // Example usage
202 await handleUserMessage('What\'s the weather like in San Francisco?');
203}
204
205// Run the client
206main().catch(console.error);


Best Practices and Potential Pitfalls

Best Practices

Structuring TypeScript Code

Follow a Modular Architecture

  • Separate concerns: connection handling, message parsing, tool execution, error handling
  • Use TypeScript interfaces for clear contracts between modules
  • Implement dependency injection for testability

SSE Server Configuration

1// Set proper SSE headers
2res.writeHead(200, {
3 'Content-Type': 'text/event-stream; charset=utf-8',
4 'Cache-Control': 'no-cache, no-transform',
5 'Connection': 'keep-alive'
6});
7

Security Considerations

API Key Management

  • Never expose Anthropic API keys in client-side code
  • Use environment variables or secure secret management
  • Implement key rotation policies

Authentication for MCP

1// Example JWT authentication
2import { expressjwt } from 'express-jwt';
3
4const authenticate = expressjwt({
5 secret: process.env.JWT_SECRET || 'your-secret-key',
6 algorithms: ['HS256']
7});
8
9// Apply to routes that need protection
10app.post('/api/claude/tool-call', authenticate, async (req, res) => {
11 // Route implementation
12});


Input Validation

1// Always validate inputs with Zod
2server.tool(
3 "example_tool",
4 "An example tool",
5 {
6 param1: z.string().min(1).max(100).describe("Parameter description"),
7 param2: z.number().min(0).max(100).optional().describe("Optional parameter")
8 },
9 async ({ param1, param2 }) => {
10 // Implementation
11 }
12);


Performance Optimization

Connection Management

  • Use HTTP/2 to overcome browser connection limits (6 per domain)
  • Close inactive connections to free resources
  • Implement timeouts for idle connections

Memory Efficiency

  • Use streams instead of buffering entire responses
  • Clear references to large objects when no longer needed
  • Be cautious with closures that might retain references

Common Pitfalls

SSE Implementation Issues

Browser Connection Limits

  • Browsers limit SSE connections to 6 per domain without HTTP/2
  • Use HTTP/2 or implement connection pooling strategies
  • Consider using WebSockets for extremely high-connection scenarios

Memory Leaks

1// Always clean up SSE connections
2req.on('close', () => {
3 clearInterval(heartbeatInterval);
4 delete connections[connectionId];
5});
6

Proxying Issues

  • Some proxies buffer responses, breaking the SSE stream
  • Set appropriate headers to prevent buffering
1res.setHeader('X-Accel-Buffering', 'no'); // For NGINX
2

Claude API Limitations

Tool Call Timeouts

  • Long-running tools may time out
  • Consider background processing with task queues
  • Return interim status updates for long operations

Sequential Tool Calls

  • Multiple dependent tool calls are executed sequentially
  • Design tools to minimize interdependencies
  • Consider combining related operations into single tools

Edge Cases

Network Disruptions

  • Implement reconnection logic with exponential backoff
  • Store state to allow resuming conversations
  • Provide user feedback during reconnection attempts

Browser Compatibility

  • Add EventSource polyfills for older browsers
  • Test across multiple browser environments
  • Provide fallback mechanisms for browsers without SSE

Error Handling Approaches

Robust API Call Error Handling

1try {
2 const response = await anthropic.messages.create({
3 model: "claude-3-5-sonnet-latest",
4 max_tokens: 1024,
5 messages: [{ role: "user", content: "Hello, Claude" }],
6 stream: true,
7 });
8} catch (error) {
9 if (error instanceof Anthropic.APIError) {
10 // Handle API-specific errors
11 console.error(`API Error: ${error.status} ${error.message}`);
12
13 // Handle specific status codes
14 if (error.status === 429) {
15 // Rate limiting - implement backoff and retry
16 } else if (error.status === 401) {
17 // Authentication issue
18 }
19 } else {
20 // Handle unexpected errors
21 console.error(`Unexpected error: ${error.message}`);
22 }
23}


Implementing Retry Logic

1async function fetchWithRetry(fn, options = {}) {
2 const { maxRetries = 3, baseDelay = 300, maxDelay = 10000 } = options;
3
4 let lastError;
5 for (let attempt = 0; attempt < maxRetries; attempt++) {
6 try {
7 return await fn();
8 } catch (err) {
9 lastError = err;
10
11 // Skip retry for certain errors
12 if (err.status === 400 || err.status === 401) {
13 throw err;
14 }
15
16 // Exponential backoff with jitter
17 const delay = Math.min(
18 maxDelay,
19 baseDelay * Math.pow(2, attempt) * (0.8 + Math.random() * 0.4)
20 );
21
22 console.log(`Retrying after ${delay}ms (attempt ${attempt + 1}/${maxRetries})`);
23 await new Promise(resolve => setTimeout(resolve, delay));
24 }
25 }
26
27 throw lastError;
28}
29

SSE Reconnection Handling

1// Server-side: Handle reconnections with last-event-id
2const lastEventId = req.headers['last-event-id'];
3if (lastEventId) {
4 // Resume from the last event
5 const missedEvents = getEventsSince(lastEventId);
6 for (const event of missedEvents) {
7 sendEvent(res, event);
8 }
9}
10
11// Client-side: EventSource handles reconnection automatically
12// But you can implement custom logic for complex scenarios
13let reconnectAttempts = 0;
14
15function connectSSE() {
16 const es = new EventSource(url);
17
18 es.onopen = () => {
19 reconnectAttempts = 0;
20 };
21
22 es.onerror = (error) => {
23 if (es.readyState === EventSource.CLOSED) {
24 // Custom reconnection logic
25 reconnectAttempts++;
26 const delay = Math.min(1000 * Math.pow(2, reconnectAttempts), 30000);
27
28 console.log(`Connection closed. Reconnecting in ${delay}ms...`);
29 setTimeout(connectSSE, delay);
30 }
31 };
32
33 return es;
34}


Heartbeat for Connection Maintenance

1// Send heartbeat every 30 seconds
2const heartbeatInterval = setInterval(() => {
3 if (res.writableEnded) {
4 clearInterval(heartbeatInterval);
5 return;
6 }
7
8 res.write(':keepalive\n\n');
9}, 30000);
10
11// Clean up on connection close
12req.on('close', () => {
13 clearInterval(heartbeatInterval);
14});
15

Testing Methodologies

Unit Testing

Testing individual components:

1// Example Jest test for SSE server
2describe('SSE Server', () => {
3 let server;
4 let mockResponse;
5
6 beforeEach(() => {
7 mockResponse = {
8 writeHead: jest.fn(),
9 write: jest.fn(),
10 end: jest.fn(),
11 on: jest.fn()
12 };
13 server = new SSEServer();
14 });
15
16 test('should set correct headers', () => {
17 server.handleRequest({}, mockResponse);
18
19 expect(mockResponse.writeHead).toHaveBeenCalledWith(200, {
20 'Content-Type': 'text/event-stream; charset=utf-8',
21 'Cache-Control': 'no-cache, no-transform',
22 'Connection': 'keep-alive'
23 });
24 });
25
26 test('should send event with correct format', () => {
27 server.handleRequest({}, mockResponse);
28 server.sendEvent(mockResponse, 'update', { data: 'test' });
29
30 expect(mockResponse.write).toHaveBeenCalledWith(
31 'event: update\ndata: {"data":"test"}\n\n'
32 );
33 });
34});
35

Integration Testing

Testing the complete flow:

1// Example integration test with supertest
2import request from 'supertest';
3import { app } from '../src/server';
4
5describe('MCP Server API', () => {
6 test('POST /api/claude/tool-call should execute a tool and return result', async () => {
7 const response = await request(app)
8 .post('/api/claude/tool-call')
9 .send({
10 name: 'calculate',
11 input: { expression: '5 + 3' }
12 });
13
14 expect(response.status).toBe(200);
15 expect(response.body.content[0].text).toContain('Result: 8');
16 });
17});
18

Mocking Anthropic's API

1// Mock the Anthropic API client
2jest.mock('@anthropic-ai/sdk', () => {
3 return {
4 Anthropic: jest.fn().mockImplementation(() => ({
5 messages: {
6 create: jest.fn().mockImplementation(async ({ stream }) => {
7 if (stream) {
8 // Return mock stream
9 return {
10 [Symbol.asyncIterator]: async function* () {
11 yield { type: 'content_block_start', content_block: { type: 'text' } };
12 yield { type: 'content_block_delta', delta: { text: 'Hello' } };
13 yield { type: 'content_block_delta', delta: { text: ' world' } };
14 yield { type: 'content_block_stop' };
15 yield { type: 'message_stop' };
16 }
17 };
18 }
19
20 // Return mock non-streaming response
21 return {
22 id: 'msg_mock123',
23 content: [{ type: 'text', text: 'Hello world' }]
24 };
25 })
26 }
27 }))
28 };
29});
30

Load Testing

1// Example k6 script for load testing SSE connections
2import http from 'k6/http';
3import { sleep } from 'k6';
4import { check } from 'k6';
5
6export default function() {
7 const res = http.get('http://localhost:3000/sse');
8
9 check(res, {
10 'status is 200': (r) => r.status === 200,
11 'content-type is text/event-stream': (r) => r.headers['Content-Type'] &&
12 r.headers['Content-Type'].includes('text/event-stream'),
13 });
14
15 sleep(10);
16}
17
18export const options = {
19 vus: 10, // 10 virtual users
20 duration: '30s' // Test for 30 seconds
21};


Conclusion

This tutorial has covered implementing a tools calling system via Server-Sent Events (SSE) using Anthropic's Claude 3.5 Sonnet. Key components include:

The Claude Messages API and tools calling functionality

Server-Sent Events (SSE) for streaming responses

The Message Control Protocol (MCP) for standardized tool integration

Server-side implementation in TypeScript

Client-side implementation for handling tool calls

Best practices for security, performance, and error handling

Testing methodologies for robust implementation

While SSE transport is being deprecated in favor of Streamable HTTP, the concepts and patterns in this tutorial apply to both approaches. As Anthropic's API evolves, the core principles of tool calling, streaming responses, and event-based architecture will remain relevant.

By leveraging these technologies, you can build powerful AI applications that combine Claude's intelligence with custom tools and data sources, creating more capable and interactive experiences for your users.

More Kai stories