agentnet is a lightweight Python package that lets agents discover each other and exchange direct messages over NATS.
Agent demo/runtime code has been moved out of this repository to a dedicated playground repo:
realm-agents-playground(external repo)
This repository now focuses on network infrastructure only:
src/agentnetservices/registrydocker- network/SDK docs
Legacy agent entrypoints are temporarily kept as compatibility stubs and print a "moved" notice.
Network operator reference: NETWORK_CLI_GUIDE.md
UI integration reference: UI_BACKEND_MAPPING.md
Canonical SDK bridge doc for external agent repos: BUILD_AGENT_BRIDGE_IN_15_MINUTES.md
Streaming/UI protocol reference: STREAMING_PROTOCOL.md
- Agents join the network with a single
AgentNode - Account inbox routing (
account.<account_id>.inbox) with stable account identity - Online registry with agent metadata and capabilities
- Thread-aware messaging (
thread_id,parent_message_id) - Discovery RPCs (
registry.search,registry.profile) - Thread discovery RPC (
registry.thread_list) - Thread history RPC (
registry.thread_messages) with cursor pagination - Message search RPC (
registry.message_search) with filters - Thread stats RPC (
registry.thread_status) for storage inspection - Envelope protocol versioning (
schema_version) with backward-compatible defaulting - Optional idempotency keys (
idempotency_key) to suppress duplicate logical operations - Delivery receipts with sender retry policy
- Async-first API for scripts and long-running workers
- Local blob store helpers for file/image references (
blob_refpayloads over AgentNet) - Bounded in-process concurrency (worker cap + pending queue cap)
- Safety guardrails: TTL checks, dedupe, rate limiting, work timeouts, circuit breaker
- Durable metadata in Postgres (
agent_accounts,agent_sessions,agent_threads,agent_messages)
pip install agentnetFrom this repo:
cp .env.example .env
# edit .env and set POSTGRES_PASSWORD / DATABASE_URL before first run
docker compose -f docker/docker-compose.yml up -dThis starts:
- NATS on
localhost:4222 - Postgres for durable session metadata
- Registry service subscribed to
registry.register,registry.hello,registry.goodbye,registry.list,registry.search,registry.profile,registry.thread_list,registry.thread_messages,registry.message_search, andregistry.thread_status
import asyncio
from agentnet import AgentNode
async def main():
# Notice the security token is passed directly in the URL!
node = AgentNode(
agent_id="agent_foo",
name="FooAgent",
capabilities=["chat"],
nats_url="nats://agentnet_secret_token@localhost:4222"
)
@node.on_message
async def handle(msg):
print("Got message:", msg.payload)
await node.start_forever()
asyncio.run(main())await node.send(to="@weather_bot", payload={"text": "yo"})
await node.send_to_account(to_account_id="acct_01...", payload={"text": "yo"})
await node.send_to_username(username="weather_bot", payload={"text": "yo"})Use the high-level SDK for common agent workflows:
from agentnet.sdk import AgentSDK
async with AgentSDK(
agent_id="agent_a",
name="Agent A",
nats_url="nats://agentnet_secret_token@localhost:4222",
) as sdk:
online = await sdk.list_online()
print([a.username for a in online])
profile = await sdk.get_profile("mesh_agent_1")
print(profile.get("bio", ""))
result = await sdk.ask_text(
"mesh_agent_1",
"Analyze spread and total for Celtics @ Suns",
thread_id="ops_1",
)
print(result.text)
thread = sdk.thread("ops_1")
await thread.send_text("mesh_agent_1", "Confirm your final pick")LLM local tool wrappers can directly call:
sdk.list_online()sdk.get_profile(target)sdk.ask_text(to, text, thread_id=...)sdk.send_text(to, text, thread_id=...)sdk.list_threads(...)sdk.get_thread_messages(thread_id, limit=..., cursor=...)sdk.search_messages(...)sdk.thread_status(thread_id)
Blob/file helpers are available for local integrations:
sdk.put_blob_bytes(...)sdk.put_blob_file(...)sdk.get_blob_bytes(blob_id)sdk.get_blob_text(blob_id)sdk.head_blob(blob_id)sdk.send_blob_ref(to, blob, thread_id=...)
Blob references travel as normal message payloads:
blob = sdk.put_blob_file("/tmp/chart.png")
await sdk.send_blob_ref("@vision_agent", blob, thread_id="media_ops_1")Under the hood this publishes to:
account.acct_01....inbox
TypeScript SDK (work-in-progress) lives at:
ts-sdk/agentnet-sdk
Python API:
from agentnet import list_online_agents
agents = await list_online_agents("nats://agentnet_secret_token@localhost:4222")
print([a.to_dict() for a in agents])CLI:
python -m agentnet list --nats-url nats://agentnet_secret_token@localhost:4222
# or, after install:
agentnet list --nats-url nats://agentnet_secret_token@localhost:4222Account-route message examples:
agentnet send --nats-url nats://agentnet_secret_token@localhost:4222 --to-username weather_bot '{"text":"yo"}'
agentnet request --nats-url nats://agentnet_secret_token@localhost:4222 --to-account acct_01abc... '{"text":"ping"}'
agentnet search --nats-url nats://agentnet_secret_token@localhost:4222 --query weather --online-only
agentnet profile --nats-url nats://agentnet_secret_token@localhost:4222 --username weather_botOperator workflows (easy testing + visibility):
# Watch live inbox traffic (summary lines)
agentnet watch --nats-url nats://agentnet_secret_token@localhost:4222 --subject 'account.*.inbox'
# Watch delivery receipts
agentnet watch --nats-url nats://agentnet_secret_token@localhost:4222 --subject 'account.*.receipts'
# Join as an operator and chat interactively with an agent
agentnet chat --nats-url nats://agentnet_secret_token@localhost:4222 --to-username mesh_agent_1 --thread-id ops_thread_1
# Inspect stored thread stats
agentnet thread-status --nats-url nats://agentnet_secret_token@localhost:4222 --thread-id ops_thread_1
# Discover old threads and pick one to resume
agentnet threads --nats-url nats://agentnet_secret_token@localhost:4222 --participant-username mesh_agent_1 --limit 20
# Page through messages inside one thread
agentnet thread-messages --nats-url nats://agentnet_secret_token@localhost:4222 --thread-id ops_thread_1 --limit 50
# Search messages by filters
agentnet message-search --nats-url nats://agentnet_secret_token@localhost:4222 --thread-id ops_thread_1 --kind request --limit 50agentnet chat commands:
/showthreadprints current thread id/thread <id>switches thread context/quitexits interactive mode
agentnet threads outputs:
- recent matching thread IDs
- message/tail/token counts and last activity time
agentnet thread-status outputs:
message_count,pending_messages,byte_count,approx_tokens,latest_checkpoint_end- storage metadata for the thread; active context assembly is expected to be owned by agent infrastructure
- compatibility
statusfield remains present and currently reportsok
AgentNet relies on NATS Token Authentication to ensure that unauthorized third parties cannot connect to your backend router and spoof AI agents.
Your docker-compose.yml backend spins up requiring the default token agentnet_secret_token. You must prepend this token (like nats://<TOKEN>@<IP>) to every AgentNode or CLI command, or the server will instantly reject the TCP connection. Before deploying to production, ALWAYS change this --auth token in your docker network!
For local anti-spoof testing, you can enable dev auth:
- Start registry with
DEV_AUTH=true(compose env is supported):
DEV_AUTH=true docker compose -f docker/docker-compose.yml up -d --build registry- Set
DEV_AUTH=truein agent env. - Each agent auto-creates a local key file in
.keys/<username-or-agent>.json. - Register requests are signed (
dev-ed25519-v1), and registry binds one public key per account.
If a different key later tries to register for the same account, registry rejects it with auth_public_key_mismatch.
Registry persists account metadata into agent_accounts and session metadata into agent_sessions.
agent_accounts:
account_id(PK)username(unique)display_namebiocapabilities(JSONB)metadata(JSONB)visibilitystatuscreated_atupdated_at
agent_sessions:
session_tag(PK)agent_idaccount_idusernameserver_idconnected_atdisconnected_atlast_seenstatusmetadata(JSONB)
Retention controls:
SESSION_RETENTION_DAYS(default:14): offline sessions older than retention are pruned.THREAD_RETENTION_DAYS(default:30): stale threads (and their messages) older than retention are pruned by registry GC.
Thread/message persistence:
agent_threadsstores thread metadata and participant account IDs.agent_messagesstores message envelopes keyed bymessage_idwiththread_idandparent_message_id.- Registry persists messages by tapping NATS subjects (
account.*.inbox,agent.capability.*,_INBOX.>).
- Account messages:
account.<account_id>.inbox - Delivery receipts:
account.<account_id>.receipts - Register (request/reply):
registry.register - Account resolve (request/reply):
registry.resolve_account - Key resolve (request/reply):
registry.resolve_key - Search (request/reply):
registry.search - Profile (request/reply):
registry.profile - Thread list (request/reply):
registry.thread_list - Thread messages (request/reply):
registry.thread_messages - Message search (request/reply):
registry.message_search - Thread status (request/reply):
registry.thread_status - Presence hello:
registry.hello - Presence goodbye:
registry.goodbye - List request:
registry.list(request/reply)
{
"message_id": "b3d0...",
"from_agent": "agent_a",
"from_account_id": "acct_01...",
"from_session_tag": "registry_01J...",
"to_agent": "agent_b",
"to_account_id": "acct_02...",
"payload": {"text": "yo"},
"sent_at": "2026-02-21T08:00:00Z",
"ttl_ms": 30000,
"expires_at": "2026-02-21T08:00:30Z",
"trace_id": "b3d0...",
"thread_id": "thread_b3d0...",
"parent_message_id": null,
"kind": "direct",
"schema_version": "1.1",
"idempotency_key": "order_123_step_1",
"auth": {
"scheme": "dev-ed25519-v1",
"public_key": "<base64url>",
"claims": {"message_id": "b3d0...", "payload_sha256": "..."},
"signature": "<base64url>"
}
}{
"message_id": "b3d0...",
"status": "accepted",
"event_at": "2026-02-21T08:00:00Z",
"from_account_id": "acct_02...",
"from_session_tag": "registry_01J...",
"to_account_id": "acct_01...",
"trace_id": "b3d0...",
"thread_id": "thread_b3d0..."
}{
"agent_id": "agent_a",
"account_id": "acct_01...",
"username": "agent_a",
"name": "Agent A",
"session_tag": "registry_01J...",
"capabilities": ["chat", "summarize"],
"metadata": {"team": "ops"},
"last_seen": "2026-02-21T08:00:00Z"
}{
"account_id": "acct_01...",
"username": "agent_a",
"session_tag": "registry_01J...",
"heartbeat_interval": 12.0,
"ttl_seconds": 40.0
}agent_id is the logical role label. account_id is stable routing identity. session_tag is the unique identity for that specific running process instance.
send() / request() are account-routed only. Use account targets (to="account:acct_...", to="acct_...") or usernames (to="@name" or to="name"). You can also call send_to_account(), request_account(), send_to_username(), and request_username() directly.
send_*() waits for a delivery receipt by default and retries publish on receipt timeout (default_send_retry_attempts=2, default_receipt_timeout_seconds=1.5).
Incoming messages without ttl_ms or expires_at are rejected by default.
Legacy thread-budget env knobs (registry service):
THREAD_SOFT_LIMIT_TOKENS(default50000)THREAD_HARD_LIMIT_TOKENS(default60000)THREAD_KEEP_TAIL_MESSAGES(default24)TOKEN_ESTIMATE_CHARS_PER_TOKEN(default4)THREAD_COMPACTION_EVENT_ENABLED(deprecated; ignored by registry)
Thread compaction, checkpointing, and prompt assembly are now expected to be owned by agent infrastructure rather than the network.
MESH_COMPACTION_KEEP_TAIL_MESSAGES(default24)MESH_COMPACTION_MAX_MESSAGES(default400)
Presence is kept accurate via heartbeat:
- Agents re-publish
registry.helloevery 10-15 seconds (default: 12 seconds) - Registry evicts agents with no heartbeat for 30-45 seconds (default TTL: 40 seconds)
AgentNode enforces bounded in-process concurrency and backpressure by default:
max_concurrency=4max_pending=100work_timeout_seconds=30max_payload_bytes=256000rate_limit_per_sender_per_sec=5withrate_limit_burst=10dedupe_ttl_seconds=600circuit_breaker_failures=5withcircuit_breaker_reset_seconds=15
You can override these in AgentNode(...) per agent process.
Use AgentWrapper if you want a stable adapter API for embedding AgentNet in your own agent runtime:
import asyncio
from agentnet import AgentWrapper
async def main() -> None:
sdk = AgentWrapper(
agent_id="demo_worker",
name="Demo Worker",
username="demo_worker",
nats_url="nats://agentnet_secret_token@localhost:4222",
)
@sdk.receive
async def on_message(msg):
if msg.reply_to:
await sdk.reply(request=msg, payload={"ok": True})
await sdk.connect()
await sdk.request(to_username="weather_bot", payload={"text": "status?"}, timeout=5.0)
await sdk.close()
asyncio.run(main())In this repo:
python -m venv .venv
source .venv/bin/activate
pip install -U pip
pip install -e .
python examples/agent_a.py
python examples/agent_b.py
python examples/list_agents.py- Install dependencies and start infra:
python -m venv .venv
source .venv/bin/activate
pip install -U pip
pip install -e .
docker compose -f docker/docker-compose.yml up -d --build- Create env file and add your key:
cp agents/.env.example agents/.envSet NOVITA_API_KEY in agents/.env.
- Start Agent B (responder) in terminal 1:
set -a
source agents/.env
set +a
python agents/agent_novita_b.py- Run Agent A (initiator) in terminal 2:
set -a
source agents/.env
set +a
python agents/agent_novita_a.pyAgent A will generate questions with zai-org/glm-5, send them to Agent B over AgentNet, and print turn-by-turn replies.
For cleaner recording output, tune these in agents/.env:
DEMO_TURNS=4for a fuller back-and-forthNOVITA_MAX_TOKENS=256for shorter answersLOG_TEXT_MAX_CHARS=700to cap displayed text length per log blockTARGET_USERNAME=agent_novita_bto control who Agent A talks to
This repo now includes a 3-agent skeleton that uses:
- per-agent system prompt files in
agents/prompts/ - per-agent tool allowlists from
tools/ - per-agent provider/model config from YAML
- OpenAI-compatible
/v1/chat/completionsand Claude/v1/messages
Files:
agents/agent_mesh_trio.py(runner)agents/config/mesh_agents.yaml(agent/model/tool config)agents/prompts/agent_1.txt,agents/prompts/agent_2.txt,agents/prompts/agent_3.txt
Run:
cd Realm
source venv/bin/activate
set -a
source agents/.env
set +a
python agents/agent_mesh_trio.py --config agents/config/mesh_agents.yamlRun a subset:
python agents/agent_mesh_trio.py --config agents/config/mesh_agents.yaml --only agent_1 --only agent_2Required env vars for this skeleton:
OPENAI_API_KEY,OPENAI_BASE_URLANTHROPIC_API_KEY,ANTHROPIC_BASE_URL- tool-side keys (if you call those tools):
RSC_TOKEN,ODDS_API_KEY,PERPLEXITY_API_KEY