Initial commit: Skills API with MCP servers

- FastAPI backend with SQLite (ai.db)
- Tables: skills, snippets, conventions, cache, memory
- MCP servers: homelab, gameservers, skills
- Docker Compose setup
- Seed data with 8 skills, 2 conventions, 2 snippets
- Token savings patterns via context bundles and caching
This commit is contained in:
Lukas Parsons 2026-03-22 21:18:23 -04:00
parent 114b3b1628
commit 7f7699ff94
19 changed files with 1680 additions and 1 deletions

3
.env.example Normal file
View file

@ -0,0 +1,3 @@
DATABASE_URL=sqlite+aiosqlite:///./ai.db
HOST=0.0.0.0
PORT=8080

15
.gitignore vendored Normal file
View file

@ -0,0 +1,15 @@
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
venv/
ENV/
.env
*.db
*.sqlite
data/
.pytest_cache/
.coverage
htmlcov/

93
CLAUDE.md Normal file
View file

@ -0,0 +1,93 @@
# MCP Server Configuration
## Running MCP Servers
### Option 1: Directly with Python
```bash
cd mcp
pip install -r requirements.txt
python homelab.py
```
### Option 2: Via Claude Desktop Config
Add to your Claude Desktop config (`~/Library/Application Support/Claude/claude_desktop_config.json` on macOS or `%APPDATA%\Claude\claude_desktop_config.json` on Windows):
```json
{
"mcpServers": {
"homelab": {
"command": "python",
"args": ["/path/to/ai-skills-api/mcp/homelab.py"],
"env": {
"DOCKER_HOST": "unix:///var/run/docker.sock"
}
},
"gameservers": {
"command": "python",
"args": ["/path/to/ai-skills-api/mcp/gameservers.py"],
"env": {
"GAME_SERVERS_DIR": "/opt/game-servers"
}
},
"skills": {
"command": "python",
"args": ["/path/to/ai-skills-api/mcp/skills.py"],
"env": {
"SKILLS_API_URL": "http://localhost:8080"
}
}
}
}
```
## Available Tools
### homelab
- `container_status` - Get Docker container status
- `list_containers` - List all containers
- `start_container` - Start a container
- `stop_container` - Stop a container
- `restart_container` - Restart a container
- `container_logs` - Get container logs
- `system_resources` - Get CPU/memory/disk usage
- `run_command` - Run shell command (use carefully)
- `docker_compose_action` - Run docker-compose actions
### gameservers
- `list_servers` - List all game servers
- `get_server_config` - Get server config
- `update_server_config` - Update server config
- `server_status` - Get server status
- `start_server` - Start a game server
- `stop_server` - Stop a game server
- `get_server_logs` - Get server logs
- `create_server` - Create new game server
- `delete_server` - Delete a game server
- `get_templates` - Get available templates
### skills
- `get_skill` - Get skill by ID
- `search_skills` - Search skills
- `list_skills` - List skills
- `get_context` - Get context bundle
- `get_conventions` - Get conventions
- `get_snippets` - Get snippets
- `check_cache` - Check response cache
- `get_memory` - Get project memory
- `add_memory` - Add project memory
- `create_skill` - Create new skill
## Token Savings Pattern
When using agents:
1. **Before asking**: Call `skills/check_cache` with your prompt
2. **If cached**: Use the cached response directly
3. **If not cached**: Call `skills/get_context` to inject relevant skills/conventions
4. **After response**: Optionally call `skills/add_memory` to save important decisions
This pattern avoids:
- Re-sending your coding standards every request
- Re-explaining project architecture
- Re-asking questions you've asked before

12
Dockerfile Normal file
View file

@ -0,0 +1,12 @@
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8080
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"]

View file

@ -1,2 +1,99 @@
# ai-skills-api # AI Skills API
Local infrastructure for AI context management. Store skills, snippets, conventions, and cache responses to reduce token consumption.
## Quick Start
```bash
# Copy env file
cp .env.example .env
# Run with Docker
docker compose up -d
# Or run locally
pip install -r requirements.txt
uvicorn main:app --reload
```
API available at `http://localhost:8080`
Docs at `http://localhost:8080/docs`
## Endpoints
| Endpoint | Description |
|----------|-------------|
| `GET /skills` | List all skills |
| `GET /skills/{id}` | Get skill (increments usage_count) |
| `POST /skills` | Create skill |
| `PUT /skills/{id}` | Update skill |
| `DELETE /skills/{id}` | Delete skill |
| `GET /skills/search?q=query` | Search skills |
| `GET /snippets` | List snippets |
| `GET /snippets/{id}` | Get snippet |
| `POST /snippets` | Create snippet |
| `DELETE /snippets/{id}` | Delete snippet |
| `GET /conventions` | List conventions |
| `GET /conventions?project=/path` | Get conventions for project |
| `POST /conventions` | Create convention |
| `PUT /conventions/{id}` | Update convention |
| `DELETE /conventions/{id}` | Delete convention |
| `POST /cache/lookup` | Check cache for prompt |
| `POST /cache/store` | Store response in cache |
| `GET /cache/stats` | Cache statistics |
| `GET /memory` | List memory entries |
| `GET /memory?project=name` | Get memory for project |
| `POST /memory` | Create memory entry |
| `PUT /memory/{id}` | Update memory |
| `DELETE /memory/{id}` | Delete memory |
| `GET /context?project=/path&skills=id1,id2` | Get full context bundle |
## Example Usage
### Create a skill
```bash
curl -X POST http://localhost:8080/skills \
-H "Content-Type: application/json" \
-d '{
"id": "homelab-docker-compose",
"name": "Docker Compose Standard",
"category": "homelab",
"content": "Always use docker-compose v3.8+. Include health checks, restart policies, and resource limits.",
"tags": ["docker", "compose", "infrastructure"]
}'
```
### Get context bundle
```bash
curl "http://localhost:8080/context?project=/home/server/apps/media-server&skills=homelab-docker-compose,react-v2"
```
### Check cache
```bash
curl -X POST http://localhost:8080/cache/lookup \
-H "Content-Type: application/json" \
-d '{
"prompt": "How do I configure traefik?",
"model": "claude-3-opus"
}'
```
## Integration Pattern
In your agent's system prompt or pre-request hook:
1. Call `GET /context?project={current_project}&skills={skill_ids}`
2. Inject returned content into the prompt
3. Before sending to LLM, check `POST /cache/lookup`
4. After receiving response, optionally `POST /cache/store`
This avoids re-sending your standards every request and caches repeated queries.
## Database
SQLite database `ai.db` with tables:
- `skills` - Reusable patterns and instructions
- `snippets` - Code snippets
- `conventions` - Project-specific conventions
- `cache` - LRU cache of LLM responses
- `memory` - Project memory/notes

22
database.py Normal file
View file

@ -0,0 +1,22 @@
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
import os
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./ai.db")
engine = create_async_engine(DATABASE_URL, echo=False)
async_session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
class Base(DeclarativeBase):
pass
async def get_db() -> AsyncSession:
async with async_session_maker() as session:
yield session
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)

15
docker-compose.yml Normal file
View file

@ -0,0 +1,15 @@
services:
api:
build: .
ports:
- "8080:8080"
environment:
- DATABASE_URL=sqlite+aiosqlite:///./ai.db
volumes:
- ./data:/app/data
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 3

221
examples/seed-data.py Normal file
View file

@ -0,0 +1,221 @@
#!/usr/bin/env python3
"""Seed the skills database with useful defaults"""
import httpx
BASE_URL = "http://localhost:8080"
SKILLS = [
{
"id": "homelab-docker-compose",
"name": "Docker Compose Standard",
"category": "homelab",
"description": "Standard Docker Compose configuration patterns",
"content": """Always use docker-compose v3.8+. Include:
- health checks for all services
- restart: unless-stopped policy
- resource limits (memory, CPU)
- named volumes for persistent data
- .env file for secrets (never hardcode)
- explicit network definitions
- logging driver with size limits""",
"tags": ["docker", "compose", "infrastructure"]
},
{
"id": "homelab-traefik",
"name": "Traefik Reverse Proxy",
"category": "homelab",
"description": "Traefik configuration standards",
"content": """For Traefik reverse proxy setups:
- Use Docker provider with watched containers
- Enable ACME/Let's Encrypt for HTTPS
- Store certs in persistent volume
- Use middlewares for auth, rate limiting, redirects
- Label-based routing on containers
- Dashboard protected by auth middleware""",
"tags": ["traefik", "proxy", "https", "infrastructure"]
},
{
"id": "typescript-react",
"name": "React TypeScript Component",
"category": "coding",
"description": "Standard React component patterns with TypeScript",
"content": """React component standards:
- Use functional components with TypeScript interfaces
- Props defined as interface, not type alias
- Use React.FC only when children needed
- Prefer composition over inheritance
- Custom hooks for reusable logic
- Strict null checks enabled
- Avoid any, use unknown if needed""",
"tags": ["react", "typescript", "frontend"]
},
{
"id": "python-async",
"name": "Python Async Patterns",
"category": "coding",
"description": "Async/await best practices in Python",
"content": """Python async standards:
- Use async/await consistently, don't mix sync/async
- Use asyncio.gather() for concurrent operations
- Proper exception handling in async contexts
- Use async context managers (async with)
- Avoid blocking calls in async functions
- Use httpx over requests for async HTTP
- Timeout all async operations""",
"tags": ["python", "async", "asyncio"]
},
{
"id": "api-design",
"name": "REST API Design",
"category": "coding",
"description": "RESTful API design patterns",
"content": """API design standards:
- Use nouns for resources, not verbs
- Proper HTTP methods (GET/POST/PUT/DELETE)
- Return appropriate status codes
- Version APIs (/api/v1/)
- Use query params for filtering, sorting
- Pagination with limit/offset or cursor
- Consistent error response format
- Rate limiting headers""",
"tags": ["api", "rest", "backend"]
},
{
"id": "valheim-server",
"name": "Valheim Server Setup",
"category": "gameserver",
"description": "Valheim dedicated server configuration",
"content": """Valheim server standards:
- Run in Docker with persistent volumes
- Backup world files regularly
- Set -public 0 for private servers
- Configure admin list properly
- Monitor RAM usage (2-4GB typical)
- Use server sync for crossplay
- Restart nightly for memory leaks""",
"tags": ["valheim", "gaming", "docker"]
},
{
"id": "minecraft-server",
"name": "Minecraft Server Setup",
"category": "gameserver",
"description": "Minecraft server configuration patterns",
"content": """Minecraft server standards:
- Use PaperMC for performance
- Pre-generate world chunks
- Configure view-distance appropriately (6-10)
- Use Aikar's flags for JVM optimization
- Regular backups with rotation
- Whitelist for private servers
- Monitor TPS and chunk loading""",
"tags": ["minecraft", "gaming", "java"]
},
{
"id": "git-commits",
"name": "Git Commit Standards",
"category": "coding",
"description": "Commit message conventions",
"content": """Commit message format:
- Conventional Commits (feat:, fix:, chore:, etc.)
- Imperative mood ("add feature" not "added feature")
- First line max 50 chars
- Blank line before body
- Body wraps at 72 chars
- Reference issues/PRs when applicable""",
"tags": ["git", "workflow", "documentation"]
}
]
CONVENTIONS = [
{
"id": "home-server-conventions",
"project_path": "/opt/home-server",
"name": "Home Server Standards",
"content": """All home server deployments:
- Docker Compose for all services
- Traefik for reverse proxy
- Health checks on all containers
- Centralized logging
- Automated backups
- Resource limits defined""",
"auto_inject": True
}
]
SNIPPETS = [
{
"id": "docker-healthcheck",
"name": "Docker Health Check Template",
"language": "yaml",
"category": "docker",
"content": """healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s""",
"tags": ["docker", "health", "template"]
},
{
"id": "traefik-labels",
"name": "Traefik Docker Labels",
"language": "yaml",
"category": "docker",
"content": """labels:
- "traefik.enable=true"
- "traefik.http.routers.myapp.rule=Host(`myapp.example.com`)"
- "traefik.http.routers.myapp.entrypoints=websecure"
- "traefik.http.routers.myapp.tls.certresolver=letsencrypt"
- "traefik.http.services.myapp.loadbalancer.server.port=8080""",
"tags": ["traefik", "labels", "template"]
}
]
def seed():
with httpx.Client(timeout=30.0) as client:
print("Seeding skills...")
for skill in SKILLS:
try:
response = client.post(f"{BASE_URL}/skills", json=skill)
if response.status_code == 200:
print(f"{skill['id']}")
elif response.status_code == 400:
print(f" ~ {skill['id']} (already exists)")
else:
print(f"{skill['id']}: {response.text}")
except Exception as e:
print(f"{skill['id']}: {e}")
print("\nSeeding conventions...")
for convention in CONVENTIONS:
try:
response = client.post(f"{BASE_URL}/conventions", json=convention)
if response.status_code == 200:
print(f"{convention['id']}")
elif response.status_code == 400:
print(f" ~ {convention['id']} (already exists)")
else:
print(f"{convention['id']}: {response.text}")
except Exception as e:
print(f"{convention['id']}: {e}")
print("\nSeeding snippets...")
for snippet in SNIPPETS:
try:
response = client.post(f"{BASE_URL}/snippets", json=snippet)
if response.status_code == 200:
print(f"{snippet['id']}")
elif response.status_code == 400:
print(f" ~ {snippet['id']} (already exists)")
else:
print(f"{snippet['id']}: {response.text}")
except Exception as e:
print(f"{snippet['id']}: {e}")
print("\nDone! Check http://localhost:8080/docs")
if __name__ == "__main__":
seed()

394
main.py Normal file
View file

@ -0,0 +1,394 @@
from fastapi import FastAPI, HTTPException, Depends, Query
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from sqlalchemy.exc import IntegrityError
import hashlib
import json
import os
from database import get_db, init_db
from models import Skill, Snippet, Convention, Cache, Memory
from schemas import (
SkillBase, Skill,
SnippetBase, Snippet,
ConventionBase, Convention,
CacheStore, Cache as CacheSchema,
MemoryBase, Memory as MemorySchema,
ContextBundle, CacheLookup
)
app = FastAPI(title="AI Skills API", description="Local infrastructure for AI context management")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.on_event("startup")
async def startup():
await init_db()
# ============== SKILLS ==============
@app.get("/skills", response_model=list[Skill])
async def list_skills(
category: Optional[str] = None,
db: AsyncSession = Depends(get_db)
):
query = select(Skill)
if category:
query = query.where(Skill.category == category)
result = await db.execute(query.order_by(Skill.name))
return result.scalars().all()
@app.get("/skills/search")
async def search_skills(
q: str,
category: Optional[str] = None,
db: AsyncSession = Depends(get_db)
):
query = select(Skill).where(
(Skill.name.ilike(f"%{q}%")) |
(Skill.content.ilike(f"%{q}%")) |
(Skill.tags.ilike(f"%{q}%"))
)
if category:
query = query.where(Skill.category == category)
result = await db.execute(query)
return result.scalars().all()
@app.get("/skills/{skill_id}", response_model=Skill)
async def get_skill(skill_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Skill).where(Skill.id == skill_id))
skill = result.scalar_one_or_none()
if not skill:
raise HTTPException(status_code=404, detail="Skill not found")
skill.usage_count += 1
await db.commit()
return skill
@app.post("/skills", response_model=Skill)
async def create_skill(skill: SkillBase, db: AsyncSession = Depends(get_db)):
db_skill = Skill(**skill.model_dump())
db.add(db_skill)
try:
await db.commit()
await db.refresh(db_skill)
except IntegrityError:
await db.rollback()
raise HTTPException(status_code=400, detail="Skill with this ID already exists")
return db_skill
@app.put("/skills/{skill_id}", response_model=Skill)
async def update_skill(skill_id: str, skill: SkillBase, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Skill).where(Skill.id == skill_id))
db_skill = result.scalar_one_or_none()
if not db_skill:
raise HTTPException(status_code=404, detail="Skill not found")
for key, value in skill.model_dump().items():
setattr(db_skill, key, value)
await db.commit()
await db.refresh(db_skill)
return db_skill
@app.delete("/skills/{skill_id}")
async def delete_skill(skill_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Skill).where(Skill.id == skill_id))
skill = result.scalar_one_or_none()
if not skill:
raise HTTPException(status_code=404, detail="Skill not found")
await db.delete(skill)
await db.commit()
return {"deleted": skill_id}
# ============== SNIPPETS ==============
@app.get("/snippets", response_model=list[Snippet])
async def list_snippets(
category: Optional[str] = None,
language: Optional[str] = None,
db: AsyncSession = Depends(get_db)
):
query = select(Snippet)
if category:
query = query.where(Snippet.category == category)
if language:
query = query.where(Snippet.language == language)
result = await db.execute(query.order_by(Snippet.name))
return result.scalars().all()
@app.get("/snippets/{snippet_id}", response_model=Snippet)
async def get_snippet(snippet_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Snippet).where(Snippet.id == snippet_id))
snippet = result.scalar_one_or_none()
if not snippet:
raise HTTPException(status_code=404, detail="Snippet not found")
return snippet
@app.post("/snippets", response_model=Snippet)
async def create_snippet(snippet: SnippetBase, db: AsyncSession = Depends(get_db)):
db_snippet = Snippet(**snippet.model_dump())
db.add(db_snippet)
try:
await db.commit()
await db.refresh(db_snippet)
except IntegrityError:
await db.rollback()
raise HTTPException(status_code=400, detail="Snippet with this ID already exists")
return db_snippet
@app.delete("/snippets/{snippet_id}")
async def delete_snippet(snippet_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Snippet).where(Snippet.id == snippet_id))
snippet = result.scalar_one_or_none()
if not snippet:
raise HTTPException(status_code=404, detail="Snippet not found")
await db.delete(snippet)
await db.commit()
return {"deleted": snippet_id}
# ============== CONVENTIONS ==============
@app.get("/conventions", response_model=list[Convention])
async def list_conventions(
project: Optional[str] = None,
db: AsyncSession = Depends(get_db)
):
query = select(Convention)
if project:
query = query.where(Convention.project_path == project)
result = await db.execute(query.order_by(Convention.name))
return result.scalars().all()
@app.get("/conventions/{convention_id}", response_model=Convention)
async def get_convention(convention_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Convention).where(Convention.id == convention_id))
convention = result.scalar_one_or_none()
if not convention:
raise HTTPException(status_code=404, detail="Convention not found")
return convention
@app.post("/conventions", response_model=Convention)
async def create_convention(convention: ConventionBase, db: AsyncSession = Depends(get_db)):
db_convention = Convention(**convention.model_dump())
db.add(db_convention)
try:
await db.commit()
await db.refresh(db_convention)
except IntegrityError:
await db.rollback()
raise HTTPException(status_code=400, detail="Convention with this ID already exists")
return db_convention
@app.put("/conventions/{convention_id}", response_model=Convention)
async def update_convention(convention_id: str, convention: ConventionBase, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Convention).where(Convention.id == convention_id))
db_convention = result.scalar_one_or_none()
if not db_convention:
raise HTTPException(status_code=404, detail="Convention not found")
for key, value in convention.model_dump().items():
setattr(db_convention, key, value)
await db.commit()
await db.refresh(db_convention)
return db_convention
@app.delete("/conventions/{convention_id}")
async def delete_convention(convention_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Convention).where(Convention.id == convention_id))
convention = result.scalar_one_or_none()
if not convention:
raise HTTPException(status_code=404, detail="Convention not found")
await db.delete(convention)
await db.commit()
return {"deleted": convention_id}
# ============== CACHE ==============
@app.post("/cache/lookup", response_model=Optional[CacheSchema])
async def lookup_cache(lookup: CacheLookup, db: AsyncSession = Depends(get_db)):
prompt_hash = hashlib.sha256(
json.dumps({"prompt": lookup.prompt, "model": lookup.model}, sort_keys=True).encode()
).hexdigest()
result = await db.execute(
select(Cache).where(
(Cache.hash == prompt_hash) &
((Cache.expires_at == None) | (Cache.expires_at > func.now()))
)
)
return result.scalar_one_or_none()
@app.post("/cache/store", response_model=CacheSchema)
async def store_cache(cache: CacheStore, db: AsyncSession = Depends(get_db)):
prompt_hash = hashlib.sha256(
json.dumps({"prompt": cache.response, "model": cache.model}, sort_keys=True).encode()
).hexdigest()
db_cache = Cache(
hash=prompt_hash,
response=cache.response,
model=cache.model,
tokens_in=cache.tokens_in,
tokens_out=cache.tokens_out,
expires_at=cache.expires_at
)
db.add(db_cache)
await db.commit()
await db.refresh(db_cache)
return db_cache
@app.delete("/cache/{cache_hash}")
async def delete_cache(cache_hash: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Cache).where(Cache.hash == cache_hash))
cache = result.scalar_one_or_none()
if not cache:
raise HTTPException(status_code=404, detail="Cache entry not found")
await db.delete(cache)
await db.commit()
return {"deleted": cache_hash}
@app.get("/cache/stats")
async def cache_stats(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Cache))
entries = result.scalars().all()
return {
"total_entries": len(entries),
"total_tokens_saved": sum((c.tokens_in or 0) + (c.tokens_out or 0) for c in entries)
}
# ============== MEMORY ==============
@app.get("/memory", response_model=list[MemorySchema])
async def list_memory(
project: Optional[str] = None,
db: AsyncSession = Depends(get_db)
):
query = select(Memory)
if project:
query = query.where(Memory.project == project)
result = await db.execute(query.order_by(Memory.key))
return result.scalars().all()
@app.get("/memory/{memory_id}", response_model=MemorySchema)
async def get_memory(memory_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Memory).where(Memory.id == memory_id))
memory = result.scalar_one_or_none()
if not memory:
raise HTTPException(status_code=404, detail="Memory not found")
return memory
@app.post("/memory", response_model=MemorySchema)
async def create_memory(memory: MemoryBase, db: AsyncSession = Depends(get_db)):
db_memory = Memory(**memory.model_dump())
db.add(db_memory)
try:
await db.commit()
await db.refresh(db_memory)
except IntegrityError:
await db.rollback()
raise HTTPException(status_code=400, detail="Memory with this ID already exists")
return db_memory
@app.put("/memory/{memory_id}", response_model=MemorySchema)
async def update_memory(memory_id: str, memory: MemoryBase, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Memory).where(Memory.id == memory_id))
db_memory = result.scalar_one_or_none()
if not db_memory:
raise HTTPException(status_code=404, detail="Memory not found")
for key, value in memory.model_dump().items():
setattr(db_memory, key, value)
await db.commit()
await db.refresh(db_memory)
return db_memory
@app.delete("/memory/{memory_id}")
async def delete_memory(memory_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Memory).where(Memory.id == memory_id))
memory = result.scalar_one_or_none()
if not memory:
raise HTTPException(status_code=404, detail="Memory not found")
await db.delete(memory)
await db.commit()
return {"deleted": memory_id}
# ============== CONTEXT BUNDLE ==============
@app.get("/context", response_model=ContextBundle)
async def get_context(
project: Optional[str] = None,
skills: Optional[str] = Query(None, description="Comma-separated skill IDs to include"),
db: AsyncSession = Depends(get_db)
):
skill_list = []
snippet_list = []
convention_list = []
memory_list = []
if skills:
skill_ids = [s.strip() for s in skills.split(",")]
result = await db.execute(select(Skill).where(Skill.id.in_(skill_ids)))
skill_list = result.scalars().all()
if project:
result = await db.execute(select(Convention).where(Convention.project_path == project))
convention_list = result.scalars().all()
result = await db.execute(select(Memory).where(Memory.project == project))
memory_list = result.scalars().all()
result = await db.execute(select(Snippet).where(Snippet.category == project.split("/")[-1]))
snippet_list = result.scalars().all()
return ContextBundle(
skills=skill_list,
snippets=snippet_list,
conventions=convention_list,
memories=memory_list
)
@app.get("/health")
async def health():
return {"status": "healthy"}

2
mcp/.env.example Normal file
View file

@ -0,0 +1,2 @@
SKILLS_API_URL=http://localhost:8080
GAME_SERVERS_DIR=/opt/game-servers

10
mcp/Dockerfile Normal file
View file

@ -0,0 +1,10 @@
FROM python:3.11-slim
WORKDIR /app
COPY mcp/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY mcp/ .
CMD ["python", "homelab.py"]

36
mcp/docker-compose.yml Normal file
View file

@ -0,0 +1,36 @@
services:
homelab:
build:
context: ..
dockerfile: mcp/Dockerfile
command: python homelab.py
volumes:
- /var/run/docker.sock:/var/run/docker.sock
environment:
- DOCKER_HOST=unix:///var/run/docker.sock
network_mode: host
restart: unless-stopped
gameservers:
build:
context: ..
dockerfile: mcp/Dockerfile
command: python gameservers.py
volumes:
- /opt/game-servers:/opt/game-servers
environment:
- GAME_SERVERS_DIR=/opt/game-servers
network_mode: host
restart: unless-stopped
skills:
build:
context: ..
dockerfile: mcp/Dockerfile
command: python skills.py
environment:
- SKILLS_API_URL=http://host.docker.internal:8080
extra_hosts:
- "host.docker.internal:host-gateway"
network_mode: host
restart: unless-stopped

236
mcp/gameservers.py Normal file
View file

@ -0,0 +1,236 @@
from mcp.server.fastmcp import FastMCP
import os
import json
from pathlib import Path
from typing import Optional
import subprocess
mcp = FastMCP("gameservers")
GAME_SERVERS_DIR = Path(os.getenv("GAME_SERVERS_DIR", "/opt/game-servers"))
@mcp.tool()
def list_servers() -> list[dict]:
"""List all game servers"""
if not GAME_SERVERS_DIR.exists():
return []
servers = []
for server_dir in GAME_SERVERS_DIR.iterdir():
if server_dir.is_dir():
config_file = server_dir / "config.json"
servers.append({
"name": server_dir.name,
"path": str(server_dir),
"has_config": config_file.exists(),
"config": json.loads(config_file.read_text()) if config_file.exists() else None
})
return servers
@mcp.tool()
def get_server_config(server_name: str) -> dict:
"""Get config for a specific game server"""
config_file = GAME_SERVERS_DIR / server_name / "config.json"
if not config_file.exists():
return {"error": f"No config found for {server_name}"}
return json.loads(config_file.read_text())
@mcp.tool()
def update_server_config(server_name: str, config: dict) -> dict:
"""Update game server config"""
server_dir = GAME_SERVERS_DIR / server_name
if not server_dir.exists():
return {"error": f"Server {server_name} not found"}
config_file = server_dir / "config.json"
config_file.write_text(json.dumps(config, indent=2))
return {"success": True, "message": f"Updated config for {server_name}"}
@mcp.tool()
def server_status(server_name: str) -> dict:
"""Get status of a game server"""
server_dir = GAME_SERVERS_DIR / server_name
if not server_dir.exists():
return {"error": f"Server {server_name} not found"}
status_file = server_dir / "status.json"
if not status_file.exists():
return {"status": "unknown", "message": "No status file found"}
return json.loads(status_file.read_text())
@mcp.tool()
def start_server(server_name: str) -> dict:
"""Start a game server"""
server_dir = GAME_SERVERS_DIR / server_name
if not server_dir.exists():
return {"error": f"Server {server_name} not found"}
start_script = server_dir / "start.sh"
if not start_script.exists():
return {"error": f"No start script found for {server_name}"}
try:
subprocess.run(
["bash", str(start_script)],
cwd=server_dir,
capture_output=True,
text=True,
timeout=30
)
return {"success": True, "message": f"Started {server_name}"}
except subprocess.TimeoutExpired:
return {"error": "Start command timed out"}
except Exception as e:
return {"error": str(e)}
@mcp.tool()
def stop_server(server_name: str) -> dict:
"""Stop a game server"""
server_dir = GAME_SERVERS_DIR / server_name
if not server_dir.exists():
return {"error": f"Server {server_name} not found"}
stop_script = server_dir / "stop.sh"
if not stop_script.exists():
return {"error": f"No stop script found for {server_name}"}
try:
subprocess.run(
["bash", str(stop_script)],
cwd=server_dir,
capture_output=True,
text=True,
timeout=30
)
return {"success": True, "message": f"Stopped {server_name}"}
except subprocess.TimeoutExpired:
return {"error": "Stop command timed out"}
except Exception as e:
return {"error": str(e)}
@mcp.tool()
def get_server_logs(server_name: str, lines: int = 100) -> str:
"""Get logs from a game server"""
server_dir = GAME_SERVERS_DIR / server_name
if not server_dir.exists():
return f"Server {server_name} not found"
log_file = server_dir / "logs" / "latest.log"
if not log_file.exists():
return f"No log file found for {server_name}"
try:
result = subprocess.run(
["tail", "-n", str(lines), str(log_file)],
capture_output=True,
text=True
)
return result.stdout
except Exception as e:
return f"Error reading logs: {e}"
@mcp.tool()
def create_server(
name: str,
game_type: str,
port: int,
max_players: int = 10,
config: Optional[dict] = None
) -> dict:
"""Create a new game server directory structure"""
server_dir = GAME_SERVERS_DIR / name
if server_dir.exists():
return {"error": f"Server {name} already exists"}
server_dir.mkdir(parents=True, exist_ok=True)
(server_dir / "logs").mkdir(exist_ok=True)
default_config = {
"name": name,
"game_type": game_type,
"port": port,
"max_players": max_players,
"auto_restart": True,
"restart_cron": "0 4 * * *"
}
if config:
default_config.update(config)
config_file = server_dir / "config.json"
config_file.write_text(json.dumps(default_config, indent=2))
start_script = server_dir / "start.sh"
start_script.write_text(f"#!/bin/bash\n# TODO: Add start command for {game_type}\necho 'Starting {name}'\n")
start_script.chmod(0o755)
stop_script = server_dir / "stop.sh"
stop_script.write_text(f"#!/bin/bash\n# TODO: Add stop command for {game_type}\necho 'Stopping {name}'\n")
stop_script.chmod(0o755)
return {
"success": True,
"message": f"Created server {name}",
"path": str(server_dir)
}
@mcp.tool()
def delete_server(server_name: str, keep_logs: bool = False) -> dict:
"""Delete a game server"""
server_dir = GAME_SERVERS_DIR / server_name
if not server_dir.exists():
return {"error": f"Server {server_name} not found"}
try:
if keep_logs:
import shutil
logs_dir = server_dir / "logs"
if logs_dir.exists():
shutil.rmtree(logs_dir)
else:
import shutil
shutil.rmtree(server_dir)
return {"success": True, "message": f"Deleted {server_name}"}
except Exception as e:
return {"error": str(e)}
@mcp.tool()
def get_templates() -> list[dict]:
"""Get available game server templates"""
templates = {
"valheim": {
"game_type": "valheim",
"default_port": 2456,
"description": "Valheim dedicated server"
},
"minecraft": {
"game_type": "minecraft",
"default_port": 25565,
"description": "Minecraft Java Edition server"
},
"terraria": {
"game_type": "terraria",
"default_port": 7777,
"description": "Terraria server"
},
"factorio": {
"game_type": "factorio",
"default_port": 34197,
"description": "Factorio dedicated server"
}
}
return [{"name": k, **v} for k, v in templates.items()]
if __name__ == "__main__":
mcp.run()

169
mcp/homelab.py Normal file
View file

@ -0,0 +1,169 @@
from mcp.server.fastmcp import FastMCP
import docker
import psutil
import subprocess
import os
from typing import Optional
mcp = FastMCP("homelab")
DOCKER_CLIENT = docker.from_env()
@mcp.tool()
def container_status(container_name: str) -> dict:
"""Get status of a Docker container"""
try:
container = DOCKER_CLIENT.containers.get(container_name)
return {
"status": container.status,
"running": container.status == "running",
"image": container.image.tags[0] if container.image.tags else container.image.id,
"ports": container.ports,
"health": container.attrs.get("State", {}).get("Health", {}).get("Status", "none")
}
except docker.errors.NotFound:
return {"error": f"Container {container_name} not found"}
@mcp.tool()
def list_containers(all: bool = False) -> list[dict]:
"""List Docker containers"""
containers = DOCKER_CLIENT.containers.list(all=all)
return [
{
"name": c.name,
"status": c.status,
"image": c.image.tags[0] if c.image.tags else c.image.id[:12],
"ports": c.ports
}
for c in containers
]
@mcp.tool()
def start_container(container_name: str) -> dict:
"""Start a Docker container"""
try:
container = DOCKER_CLIENT.containers.get(container_name)
container.start()
return {"success": True, "message": f"Started {container_name}"}
except docker.errors.NotFound:
return {"error": f"Container {container_name} not found"}
except Exception as e:
return {"error": str(e)}
@mcp.tool()
def stop_container(container_name: str, timeout: int = 10) -> dict:
"""Stop a Docker container"""
try:
container = DOCKER_CLIENT.containers.get(container_name)
container.stop(timeout=timeout)
return {"success": True, "message": f"Stopped {container_name}"}
except docker.errors.NotFound:
return {"error": f"Container {container_name} not found"}
except Exception as e:
return {"error": str(e)}
@mcp.tool()
def restart_container(container_name: str) -> dict:
"""Restart a Docker container"""
try:
container = DOCKER_CLIENT.containers.get(container_name)
container.restart()
return {"success": True, "message": f"Restarted {container_name}"}
except docker.errors.NotFound:
return {"error": f"Container {container_name} not found"}
except Exception as e:
return {"error": str(e)}
@mcp.tool()
def container_logs(container_name: str, lines: int = 100) -> str:
"""Get logs from a Docker container"""
try:
container = DOCKER_CLIENT.containers.get(container_name)
return container.logs(tail=lines).decode("utf-8")
except docker.errors.NotFound:
return f"Container {container_name} not found"
@mcp.tool()
def system_resources() -> dict:
"""Get system resource usage"""
return {
"cpu_percent": psutil.cpu_percent(interval=1),
"memory": {
"total": psutil.virtual_memory().total // (1024 * 1024),
"available": psutil.virtual_memory().available // (1024 * 1024),
"percent": psutil.virtual_memory().percent
},
"disk": {
"total": psutil.disk_usage("/").total // (1024 * 1024 * 1024),
"used": psutil.disk_usage("/").used // (1024 * 1024 * 1024),
"percent": psutil.disk_usage("/").percent
}
}
@mcp.tool()
def run_command(command: str, cwd: Optional[str] = None) -> dict:
"""Run a shell command (use carefully)"""
try:
result = subprocess.run(
command,
shell=True,
cwd=cwd,
capture_output=True,
text=True,
timeout=30
)
return {
"success": result.returncode == 0,
"stdout": result.stdout,
"stderr": result.stderr,
"returncode": result.returncode
}
except subprocess.TimeoutExpired:
return {"error": "Command timed out after 30s"}
except Exception as e:
return {"error": str(e)}
@mcp.tool()
def docker_compose_action(
compose_file: str,
action: str,
service: Optional[str] = None
) -> dict:
"""Run docker-compose action (up, down, restart, pull)"""
if action not in ["up", "down", "restart", "pull"]:
return {"error": f"Invalid action: {action}"}
cmd = f"docker-compose -f {compose_file} {action}"
if service:
cmd += f" {service}"
try:
result = subprocess.run(
cmd,
shell=True,
capture_output=True,
text=True,
timeout=120
)
return {
"success": result.returncode == 0,
"stdout": result.stdout,
"stderr": result.stderr
}
except subprocess.TimeoutExpired:
return {"error": "Command timed out after 120s"}
except Exception as e:
return {"error": str(e)}
if __name__ == "__main__":
mcp.run()

5
mcp/requirements.txt Normal file
View file

@ -0,0 +1,5 @@
mcp==1.0.0
httpx==0.26.0
python-dotenv==1.0.0
docker==7.0.0
psutil==5.9.7

180
mcp/skills.py Normal file
View file

@ -0,0 +1,180 @@
from mcp.server.fastmcp import FastMCP
import httpx
import os
mcp = FastMCP("skills")
SKILLS_API_URL = os.getenv("SKILLS_API_URL", "http://localhost:8080")
@mcp.tool()
def get_skill(skill_id: str) -> dict:
"""Get a skill by ID from the skills database"""
try:
with httpx.Client() as client:
response = client.get(f"{SKILLS_API_URL}/skills/{skill_id}")
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
return {"error": f"Failed to fetch skill: {e}"}
@mcp.tool()
def search_skills(query: str, category: str | None = None) -> list[dict]:
"""Search skills by query"""
try:
with httpx.Client() as client:
params = {"q": query}
if category:
params["category"] = category
response = client.get(f"{SKILLS_API_URL}/skills/search", params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
return [{"error": f"Failed to search skills: {e}"}]
@mcp.tool()
def list_skills(category: str | None = None) -> list[dict]:
"""List all skills, optionally filtered by category"""
try:
with httpx.Client() as client:
params = {}
if category:
params["category"] = category
response = client.get(f"{SKILLS_API_URL}/skills", params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
return [{"error": f"Failed to list skills: {e}"}]
@mcp.tool()
def get_context(project: str | None = None, skills: list[str] | None = None) -> dict:
"""Get context bundle for a project"""
try:
with httpx.Client() as client:
params = {}
if project:
params["project"] = project
if skills:
params["skills"] = ",".join(skills)
response = client.get(f"{SKILLS_API_URL}/context", params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
return {"error": f"Failed to fetch context: {e}"}
@mcp.tool()
def get_conventions(project: str | None = None) -> list[dict]:
"""Get conventions for a project"""
try:
with httpx.Client() as client:
params = {}
if project:
params["project"] = project
response = client.get(f"{SKILLS_API_URL}/conventions", params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
return [{"error": f"Failed to fetch conventions: {e}"}]
@mcp.tool()
def get_snippets(category: str | None = None, language: str | None = None) -> list[dict]:
"""Get code snippets"""
try:
with httpx.Client() as client:
params = {}
if category:
params["category"] = category
if language:
params["language"] = language
response = client.get(f"{SKILLS_API_URL}/snippets", params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
return [{"error": f"Failed to fetch snippets: {e}"}]
@mcp.tool()
def check_cache(prompt: str, model: str | None = None) -> dict | None:
"""Check if a response is cached for this prompt"""
try:
with httpx.Client() as client:
response = client.post(
f"{SKILLS_API_URL}/cache/lookup",
json={"prompt": prompt, "model": model}
)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
return {"error": f"Failed to check cache: {e}"}
@mcp.tool()
def get_memory(project: str) -> list[dict]:
"""Get memory entries for a project"""
try:
with httpx.Client() as client:
params = {"project": project}
response = client.get(f"{SKILLS_API_URL}/memory", params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
return [{"error": f"Failed to fetch memory: {e}"}]
@mcp.tool()
def add_memory(project: str, key: str, content: str) -> dict:
"""Add a memory entry for a project"""
import uuid
try:
with httpx.Client() as client:
response = client.post(
f"{SKILLS_API_URL}/memory",
json={
"id": str(uuid.uuid4())[:8],
"project": project,
"key": key,
"content": content
}
)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
return {"error": f"Failed to add memory: {e}"}
@mcp.tool()
def create_skill(
id: str,
name: str,
content: str,
category: str | None = None,
description: str | None = None,
tags: list[str] | None = None
) -> dict:
"""Create a new skill"""
try:
with httpx.Client() as client:
response = client.post(
f"{SKILLS_API_URL}/skills",
json={
"id": id,
"name": name,
"content": content,
"category": category,
"description": description,
"tags": tags
}
)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
return {"error": f"Failed to create skill: {e}"}
if __name__ == "__main__":
mcp.run()

64
models.py Normal file
View file

@ -0,0 +1,64 @@
from sqlalchemy import Column, String, Text, DateTime, Boolean, Integer, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from database import Base
class Skill(Base):
__tablename__ = "skills"
id = Column(String, primary_key=True)
name = Column(String, nullable=False)
description = Column(Text)
category = Column(String)
content = Column(Text, nullable=False)
tags = Column(String)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
usage_count = Column(Integer, default=0)
class Snippet(Base):
__tablename__ = "snippets"
id = Column(String, primary_key=True)
name = Column(String, nullable=False)
language = Column(String)
content = Column(Text, nullable=False)
category = Column(String)
tags = Column(String)
created_at = Column(DateTime(timezone=True), server_default=func.now())
class Convention(Base):
__tablename__ = "conventions"
id = Column(String, primary_key=True)
project_path = Column(String)
name = Column(String, nullable=False)
content = Column(Text, nullable=False)
auto_inject = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
class Cache(Base):
__tablename__ = "cache"
hash = Column(String, primary_key=True)
response = Column(Text, nullable=False)
model = Column(String)
tokens_in = Column(Integer)
tokens_out = Column(Integer)
created_at = Column(DateTime(timezone=True), server_default=func.now())
expires_at = Column(DateTime(timezone=True))
class Memory(Base):
__tablename__ = "memory"
id = Column(String, primary_key=True)
project = Column(String, nullable=False)
key = Column(String, nullable=False)
content = Column(Text, nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

6
requirements.txt Normal file
View file

@ -0,0 +1,6 @@
fastapi==0.109.0
uvicorn[standard]==0.27.0
sqlalchemy==2.0.25
pydantic==2.5.3
python-dotenv==1.0.0
aiosqlite==0.19.0

99
schemas.py Normal file
View file

@ -0,0 +1,99 @@
from pydantic import BaseModel
from datetime import datetime
from typing import Optional, List
class SkillBase(BaseModel):
id: str
name: str
description: Optional[str] = None
category: Optional[str] = None
content: str
tags: Optional[List[str]] = None
class Skill(SkillBase):
created_at: datetime
updated_at: datetime
usage_count: int
class Config:
from_attributes = True
class SnippetBase(BaseModel):
id: str
name: str
language: Optional[str] = None
content: str
category: Optional[str] = None
tags: Optional[List[str]] = None
class Snippet(SnippetBase):
created_at: datetime
class Config:
from_attributes = True
class ConventionBase(BaseModel):
id: str
project_path: Optional[str] = None
name: str
content: str
auto_inject: bool = False
class Convention(ConventionBase):
created_at: datetime
class Config:
from_attributes = True
class CacheBase(BaseModel):
hash: str
response: str
model: Optional[str] = None
tokens_in: Optional[int] = None
tokens_out: Optional[int] = None
expires_at: Optional[datetime] = None
class CacheStore(CacheBase):
pass
class Cache(CacheBase):
created_at: datetime
class Config:
from_attributes = True
class MemoryBase(BaseModel):
id: str
project: str
key: str
content: str
class Memory(MemoryBase):
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class ContextBundle(BaseModel):
skills: List[Skill]
snippets: List[Snippet]
conventions: List[Convention]
memories: List[Memory]
class CacheLookup(BaseModel):
prompt: str
model: Optional[str] = None