Skip to content

Commit 2881fe6

Browse files
committed
feat: add system backends, generic and flux
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
1 parent 9ae8f4d commit 2881fe6

13 files changed

Lines changed: 284 additions & 71 deletions

File tree

README.md

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,20 @@ mcpserver start --hub --hub-secret potato
262262
In another terminal, start a worker using the token that is generated. Add some functions for fun.
263263

264264
```bash
265-
mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --join-secret portato --port 7777
265+
mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --join-secret potato --port 7777
266+
```
267+
268+
Note that you can also set the secret in the environemnt.
269+
270+
```bash
271+
export MCPSERVER_JOIN_SECRET=potato
272+
mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --port 7777
273+
```
274+
275+
Register the worker sytem type instead as flux:
276+
277+
```bash
278+
mcpserver start --config examples/jobspec/mcpserver.yaml --join http://0.0.0.0:8000 --port 7777 --system-type flux
266279
```
267280

268281
Test doing queries for status:
@@ -293,10 +306,9 @@ Here are a few design choices (subject to change, of course). I am starting with
293306

294307
## TODO
295308

296-
- [ ] join secret should be allowed from environment
297-
- [ ] debug why port not taking
298-
- [ ] design --system-name and --system
299-
- [ ] Need to handle worker disconnect and reconnect.
309+
- [ ] need to expose tools from system (child worker) instances
310+
- [ ] need to decide on dispatch strategy / algorithm
311+
- [ ] add in fluxion queue stats via RPC call to flux status
300312

301313
## License
302314

examples/mcp-query.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
console = Console()
1313

14-
# --- Defaults ---
1514
DEFAULT_URL = "http://localhost:8000/mcp"
1615
DEFAULT_TOOL = "get_fleet_status"
1716

@@ -76,7 +75,7 @@ async def query_mcp(url, tool_name):
7675
except:
7776
data = text_content
7877

79-
# --- PRETTY RENDERING ---
78+
# MAKE IT PRETTY.
8079
if tool_name == "get_fleet_status" and isinstance(data, dict):
8180
console.print("\n")
8281
console.print(Panel(render_fleet_tree(data), border_style="cyan", expand=False))

mcpserver/cli/args.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,22 @@ def populate_start_args(start):
6464
default=os.environ.get("MCP_HUB_SECRET"),
6565
help="Secret key required for workers to register. (Auto-generated if omitted)",
6666
)
67+
start.add_argument(
68+
"--system-type",
69+
default="generic",
70+
help="System type/template (e.g., 'generic', 'flux', 'kubernetes') or a full python module path.",
71+
)
6772

6873
# Worker Registration Group
6974
worker_group = start.add_argument_group("🐝 Worker Registration")
7075
worker_group.add_argument(
7176
"--join", help="URL of the MCP Hub to join (e.g., http://hub-host:8089)"
7277
)
73-
worker_group.add_argument("--join-secret", help="The registration secret provided by the Hub.")
78+
worker_group.add_argument(
79+
"--join-secret",
80+
help="The registration secret provided by the Hub.",
81+
default=os.environ.get("MCPSERVER_JOIN_SECRET"),
82+
)
7483
worker_group.add_argument(
7584
"--register-id",
7685
help="Unique ID for this worker. Defaults to the hostname.",

mcpserver/cli/manager.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
manager.register()
77

88

9-
def get_manager(mcp, cfg):
9+
def get_manager(mcp, cfg, system_type=None):
1010
"""
1111
Get the common tool manager and register tools.
1212
"""
@@ -20,7 +20,13 @@ def get_manager(mcp, cfg):
2020
print(f" ✅ Registered: {endpoint.name}")
2121

2222
# Load into the manager (tools, resources, prompts)
23-
for tool in manager.load_tools(mcp, cfg.include, cfg.exclude):
23+
# We pass the system_name and system path here
24+
for tool in manager.load_tools(
25+
mcp,
26+
cfg.include,
27+
cfg.exclude,
28+
system_type=system_type,
29+
):
2430
print(f" ✅ Registered: {tool.name}")
2531

2632
# Visual to show user we have ssl

mcpserver/cli/start.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def main(args, extra, **kwargs):
3535

3636
# Get the tool manager and register discovered tools
3737
mcp = init_mcp(cfg.exclude, cfg.include, args.mask_error_details)
38-
get_manager(mcp, cfg)
38+
get_manager(mcp, cfg, system_type=args.system_type)
3939

4040
# Create ASGI app from MCP server
4141
mcp_app = mcp.http_app(path=cfg.server.path)

mcpserver/core/hub.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,13 @@ async def get_one(wid, info):
7676
}
7777
try:
7878
async with info["client"] as sess:
79-
# 1. Call the tool
8079
mcp_result = await sess.call_tool("get_status", {})
8180

82-
# 2. Extract the text content from the MCP wrapper
81+
# Extract the text content from the MCP wrapper
8382
# FastMCP result.content is a list of content blocks
8483
raw_text = mcp_result.content[0].text
8584

86-
# 3. Parse the string back into a dictionary
85+
# Parse the string back into a dictionary
8786
try:
8887
# Handle potential single quotes from Python's str(dict)
8988
status_data = json.loads(raw_text.replace("'", '"'))
@@ -150,7 +149,7 @@ def _create_proxy(self, worker_id: str, tool: Tool):
150149
# Generate a safe function name
151150
proxy_name = f"{utils.sanitize(worker_id)}_{utils.sanitize(tool.name)}"
152151

153-
# FIX: Check if this tool is already registered to avoid ValueError on re-registration
152+
# Check if this tool is already registered to avoid ValueError on re-registration
154153
if proxy_name in self._registered_proxies:
155154
print(f"🛰️ Re-discovered worker tool: [blue]{proxy_name}[/blue]")
156155
return
@@ -164,8 +163,8 @@ def _create_proxy(self, worker_id: str, tool: Tool):
164163
# Create the signature string: arg_1=None, arg_2=None
165164
arg_string = ", ".join([f"{safe_name}=None" for safe_name in arg_mapping.keys()])
166165

167-
# 3. Build the dynamic function
168-
# FIX: We pass 'self' (the HubManager instance) as 'hub' to the exec scope.
166+
# Build the dynamic function
167+
# We pass 'self' (the HubManager instance) as 'hub' to the exec scope.
169168
# This allows the proxy function to look up the LATEST url for the worker
170169
# every time it is called, instead of using a hardcoded stale URL.
171170
exec_globals = {

mcpserver/core/worker.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,16 @@ def from_args(cls, mcp, args, cfg) -> Optional["WorkerManager"]:
4242
public_url = (
4343
args.public_url or f"http://{cfg.server.host}:{cfg.server.port}{cfg.server.path}"
4444
)
45+
46+
sys_type = getattr(args, "system_type", "generic")
47+
worker_type = sys_type.split(".")[-1] if "." in sys_type else sys_type
4548
return cls(
4649
mcp,
4750
hub_url=args.join,
4851
secret=args.join_secret,
4952
worker_id=args.register_id,
5053
public_url=public_url,
51-
worker_type=args.worker_type,
54+
worker_type=worker_type,
5255
labels=args.labels,
5356
)
5457

mcpserver/tools/manager.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
# These are the function types we want to discover
1212
from fastmcp.tools import Tool
1313

14+
import mcpserver.defaults as defaults
15+
import mcpserver.tools.system as systems
16+
1417
from .base import BaseTool
1518

1619

@@ -104,13 +107,20 @@ def discover_tools(self, root_path: str, module_path: str) -> Dict[str, Path]:
104107
discovered[tool_id] = {"path": file_path, "module": import_path, "root": root_path}
105108
return discovered
106109

107-
def load_tools(self, mcp, include=None, exclude=None, status_tool: str = None):
110+
def load_tools(self, mcp, include=None, exclude=None, system_type="generic"):
108111
"""
109112
Load a set of named tools, or default to all those discovered.
110113
"""
111-
# Start with the system status tool
112-
status_tool = status_tool or "mcpserver.tools.system.tool"
113-
self.tools["system"] = {"module": status_tool}
114+
if system_type in systems.system_tools:
115+
# Map short name to internal library path
116+
sys_module = f"mcpserver.tools.system.{system_type}"
117+
else:
118+
# Assume it is a custom external module path provided by the user
119+
# e.g., 'my_custom_package.system_logic'
120+
sys_module = system_type
121+
122+
# Seed the system tool into discovery
123+
self.tools["system"] = {"module": sys_module}
114124

115125
# If no tools are selected... select all tools discovered
116126
names = self.tools
@@ -199,13 +209,13 @@ def get_available_prompts(self):
199209
"""
200210
prompts = set()
201211

202-
# 2. Load them (to execute decorators)
212+
# Load them (to execute decorators)
203213
for tool_id, path in self.tools.items():
204214
mod = self.load_tool_module(tool_id, path)
205215
if not mod:
206216
continue
207217

208-
# 3. Inspect the classes/functions in the module
218+
# Inspect the classes/functions in the module
209219
for name, obj in inspect.getmembers(mod):
210220
# We usually look for classes inheriting from BaseTool
211221
# But we can also just scan the class attributes

mcpserver/tools/system/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from .flux import SystemTool as FluxSystemTool
2+
from .generic import SystemTool as GenericSystemTool
3+
from .kubernetes import SystemTool as KubernetesSystemTool
4+
5+
system_tools = {
6+
"flux": FluxSystemTool,
7+
"generic": GenericSystemTool,
8+
"kubernetes": KubernetesSystemTool,
9+
}

mcpserver/tools/system/flux.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import time
2+
from typing import Any, Dict
3+
4+
from mcpserver.tools.base import BaseTool
5+
from mcpserver.tools.decorator import mcp
6+
7+
8+
class SystemTool(BaseTool):
9+
"""
10+
System tool specialized for Flux Framework.
11+
"""
12+
13+
def setup(self, manager=None):
14+
self.manager = manager
15+
16+
@mcp.tool(name="get_status")
17+
def get_status(self) -> Dict[str, Any]:
18+
"""
19+
Get status of the flux cluster.
20+
"""
21+
import flux
22+
import flux.resource
23+
24+
flux_meta = {"status": "error", "message": "Flux handle failed"}
25+
try:
26+
h = flux.Flux()
27+
listing = flux.resource.list.resource_list(h).get()
28+
flux_meta = {
29+
"status": "online",
30+
"free_cores": listing.free.ncores,
31+
"up_nodes": listing.up.nnodes,
32+
}
33+
except Exception as e:
34+
flux_meta["error"] = str(e)
35+
36+
res = {"timestamp": time.time(), "system_type": "flux", "flux": flux_meta, "tools": {}}
37+
if self.manager:
38+
for tid, inst in self.manager.instances.items():
39+
if inst == self:
40+
continue
41+
res["tools"][tid] = {"class": inst.__class__.__name__}
42+
return res

0 commit comments

Comments
 (0)