Integrating multiple Large Language Models (LLMs) such as OpenAI's GPT models and Anthropic's Claude into an application can be a daunting task. Handling different APIs and communication protocols, and ensuring requests are routed efficiently, introduces significant complexity.
Using a message broker as a router is an elegant solution to this problem, addressing these pain points and providing several key advantages.
In this blog post, we'll look at how to do exactly that, with code examples that walk through setting up a router that interfaces with both OpenAI and Anthropic's Claude, using KubeMQ as our example broker.
Key Advantages of Using a Message Broker as an LLM Router

1. Simplified Integration
By using a message broker as a router, you abstract the complexities involved in directly interfacing with different LLM APIs. This simplifies the client-side code and reduces the likelihood of errors.

2. Multi-Model Use Cases
A message broker facilitates communication between multiple LLMs or models specialized for different tasks (e.g., one for summarization, another for sentiment analysis). It ensures requests are routed to the appropriate model efficiently, allowing applications to leverage the strengths of each model without additional overhead.

3. Batch Processing and Large-Scale Inference
For applications requiring batch processing or large-scale inference tasks, a message broker enables asynchronous handling by queuing requests when LLMs are busy or unavailable. This ensures that no data or requests are lost, providing reliable processing even under heavy workloads.

4. Redundancy and Fallback Assurance
In scenarios where uptime is critical, a message broker ensures seamless fallback to alternative environments. For example, if a connection to one cloud provider offering an OpenAI model fails, KubeMQ can automatically switch to another provider. This redundancy guarantees uninterrupted AI operations, maintaining service reliability and customer satisfaction.

5. Handling High-Traffic Applications
A message broker distributes incoming requests across multiple LLM instances or replicas, preventing overloading and ensuring smooth operation. This load balancing is essential for high-traffic applications, allowing them to scale effectively without compromising performance.
Building an LLM Router With KubeMQ: Integrating OpenAI and Claude
Now, I'll guide you through setting up a router that interfaces with both OpenAI and Anthropic's Claude using KubeMQ, a leading open-source message broker and message queue platform.
Leveraging KubeMQ's advantages and using code examples, we'll walk through setting up the message broker, building the server-side router, and creating a client to send queries.
All code examples can be found in KubeMQ's GitHub repository.
Prerequisites
Before we begin, ensure you have the following:

The kubemq-cq Python package installed:

pip install kubemq-cq

A .env file containing your API keys (a quick way to confirm these load is sketched after this list):

OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key

The server example below also uses the langchain and python-dotenv packages.
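Before starting the router, it can help to confirm that the keys in .env actually load. The following is a minimal, optional check, assuming python-dotenv is installed and the .env file sits in the working directory; it is not part of the original example.

import os
from dotenv import load_dotenv

# Load variables from the .env file in the current working directory.
load_dotenv()

# Report which keys are present without printing their values.
for key in ("OPENAI_API_KEY", "ANTHROPIC_API_KEY"):
    print(f"{key}: {'set' if os.getenv(key) else 'MISSING'}")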
Setting Up KubeMQ
First, we need to ensure that KubeMQ is operational. We'll deploy it using Docker:
docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your_token" \
  kubemq/kubemq-community:latest

Ports Explanation:

8080: KubeMQ's API and management interface.
50000: The gRPC port that the Python SDK connects to (both the router and the client below use localhost:50000).
9090: The REST gateway, used in the REST example later in this post.

Note: Replace "your_token" with your actual KubeMQ token.
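If you want to sanity-check that the broker is up before writing any KubeMQ code, a plain TCP connection to the gRPC port is enough. This is an optional sketch, not part of the original setup, and assumes the Docker command above has already been run.

import socket

# Optional reachability check: confirm the KubeMQ gRPC port accepts connections.
with socket.create_connection(("localhost", 50000), timeout=3):
    print("KubeMQ gRPC port (50000) is reachable.")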
Creating the LLM Router Server
The LLM Router acts as an intermediary between clients and the LLMs. It listens to specific channels for queries and routes them to the appropriate LLM.
server.py

import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threading

load_dotenv()

class LLMRouter:
    def __init__(self):
        self.openai_llm = ChatOpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            model_name="gpt-3.5-turbo"
        )
        self.claude_llm = Anthropic(
            api_key=os.getenv("ANTHROPIC_API_KEY"),
            model="claude-3"
        )
        self.client = Client(address="localhost:50000")

    def handle_openai_query(self, request: QueryMessageReceived):
        try:
            message = request.body.decode('utf-8')
            result = self.openai_llm(message)
            response = QueryResponseMessage(
                query_received=request,
                is_executed=True,
                body=result.encode('utf-8')
            )
            self.client.send_response_message(response)
        except Exception as e:
            self.client.send_response_message(QueryResponseMessage(
                query_received=request,
                is_executed=False,
                error=str(e)
            ))

    def handle_claude_query(self, request: QueryMessageReceived):
        try:
            message = request.body.decode('utf-8')
            result = self.claude_llm(message)
            response = QueryResponseMessage(
                query_received=request,
                is_executed=True,
                body=result.encode('utf-8')
            )
            self.client.send_response_message(response)
        except Exception as e:
            self.client.send_response_message(QueryResponseMessage(
                query_received=request,
                is_executed=False,
                error=str(e)
            ))

    def run(self):
        def on_error(err: str):
            print(f"Error: {err}")

        def subscribe_openai():
            self.client.subscribe_to_queries(
                subscription=QueriesSubscription(
                    channel="openai_requests",
                    on_receive_query_callback=self.handle_openai_query,
                    on_error_callback=on_error,
                ),
                cancel=CancellationToken()
            )

        def subscribe_claude():
            self.client.subscribe_to_queries(
                subscription=QueriesSubscription(
                    channel="claude_requests",
                    on_receive_query_callback=self.handle_claude_query,
                    on_error_callback=on_error,
                ),
                cancel=CancellationToken()
            )

        threading.Thread(target=subscribe_openai).start()
        threading.Thread(target=subscribe_claude).start()
        print("LLM Router running on channels: openai_requests, claude_requests")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Shutting down...")

if __name__ == "__main__":
    router = LLMRouter()
    router.run()
Explanation

Initialization:
Loads environment variables for API keys.
Initializes clients for the OpenAI and Anthropic LLMs.
Sets up a KubeMQ client.

Handling Queries:
handle_openai_query and handle_claude_query decode the incoming message, pass it to the respective LLM, and send back the response.
Errors are caught and sent back with the is_executed flag set to False.

Subscription:
The router subscribes to two channels: openai_requests and claude_requests (the same pattern extends to additional models, as sketched after this explanation).
Uses threading to handle subscriptions concurrently.

Running the Server:
The run method starts the subscriptions and keeps the server running until interrupted.
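The subscription pattern generalizes naturally: each model gets its own channel and handler. As a hypothetical illustration (not part of the original repository), a third model could be registered the same way; the channel name and handler below are placeholders.

# Same imports as server.py; repeated here so the sketch is self-contained.
from kubemq.cq import Client, QueriesSubscription, CancellationToken

# Hypothetical extension: register one more model channel using the same
# subscription pattern as run(). "mistral_requests" and handle_mistral_query
# are made-up placeholder names, not part of the original example.
def subscribe_extra(client: Client, channel_name: str, handler) -> None:
    client.subscribe_to_queries(
        subscription=QueriesSubscription(
            channel=channel_name,
            on_receive_query_callback=handler,
            on_error_callback=lambda err: print(f"Error: {err}"),
        ),
        cancel=CancellationToken()
    )

# Inside LLMRouter.run(), this could be started on its own thread, e.g.:
# threading.Thread(target=subscribe_extra,
#                  args=(self.client, "mistral_requests", self.handle_mistral_query)).start()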
Creating the Client
The client sends queries to the LLM Router, specifying which model to use.
client.py

from kubemq.cq import Client, QueryMessage
import json

class LLMClient:
    def __init__(self, address="localhost:50000"):
        self.client = Client(address=address)

    def send_message(self, message: str, model: str) -> dict:
        channel = f"{model}_requests"
        response = self.client.send_query_request(QueryMessage(
            channel=channel,
            body=message.encode('utf-8'),
            timeout_in_seconds=30
        ))
        if response.is_error:
            return {"error": response.error}
        else:
            return {"response": response.body.decode('utf-8')}

if __name__ == "__main__":
    client = LLMClient()
    models = ["openai", "claude"]
    message = input("Enter your message: ")
    model = input(f"Choose model ({'/'.join(models)}): ")
    if model in models:
        response = client.send_message(message, model)
        if "error" in response:
            print(f"Error: {response['error']}")
        else:
            print(f"Response: {response['response']}")
    else:
        print("Invalid model selected")
Explanation

Initialization:
Sets up a KubeMQ client.

Sending Messages:
The send_message method constructs the appropriate channel based on the selected model.
Sends a query message to the router and waits for the response.
Handles errors and decodes the response body.

User Interaction:
Prompts the user to enter a message and select a model.
Prints out the response from the LLM.
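The interactive prompt is convenient for manual testing, but the same class can be used directly from other code. Here is a small usage sketch, assuming it runs in the same directory as client.py; the question text is illustrative.

# Programmatic usage of the LLMClient defined in client.py above.
from client import LLMClient

llm_client = LLMClient()  # connects to KubeMQ at localhost:50000 by default

# Route the same question to each model via its channel ("openai_requests" / "claude_requests").
for model in ("openai", "claude"):
    result = llm_client.send_message("Summarize the benefits of message brokers in one sentence.", model)
    print(model, "->", result.get("response", result.get("error")))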
For services or clients that prefer or require RESTful communication, KubeMQ provides REST endpoints.
Sending a Request via REST

Endpoint:
POST http://localhost:9090/send/request
Headers:
Content-Type: application/json

Body:

{
  "RequestTypeData": 2,
  "ClientID": "LLMRouter-sender",
  "Channel": "openai_requests",
  "BodyString": "What is the capital of France?",
  "Timeout": 30000
}

Payload details:

RequestTypeData: The type of request; 2 indicates a query.
ClientID: An identifier for the sender.
Channel: The channel to route the request to (openai_requests or claude_requests).
BodyString: The query text to send to the LLM.
Timeout: How long to wait for a response, in milliseconds (30000 = 30 seconds).
The response will be a JSON object containing the LLM's output or an error message.
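For completeness, here is a small sketch of sending that same payload from Python with the requests library; it simply prints whatever JSON KubeMQ returns, which contains the LLM's output or an error message.

import requests  # assumes the requests package is installed

# Send the query payload shown above to KubeMQ's REST gateway on port 9090.
payload = {
    "RequestTypeData": 2,
    "ClientID": "LLMRouter-sender",
    "Channel": "openai_requests",
    "BodyString": "What is the capital of France?",
    "Timeout": 30000
}

resp = requests.post("http://localhost:9090/send/request", json=payload, timeout=35)
resp.raise_for_status()
# Print the raw JSON response returned by KubeMQ.
print(resp.json())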
Conclusion
By leveraging a message broker (KubeMQ), we've built a scalable and efficient router that can interface with multiple LLMs. This setup allows clients to send queries to different models seamlessly and can be extended to include more models or functionalities.
Benefits of this Approach:
Simplified integration: clients talk to a single messaging interface instead of multiple LLM APIs.
Reliability under load: queuing and load balancing handle heavy or bursty traffic without losing requests.
Redundancy: fallback between providers keeps AI operations uninterrupted.
Extensibility: new models or specialized tasks can be added by introducing new channels and handlers.