mcp-ru-regions/server.py

220 lines
7.4 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env -S uv run --quiet
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "mcp[cli]>=1.2.0",
# "httpx>=0.27",
# ]
# ///
"""MCP-сервер: по названию населённого пункта РФ отдаёт регион (через DaData).
Запросы кешируются в SQLite (cache.sqlite рядом со скриптом), повторные
вызовы с тем же названием не идут в API.
"""
import json
import os
import sqlite3
from pathlib import Path
import httpx
from mcp.server.fastmcp import FastMCP
DADATA_URL = "https://suggestions.dadata.ru/suggestions/api/4_1/rs/suggest/address"
DADATA_GEOLOCATE_URL = "https://suggestions.dadata.ru/suggestions/api/4_1/rs/geolocate/address"
API_KEY = os.environ.get("DADATA_API_KEY")
if not API_KEY:
raise RuntimeError("DADATA_API_KEY env var is required")
CACHE_PATH = Path(
os.environ.get("RU_REGIONS_CACHE_PATH")
or Path(__file__).resolve().parent / "cache.sqlite"
)
CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
TRANSPORT = os.environ.get("MCP_TRANSPORT", "stdio")
HOST = os.environ.get("MCP_HOST", "127.0.0.1")
PORT = int(os.environ.get("MCP_PORT", "8311"))
mcp = FastMCP("ru-regions", host=HOST, port=PORT)
def _db() -> sqlite3.Connection:
conn = sqlite3.connect(CACHE_PATH)
conn.execute(
"""CREATE TABLE IF NOT EXISTS lookups (
key TEXT NOT NULL,
limit_n INTEGER NOT NULL,
response TEXT NOT NULL,
cached_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (key, limit_n)
)"""
)
return conn
def _normalize(city: str) -> str:
return " ".join(city.lower().replace("ё", "е").split())
def _cache_get(key: str, limit: int) -> dict | None:
with _db() as conn:
row = conn.execute(
"SELECT response FROM lookups WHERE key = ? AND limit_n >= ? ORDER BY limit_n ASC LIMIT 1",
(key, limit),
).fetchone()
if not row:
return None
data = json.loads(row[0])
data["matches"] = data["matches"][:limit]
data["count"] = len(data["matches"])
data["from_cache"] = True
return data
def _cache_put(key: str, limit: int, data: dict) -> None:
payload = {"count": data["count"], "matches": data["matches"]}
with _db() as conn:
conn.execute(
"INSERT OR REPLACE INTO lookups (key, limit_n, response) VALUES (?, ?, ?)",
(key, limit, json.dumps(payload, ensure_ascii=False)),
)
def _format_suggestion(s: dict) -> dict:
d = s["data"]
place = d.get("settlement_with_type") or d.get("city_with_type") or s.get("value")
return {
"place": place,
"full_address": s.get("value"),
"region": d.get("region_with_type"),
"region_iso_code": d.get("region_iso_code"),
"federal_district": d.get("federal_district"),
"area": d.get("area_with_type"),
"region_kladr_id": d.get("region_kladr_id"),
"region_fias_id": d.get("region_fias_id"),
"fias_id": d.get("fias_id"),
}
@mcp.tool()
async def get_region(city: str, limit: int = 5, refresh: bool = False) -> dict:
"""Определить регион РФ по названию населённого пункта.
Результаты кешируются локально в SQLite — повторный запрос по тому же
названию не расходует лимит DaData.
Args:
city: Название города, посёлка, села или деревни (например, "Мытищи").
limit: Сколько вариантов вернуть, если есть омонимы (120).
refresh: Принудительно сходить в API, обновив кеш.
Returns:
Словарь:
- matches: список найденных вариантов (region, federal_district, area, place, kladr/fias id).
- count: количество совпадений.
- from_cache: True, если ответ взят из кеша.
"""
limit = max(1, min(int(limit), 20))
key = _normalize(city)
if not refresh:
cached = _cache_get(key, limit)
if cached is not None:
return cached
payload = {
"query": city,
"from_bound": {"value": "city"},
"to_bound": {"value": "settlement"},
"count": limit,
}
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Token {API_KEY}",
}
async with httpx.AsyncClient(timeout=10.0) as client:
r = await client.post(DADATA_URL, json=payload, headers=headers)
r.raise_for_status()
data = r.json()
matches = [_format_suggestion(s) for s in data.get("suggestions", [])]
result = {"count": len(matches), "matches": matches, "from_cache": False}
_cache_put(key, limit, result)
return result
@mcp.tool()
async def get_region_by_coords(
lat: float, lon: float, radius_meters: int = 1000, limit: int = 1
) -> dict:
"""Определить регион РФ по координатам (обратный геокодинг через DaData).
Args:
lat: Широта (например, 55.7558).
lon: Долгота (например, 37.6173).
radius_meters: Радиус поиска ближайшего адреса в метрах (11000).
limit: Сколько ближайших объектов вернуть (120).
Returns:
Словарь:
- matches: список найденных объектов (region, federal_district, place, kladr/fias id).
- count: количество совпадений.
- from_cache: True, если ответ взят из кеша.
"""
radius_meters = max(1, min(int(radius_meters), 1000))
limit = max(1, min(int(limit), 20))
key = f"geo:{round(float(lat), 4)},{round(float(lon), 4)}:r{radius_meters}"
cached = _cache_get(key, limit)
if cached is not None:
return cached
payload = {
"lat": lat,
"lon": lon,
"radius_meters": radius_meters,
"count": limit,
}
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Token {API_KEY}",
}
async with httpx.AsyncClient(timeout=10.0) as client:
r = await client.post(DADATA_GEOLOCATE_URL, json=payload, headers=headers)
r.raise_for_status()
data = r.json()
matches = [_format_suggestion(s) for s in data.get("suggestions", [])]
result = {"count": len(matches), "matches": matches, "from_cache": False}
_cache_put(key, limit, result)
return result
@mcp.tool()
def cache_stats() -> dict:
"""Статистика локального кеша запросов."""
with _db() as conn:
rows = conn.execute(
"SELECT COUNT(*), MIN(cached_at), MAX(cached_at) FROM lookups"
).fetchone()
return {
"entries": rows[0] or 0,
"oldest": rows[1],
"newest": rows[2],
"path": str(CACHE_PATH),
}
@mcp.tool()
def cache_clear() -> dict:
"""Очистить локальный кеш запросов."""
with _db() as conn:
deleted = conn.execute("DELETE FROM lookups").rowcount
return {"deleted": deleted}
if __name__ == "__main__":
mcp.run(transport=TRANSPORT)