Content
# Kachaka MCP Server
> [!WARNING]
> The code in this repository is experimental and may not function correctly.
## 1. Overview
The Kachaka MCP Server (kachaka-mcp) is a server that provides the functionalities of [Kachaka](https://kachaka.life/home/) to various AI models through the Model Context Protocol (MCP). This server allows AI models such as Claude, GPT-4, and local LLMs to understand and control the state of Kachaka.
## 2. Deployment
### 2.1 Installation using uv sync
Install [uv](https://docs.astral.sh/uv/) (if you haven't already).
Follow the steps below to install kachaka-mcp.
```bash
git clone https://github.com/pf-robotics/kachaka-mcp.git
cd kachaka-mcp
# Installation
uv sync
# Or install including development dependencies
uv sync --all
```
### 2.2 Usage from Claude Desktop
Please add the following configuration to `claude_desktop_config.json`:
```json
{
"mcpServers": {
"kachaka-mcp": {
"command": "uv",
"args": [
"--directory",
"<path to kachaka-mcp directory>",
"run",
"python",
"-m",
"kachaka_mcp.server"
],
"env": {
"KACHAKA_HOST": "<kachaka host>"
}
}
}
}
```
Here, please modify the kachaka-mcp directory and the value of `KACHAKA_HOST` according to your environment.
Example configuration:
```json
{
"mcpServers": {
"kachaka-mcp": {
"command": "uv",
"args": [
"--directory",
"C:\\kachaka-mcp",
"run",
"python",
"-m",
"kachaka_mcp.server"
],
"env": {
"KACHAKA_HOST": "192.168.1.100:26400"
}
}
}
}
```
### 2.3 Debugging with MCP Inspector
To debug using [MCP Inspector](https://github.com/modelcontextprotocol/inspector), you can do the following:
```bash
npx @modelcontextprotocol/inspector -e KACHAKA_HOST=192.168.1.100:26400 uv run python -m kachaka_mcp.server
```
### 2.4 Direct Execution (All Platforms)
```bash
# Installation
pip install -e .
# Configuration
export KACHAKA_HOST="192.168.1.100:26400" # Kachaka host (Linux/macOS)
# Or
set KACHAKA_HOST=192.168.1.100:26400 # Kachaka host (Windows CMD)
# Or
$env:KACHAKA_HOST = "192.168.1.100:26400" # Kachaka host (Windows PowerShell)
# Execution
python -m kachaka_mcp.server
# Or
mcp run kachaka_mcp.server
# Run in development mode (with MCP Inspector)
mcp dev kachaka_mcp.server
# Install to Claude Desktop
mcp install kachaka_mcp.server --name "Kachaka Robot" -v KACHAKA_HOST=192.168.1.100:26400
```
## 3. Using Configuration Files
You can also use a configuration file instead of environment variables. The configuration file should be placed at `~/.kachaka-mcp/config.json` (Linux/macOS) or `%USERPROFILE%\.kachaka-mcp\config.json` (Windows).
```json
{
"kachaka_host": "192.168.1.100:26400",
"server_name": "My Kachaka Robot",
"log_level": "INFO",
"auth_enabled": false,
"api_keys": []
}
```
The location of the configuration file can be changed using the environment variable `KACHAKA_MCP_CONFIG`:
```bash
# Linux/macOS
export KACHAKA_MCP_CONFIG="/path/to/config.json"
# Windows CMD
set KACHAKA_MCP_CONFIG=C:\path\to\config.json
# Windows PowerShell
$env:KACHAKA_MCP_CONFIG = "C:\path\to\config.json"
```
## 4. Architecture
```mermaid
graph TD
A[AI Model] <-->|MCP| B[kachaka-mcp Server]
B <-->|gRPC| C[Kachaka]
subgraph "AI Models"
A1[Claude] --- A
A2[GPT-4] --- A
A3[Local LLM] --- A
end
subgraph "kachaka-mcp"
D[Resource Layer] --> F[Kachaka API Client]
E[Tool Layer] --> F
G[Prompt Layer] --> F
H[Authentication & Security Layer]
end
```
## 5. Key Components
### 5.1 Kachaka API Client
- Responsible for communication with the Kachaka API
- Uses KachakaApiClient to communicate with Kachaka
- Supports both synchronous and asynchronous interfaces
- Manages network connections and reconnection functionality
### 5.2 Resource Layer
Exposes Kachaka's state information as MCP resources:
#### 5.2.1 Kachaka Information Resource
- `robot://status` - Current status of Kachaka (position, battery, errors, etc.)
- `robot://version` - Kachaka version information
- `robot://serial` - Serial number
- `robot://command` - Information about the currently executing command
#### 5.2.2 Map Resource
- `map://current` - Current map information (PNG format)
- `map://locations/{location_id?}` - Information about registered locations
- `map://shelves/{shelf_id?}` - Information and location of shelves
- `map://list` - List of available maps
#### 5.2.3 Sensor Resource
- `sensors://camera/front` - Front camera image
- `sensors://camera/back` - Back camera image
- `sensors://camera/tof` - ToF camera image
- `sensors://laser` - Laser scan data
- `sensors://imu` - IMU data
- `sensors://odometry` - Odometry data
- `sensors://object_detection` - Object detection results
### 5.3 Tool Layer
Exposes Kachaka's operational capabilities as MCP tools:
#### 5.3.1 Movement Tool
- `move_to_location(location_name: str)` - Move to the specified location
- `move_to_pose(x: float, y: float, yaw: float)` - Move to the specified coordinates
- `return_home()` - Return home
- `move_forward(distance_meter: float, speed: float)` - Move forward a specified distance
- `rotate_in_place(angle_radian: float)` - Rotate in place
- `set_robot_velocity(linear: float, angular: float)` - Set Kachaka's speed
#### 5.3.2 Shelf Operation Tool
- `move_shelf(shelf_name: str, location_name: str)` - Move the shelf to the specified location
- `return_shelf(shelf_name: str)` - Return the shelf to its original location
- `dock_shelf()` - Dock the shelf
- `undock_shelf()` - Undock the shelf
- `dock_any_shelf_with_registration(location_name: str, dock_forward: bool)` - Dock and register any shelf
#### 5.3.3 System Operation Tool
- `speak(text: str)` - Speak the text aloud
- `cancel_command()` - Cancel the currently executing command
- `proceed()` - Proceed to the next step
- `lock(duration_sec: float)` - Lock for the specified duration
- `set_auto_homing_enabled(enable: bool)` - Enable/disable auto homing
- `set_manual_control_enabled(enable: bool)` - Enable/disable manual control
- `set_speaker_volume(volume: int)` - Set the speaker volume
- `restart_robot()` - Restart Kachaka
#### 5.3.4 Map Operation Tool
- `switch_map(map_id: str)` - Switch the map
- `export_map(map_id: str, output_file_path: str)` - Export the map
- `import_map(target_file_path: str)` - Import the map
- `set_robot_pose(pose: dict)` - Set Kachaka's position
### 5.4 Prompt Layer
Provides prompt templates to facilitate interaction with AI models:
- `robot_control_prompt()` - Basic prompt for Kachaka control
- `shelf_operation_prompt()` - Basic prompt for shelf operations
- `navigation_prompt()` - Basic prompt for navigation
- `error_handling_prompt()` - Basic prompt for error handling
### 5.5 Authentication & Security Layer
- API key-based authentication
- Access control and permission management
- Operation logging
## 6. Implementation Plan
### 6.1 Project Structure
```
kachaka-mcp/
├── README.md # Project overview
├── pyproject.toml # Project settings
├── requirements.txt # Dependencies
├── src/
│ └── kachaka_mcp/
│ ├── __init__.py
│ ├── server.py # Main implementation of the MCP server
│ ├── resources.py # Resource definitions
│ ├── tools.py # Tool definitions
│ ├── prompts.py # Prompt definitions
│ ├── client.py # Kachaka API client wrapper
│ ├── auth.py # Authentication & security
│ └── utils/
│ ├── __init__.py
│ ├── config.py # Configuration management
│ └── logging.py # Logging
├── scripts/
│ ├── install.sh # Installation script
│ └── run.sh # Execution script
└── tests/
├── __init__.py
├── test_server.py
├── test_resources.py
└── test_tools.py
```
### 6.2 Dependencies
```toml
[project]
name = "kachaka-mcp"
version = "0.1.0"
description = "MCP server for Kachaka robot"
requires-python = ">=3.10"
dependencies = [
"mcp[cli]>=1.7.1",
"kachaka-api@git+https://github.com/pf-robotics/kachaka-api",
"grpcio",
"pydantic",
"pillow",
"numpy",
"loguru",
"protobuf==5.27.2"
]
[project.optional-dependencies]
dev = [
"pytest",
"black",
"isort",
"mypy",
"ruff",
]
```
### 6.3 Server Implementation
#### 6.3.1 Main Server Class
```python
# server.py
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional
from kachaka_api.aio import KachakaApiClient
from mcp.server.fastmcp import Context, FastMCP
from .resources import register_resources
from .tools import register_tools
from .prompts import register_prompts
from .auth import KachakaAuthProvider
class KachakaMCPContext:
"""Context for Kachaka MCP Server"""
def __init__(self, kachaka_client: KachakaApiClient):
self.kachaka_client = kachaka_client
@asynccontextmanager
async def kachaka_lifespan(server: FastMCP) -> AsyncIterator[KachakaMCPContext]:
"""Manage lifespan of Kachaka MCP Server"""
# Load configuration
from .utils.config import load_config
config = load_config()
# Initialize Kachaka API client
kachaka_client = KachakaApiClient(target=config.kachaka_host)
try:
# Create and provide context
context = KachakaMCPContext(kachaka_client)
yield context
finally:
# Cleanup
pass
def create_server(server_name: str = "Kachaka Robot") -> FastMCP:
"""Create Kachaka MCP Server"""
# Create MCP server
mcp = FastMCP(
server_name,
lifespan=kachaka_lifespan,
auth_provider=KachakaAuthProvider(),
)
# Register resources, tools, and prompts
register_resources(mcp)
register_tools(mcp)
register_prompts(mcp)
return mcp
def main():
"""Main function"""
server = create_server()
server.run()
if __name__ == "__main__":
main()
```
#### 6.3.2 Resource Implementation Example
```python
# resources.py
from mcp.server.fastmcp import FastMCP, Context, Image
from PIL import Image as PILImage
import io
import json
def register_resources(mcp: FastMCP):
"""Register resources"""
@mcp.resource("robot://status")
async def get_robot_status(ctx: Context) -> str:
"""Get the current status of Kachaka"""
kachaka_client = ctx.request_context.lifespan_context.kachaka_client
# Retrieve various information
pose = await kachaka_client.get_robot_pose()
battery_info = await kachaka_client.get_battery_info()
command_state, command = await kachaka_client.get_command_state()
# Return as JSON
status = {
"pose": {
"x": pose.x,
"y": pose.y,
"yaw": pose.yaw
},
"battery": {
"percentage": battery_info[0],
"status": str(battery_info[1])
},
"command_state": str(command_state),
"command": {
"type": command.WhichOneof("command"),
"id": command_state.command_id if command_state.command_id else ""
}
}
return json.dumps(status, indent=2)
@mcp.resource("map://current")
async def get_current_map(ctx: Context) -> Image:
"""Get the current map image"""
kachaka_client = ctx.request_context.lifespan_context.kachaka_client
# Retrieve the map
map_data = await kachaka_client.get_png_map()
# Return as image
return Image(data=map_data.data, format="png")
# Other resources can be implemented similarly...
```
#### 6.3.3 Tool Implementation Example
```python
# tools.py
from mcp.server.fastmcp import FastMCP, Context
from typing import Dict, Any
def register_tools(mcp: FastMCP):
"""Register tools"""
@mcp.tool()
async def move_to_location(location_name: str, ctx: Context) -> str:
"""Move Kachaka to the specified location"""
kachaka_client = ctx.request_context.lifespan_context.kachaka_client
# Set progress report
ctx.info(f"Moving to location: {location_name}")
# Execute move command
result = await kachaka_client.move_to_location(
location_name,
wait_for_completion=True
)
# Return result
if result.success:
return f"Successfully moved to {location_name}"
else:
return f"Failed to move to {location_name}: {result.message}"
@mcp.tool()
async def speak(text: str, ctx: Context) -> str:
"""Speak the text aloud"""
kachaka_client = ctx.request_context.lifespan_context.kachaka_client
# Execute speak command
result = await kachaka_client.speak(text, wait_for_completion=True)
# Return result
if result.success:
return f"Successfully spoke: {text}"
else:
return f"Failed to speak: {result.message}"
# Other tools can be implemented similarly...
```
#### 6.3.4 Prompt Implementation Example
```python
# prompts.py
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.prompts import base
def register_prompts(mcp: FastMCP):
"""Register prompts"""
@mcp.prompt()
def robot_control_prompt() -> list[base.Message]:
"""Basic prompt for Kachaka control"""
return [
base.SystemMessage(
"You are an assistant controlling Kachaka. "
"You can operate Kachaka using the following tools:"
"- move_to_location: Move to the specified location"
"- return_home: Return home"
"- speak: Speak the text aloud"
"etc."
),
base.UserMessage("Please help me operate Kachaka."),
]
# Other prompts can be implemented similarly...
```
### 6.4 Configuration Management
The values for `kachaka_host` and `KACHAKA_HOST` mentioned later should match the value following `target=` in "[Using Kachaka API with Python](https://github.com/pf-robotics/kachaka-api/blob/main/docs/PYTHON.md)".
Note that `26400` is the port number for the [Kachaka API Server (gRPC)](https://github.com/pf-robotics/kachaka-api/blob/main/docs/PLAYGROUND.md#playground%E3%81%AE%E4%BB%95%E6%A7%98).
```python
# utils/config.py
from pydantic import BaseModel
import os
import json
from pathlib import Path
class KachakaMCPConfig(BaseModel):
"""Configuration for Kachaka MCP Server"""
kachaka_host: str = "192.168.1.100:26400" # Default Kachaka host
server_name: str = "Kachaka Robot"
log_level: str = "INFO"
auth_enabled: bool = False
api_keys: list[str] = []
def load_config() -> KachakaMCPConfig:
"""Load configuration"""
# Load configuration from environment variables
config_path = os.environ.get("KACHAKA_MCP_CONFIG", "~/.kachaka-mcp/config.json")
config_path = Path(config_path).expanduser()
# Default configuration
config = KachakaMCPConfig()
# Load from configuration file if it exists
if config_path.exists():
with open(config_path, "r") as f:
config_data = json.load(f)
config = KachakaMCPConfig(**config_data)
# Overwrite with environment variables
if os.environ.get("KACHAKA_HOST"):
config.kachaka_host = os.environ.get("KACHAKA_HOST")
return config
```
### 6.5 Authentication Implementation
```python
# auth.py
from mcp.server.auth import OAuthServerProvider, ClientRegistrationOptions, RevocationOptions
from typing import Dict, List, Optional
class KachakaAuthProvider(OAuthServerProvider):
"""Authentication provider for Kachaka MCP Server"""
def __init__(self):
from .utils.config import load_config
self.config = load_config()
self.api_keys = set(self.config.api_keys)
async def validate_client_credentials(self, client_id: str, client_secret: str) -> bool:
"""Validate client credentials"""
if not self.config.auth_enabled:
return True
return client_secret in self.api_keys
# Other authentication methods can be implemented...
```
## 7. Testing Plan
### 7.1 Unit Tests
- Test resource functionalities
- Test tool functionalities
- Test authentication functionalities
### 7.2 Integration Tests
- Test server startup and shutdown
- Test integration with AI models
- Test error handling
### 7.3 End-to-End Tests
- Test integration with the actual Kachaka
- Test access from various AI models
## 8. Future Extensibility
### 8.1 Short-term Expansion Plans
- Addition of a Web UI (for monitoring server status, changing settings, etc.)
- Management features for multiple Kachakas
- Support for custom commands
### 8.2 Medium to Long-term Expansion Plans
- Advanced scheduling features
- User-defined workflows
- Integration with other robot platforms
## 9. Security Considerations
- Network security (TLS/SSL support)
- Access control and authentication
- Ensuring safe operations (limiting dangerous operations)
- Logging and auditing
## 10. License
[Apache License 2.0](LICENSE)
Connection Info
You Might Also Like
markitdown
MarkItDown-MCP is a lightweight server for converting URIs to Markdown.
everything-claude-code
Complete Claude Code configuration collection - agents, skills, hooks,...
servers
Model Context Protocol Servers
Time
A Model Context Protocol server for time and timezone conversions.
Filesystem
Node.js MCP Server for filesystem operations with dynamic access control.
Sequential Thinking
A structured MCP server for dynamic problem-solving and reflective thinking.