Skip to content

Instantly share code, notes, and snippets.

@itseffi
Created February 20, 2026 23:07
Show Gist options
  • Select an option

  • Save itseffi/7eeb3a0dd800569d048ca6d54bbfa0ac to your computer and use it in GitHub Desktop.

Select an option

Save itseffi/7eeb3a0dd800569d048ca6d54bbfa0ac to your computer and use it in GitHub Desktop.
nanoclaw
#!/usr/bin/env python3
# nanoclaw.py - A minimal OpenClaw
# Multi-agent CLI with role-based agents
# Run: uv run --with anthropic --with schedule python nanoclaw.py
import json
import os
import shlex
import subprocess
import threading
import time
from collections import defaultdict
from datetime import datetime
import anthropic
import schedule
# Shared Anthropic API client, constructed once at import time with no explicit arguments.
client = anthropic.Anthropic()
# Configuration
# Root directory for all on-disk state (".nanoclaw" under the user's home directory).
WORKSPACE = os.path.expanduser("~/.nanoclaw")
# Holds one ".jsonl" transcript file per session key.
SESSIONS_DIR = os.path.join(WORKSPACE, "sessions")
# Holds one ".md" file per saved memory key.
MEMORY_DIR = os.path.join(WORKSPACE, "memory")
# JSON file with the "allowed" and "denied" shell-command lists.
APPROVALS_FILE = os.path.join(WORKSPACE, "exec-approvals.json")
# Model identifier used by the agents below and by session compaction.
DEFAULT_MODEL = "claude-opus-4-1-20250805"
# Agents
# System prompt for the default assistant; embeds the workspace path so the
# model knows where its files live.
_CLAW_SOUL = (
    "You are Claw, a personal AI assistant.\n"
    "Be genuinely helpful (not performatively), have opinions, be resourceful before asking, earn trust through competence.\n"
    "You have tools - use them proactively.\n\n"
    "## Memory\n"
    f"Your workspace is {WORKSPACE}.\n"
    "Use save_memory to store important information across sessions.\n"
    "Use memory_search at the start of conversations to recall context."
)

# System prompt for the research-focused agent.
_RESEARCHER_SOUL = (
    "You are Researcher, a product-and-research hybrid.\n"
    "Your job: deliver evidence-backed findings and clear product recommendations.\n"
    "Use tools to gather data, cite sources for key claims, and separate facts from assumptions.\n"
    "Save important findings with save_memory for other agents to reference."
)

# Registry of agents keyed by agent id. Each entry carries a display name,
# the model to call, the system prompt ("soul"), and the prefix used when
# building session keys for that agent.
AGENTS = {
    "main": {
        "name": "Claw",
        "model": DEFAULT_MODEL,
        "soul": _CLAW_SOUL,
        "session_prefix": "agent:main",
    },
    "researcher": {
        "name": "Researcher",
        "model": DEFAULT_MODEL,
        "soul": _RESEARCHER_SOUL,
        "session_prefix": "agent:researcher",
    },
}
# Tools
def _string_tool(name, description, **params):
    """Build a tool definition whose inputs are all required strings.

    ``params`` maps each input name to its description; the declaration
    order is kept for both "properties" and "required".
    """
    return {
        "name": name,
        "description": description,
        "input_schema": {
            "type": "object",
            "properties": {
                param: {"type": "string", "description": text}
                for param, text in params.items()
            },
            "required": list(params),
        },
    }


# Tool definitions passed to the model on every request.
TOOLS = [
    _string_tool(
        "run_command",
        "Run a shell command",
        command="The command to run",
    ),
    _string_tool(
        "read_file",
        "Read a file from the filesystem",
        path="Path to the file",
    ),
    _string_tool(
        "write_file",
        "Write content to a file (creates directories if needed)",
        path="Path to the file",
        content="Content to write",
    ),
    _string_tool(
        "save_memory",
        "Save important information to long-term memory",
        key="Short label (e.g. 'user-preferences')",
        content="The information to remember",
    ),
    _string_tool(
        "memory_search",
        "Search long-term memory for relevant information",
        query="What to search for",
    ),
]
# Permission controls
# Executables that run without asking the user for approval.
SAFE_COMMANDS = {
    "cat", "date", "echo", "head", "ls",
    "pwd", "tail", "wc", "which", "whoami",
}
# Shell operators; a command containing any of these as a standalone token
# is rejected by parse_command.
DISALLOWED_TOKENS = {
    "|", "||",
    "&", "&&",
    ";",
    ">", ">>",
    "<", "<<",
}
def load_approvals():
    """Load the stored command approvals.

    Returns:
        A dict that always has list-valued "allowed" and "denied" keys.
        An empty record is returned when the approvals file is missing or
        does not contain a JSON object; other keys found in the file are
        passed through untouched.
    """
    try:
        with open(APPROVALS_FILE) as f:
            stored = json.load(f)
    except (FileNotFoundError, ValueError):
        # ValueError covers json.JSONDecodeError (empty/corrupt file) and
        # undecodable bytes; treat both like "nothing recorded yet".
        return {"allowed": [], "denied": []}
    if not isinstance(stored, dict):
        return {"allowed": [], "denied": []}
    # Callers index both keys directly, so make sure they exist as lists.
    for key in ("allowed", "denied"):
        if not isinstance(stored.get(key), list):
            stored[key] = []
    return stored
def save_approval(command, approved):
    """Record the user's decision for ``command`` in the approvals file.

    The command string is added to the "allowed" list when ``approved`` is
    truthy, otherwise to the "denied" list (duplicates are not added), and
    the whole record is written back as indented JSON.
    """
    approvals = load_approvals()
    bucket = approvals["allowed" if approved else "denied"]
    if command not in bucket:
        bucket.append(command)
    with open(APPROVALS_FILE, "w") as handle:
        json.dump(approvals, handle, indent=2)
def parse_command(command):
    """Split a command line into an argv list.

    Returns:
        ``(args, None)`` on success, or ``(None, message)`` when the command
        is empty, cannot be tokenised by ``shlex``, or contains a standalone
        shell operator listed in ``DISALLOWED_TOKENS``.
    """
    stripped = command.strip()
    if stripped == "":
        return None, "Empty command"

    try:
        tokens = shlex.split(stripped)
    except ValueError as exc:
        return None, f"Invalid command syntax: {exc}"

    if len(tokens) == 0:
        return None, "Empty command"
    if not DISALLOWED_TOKENS.isdisjoint(tokens):
        return None, "Pipes/redirection/chaining are disabled for safety"
    return tokens, None
def check_command_safety(command):
    """Classify a command line for execution.

    Returns one of:
        "invalid"        - parse_command rejected it
        "safe"           - its executable is listed in SAFE_COMMANDS
        "approved"       - the exact command string is in the stored allow list
        "needs_approval" - anything else
    """
    argv, problem = parse_command(command)
    if problem:
        return "invalid"
    if argv[0] in SAFE_COMMANDS:
        return "safe"
    previously_allowed = load_approvals()["allowed"]
    return "approved" if command in previously_allowed else "needs_approval"
# Tool execution
def _memory_filename_key(key):
    """Turn a memory key into a single safe filename component ("" if blank)."""
    cleaned = str(key).strip()
    # Path separators and drive colons would let a key escape MEMORY_DIR;
    # flatten them the same way session keys are flattened.
    for unsafe in ("/", "\\", ":"):
        cleaned = cleaned.replace(unsafe, "_")
    return cleaned


def _tool_run_command(tool_input):
    """Run a vetted command without a shell and return its combined output."""
    cmd = tool_input["command"]
    args, parse_err = parse_command(cmd)
    if parse_err:
        return parse_err
    safety = check_command_safety(cmd)
    if safety == "invalid":
        return "Invalid or blocked command"
    if safety == "needs_approval":
        print(f"\n WARNING Command: {cmd}")
        confirm = input(" Allow? (y/n): ").strip().lower()
        if confirm != "y":
            save_approval(cmd, False)
            return "Permission denied by user."
        save_approval(cmd, True)
    try:
        result = subprocess.run(
            args,
            shell=False,
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        return "Command timed out after 30 seconds"
    except Exception as e:
        # e.g. executable not found; report to the model instead of crashing.
        return f"Error: {e}"
    output = result.stdout + result.stderr
    return output if output else "(no output)"


def _tool_read_file(tool_input):
    """Return up to the first 10000 characters of a file."""
    try:
        with open(tool_input["path"], "r") as f:
            return f.read()[:10000]
    except Exception as e:
        return f"Error: {e}"


def _tool_write_file(tool_input):
    """Write content to a path, creating parent directories first."""
    path = tool_input["path"]
    try:
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        with open(path, "w") as f:
            f.write(tool_input["content"])
    except Exception as e:
        return f"Error: {e}"
    return f"Wrote to {path}"


def _tool_save_memory(tool_input):
    """Store content as "<key>.md" inside MEMORY_DIR."""
    key = _memory_filename_key(tool_input["key"])
    if not key:
        return "Error: memory key must not be empty"
    try:
        os.makedirs(MEMORY_DIR, exist_ok=True)
        with open(os.path.join(MEMORY_DIR, f"{key}.md"), "w") as f:
            f.write(tool_input["content"])
    except Exception as e:
        return f"Error: {e}"
    return f"Saved to memory: {key}"


def _tool_memory_search(tool_input):
    """Return every memory file containing at least one word of the query."""
    words = tool_input["query"].lower().split()
    matches = []
    try:
        if os.path.exists(MEMORY_DIR):
            # Sorted so the result order does not depend on the filesystem.
            for fname in sorted(os.listdir(MEMORY_DIR)):
                if not fname.endswith(".md"):
                    continue
                with open(os.path.join(MEMORY_DIR, fname), "r") as f:
                    content = f.read()
                haystack = content.lower()  # lower-case once per file
                if any(word in haystack for word in words):
                    matches.append(f"--- {fname} ---\n{content}")
    except Exception as e:
        return f"Error: {e}"
    return "\n\n".join(matches) if matches else "No matching memories found."


def execute_tool(name, tool_input):
    """Execute the tool called ``name`` and return its result as a string.

    Args:
        name: Tool name as declared in TOOLS.
        tool_input: Mapping of input names to values supplied by the model.

    Returns:
        The tool's textual output. Failures are reported as text (for
        example "Error: ..." or "Unknown tool: ...") rather than raised, so
        the caller can always hand the result back to the model.
    """
    handlers = {
        "run_command": (_tool_run_command, ("command",)),
        "read_file": (_tool_read_file, ("path",)),
        "write_file": (_tool_write_file, ("path", "content")),
        "save_memory": (_tool_save_memory, ("key", "content")),
        "memory_search": (_tool_memory_search, ("query",)),
    }
    entry = handlers.get(name)
    if entry is None:
        return f"Unknown tool: {name}"
    handler, required = entry
    # The model may omit inputs; answer with a message instead of a KeyError.
    for key in required:
        if key not in tool_input:
            return f"Error: missing required input '{key}'"
    return handler(tool_input)
# Session management
def get_session_path(session_key):
    """Return the transcript path for a session, creating SESSIONS_DIR if needed.

    ":" and "/" in the key are replaced with "_" so the key maps to a single
    ".jsonl" filename.
    """
    os.makedirs(SESSIONS_DIR, exist_ok=True)
    flattened = session_key.translate(str.maketrans({":": "_", "/": "_"}))
    return os.path.join(SESSIONS_DIR, f"{flattened}.jsonl")
def load_session(session_key):
    """Read a session transcript into a list of messages.

    Each non-blank line of the session file is parsed as JSON; lines that
    fail to parse are skipped. A session with no file yields an empty list.
    """
    path = get_session_path(session_key)
    if not os.path.exists(path):
        return []

    history = []
    with open(path, "r") as fh:
        for raw in fh:
            if not raw.strip():
                continue
            try:
                record = json.loads(raw)
            except json.JSONDecodeError:
                continue
            history.append(record)
    return history
def append_message(session_key, message):
    """Append one message to the session transcript as a single JSON line."""
    with open(get_session_path(session_key), "a") as fh:
        print(json.dumps(message), file=fh)
def save_session(session_key, messages):
    """Overwrite the session transcript with ``messages``, one JSON object per line."""
    with open(get_session_path(session_key), "w") as fh:
        fh.writelines(json.dumps(entry) + "\n" for entry in messages)
# Compaction
def estimate_tokens(messages):
    """Roughly estimate the token count of ``messages``.

    Uses the length of each message's JSON encoding and assumes four
    characters per token.
    """
    total_chars = 0
    for message in messages:
        total_chars += len(json.dumps(message))
    return total_chars // 4
def _is_tool_result_message(message):
    """Return True for a user message whose content carries tool_result blocks."""
    if not isinstance(message, dict) or message.get("role") != "user":
        return False
    content = message.get("content")
    if not isinstance(content, list):
        return False
    return any(
        isinstance(block, dict) and block.get("type") == "tool_result"
        for block in content
    )


def compact_session(session_key, messages):
    """Shrink a long session by summarising its older half.

    When the estimated size is below 100,000 tokens the list is returned
    unchanged. Otherwise the older messages are replaced by one user message
    holding a model-written summary, the result is saved to the session file,
    and the compacted list is returned.

    Args:
        session_key: Session whose transcript file is rewritten.
        messages: Full message history for that session.

    Returns:
        The original list, or the compacted replacement.
    """
    if estimate_tokens(messages) < 100_000:
        return messages

    split = len(messages) // 2
    # Never start the kept half with tool results: their matching tool_use
    # lives in the preceding assistant message, which would be summarised
    # away and leave the results orphaned.
    while split < len(messages) and _is_tool_result_message(messages[split]):
        split += 1
    old, recent = messages[:split], messages[split:]
    if not old:
        # Zero or one message: there is nothing older to summarise.
        return messages

    print("\n Compacting session history...")
    summary = client.messages.create(
        model=DEFAULT_MODEL,
        max_tokens=2000,
        messages=[
            {
                "role": "user",
                "content": (
                    "Summarize this conversation concisely. Preserve key facts, "
                    "decisions, and open tasks:\n\n"
                    f"{json.dumps(old, indent=2)}"
                ),
            }
        ],
    )
    # Join every text block instead of assuming the first block is text.
    summary_text = "".join(
        block.text for block in summary.content if hasattr(block, "text")
    )
    compacted = [
        {"role": "user", "content": f"[Conversation summary]\n{summary_text}"}
    ] + recent
    save_session(session_key, compacted)
    return compacted
# One lock per session key, created on first access; run_agent_turn holds the
# session's lock for the duration of a turn.
session_locks = defaultdict(threading.Lock)
def serialize_content(content):
    """Convert response content blocks into JSON-serialisable dicts.

    Blocks exposing a ``text`` attribute become text entries, ``tool_use``
    blocks keep their id, name and input, and any other block is dropped.
    """

    def as_dict(block):
        if hasattr(block, "text"):
            return {"type": "text", "text": block.text}
        if block.type == "tool_use":
            return {
                "type": "tool_use",
                "id": block.id,
                "name": block.name,
                "input": block.input,
            }
        return None

    return [entry for entry in map(as_dict, content) if entry is not None]
def run_agent_turn(session_key, user_text, agent_config):
    """Run one user turn for an agent, executing tools until it finishes.

    The session lock is held for the whole turn. Every message (user,
    assistant, tool results) is appended to the session file as it is
    produced.

    Args:
        session_key: Session whose history is loaded and extended.
        user_text: The user's message for this turn.
        agent_config: Agent entry providing "model" and "soul".

    Returns:
        The assistant's final text, or "(max turns reached)" if the model is
        still requesting tools after 20 rounds.
    """
    with session_locks[session_key]:
        messages = load_session(session_key)
        messages = compact_session(session_key, messages)
        user_msg = {"role": "user", "content": user_text}
        messages.append(user_msg)
        append_message(session_key, user_msg)

        for _ in range(20):
            response = client.messages.create(
                model=agent_config["model"],
                max_tokens=4096,
                system=agent_config["soul"],
                tools=TOOLS,
                messages=messages,
            )
            assistant_msg = {
                "role": "assistant",
                "content": serialize_content(response.content),
            }
            messages.append(assistant_msg)
            append_message(session_key, assistant_msg)

            # Any stop reason other than "tool_use" ends the turn; looping on
            # e.g. "max_tokens" would only re-send the same history.
            if response.stop_reason != "tool_use":
                return "".join(
                    block.text for block in response.content if hasattr(block, "text")
                )

            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                print(f" Tool {block.name}: {json.dumps(block.input)[:100]}")
                entry = {"type": "tool_result", "tool_use_id": block.id}
                try:
                    result = execute_tool(block.name, block.input)
                except Exception as exc:
                    # Every tool_use needs a result in the history; report
                    # the failure to the model instead of aborting the turn.
                    result = f"Error: {exc}"
                    entry["is_error"] = True
                print(f" -> {str(result)[:150]}")
                entry["content"] = str(result)
                tool_results.append(entry)

            results_msg = {"role": "user", "content": tool_results}
            messages.append(results_msg)
            append_message(session_key, results_msg)

        return "(max turns reached)"
def resolve_agent(message_text):
    """Pick the agent for a message.

    Returns ``(agent_id, text)``: messages starting with "/research " go to
    the researcher with the prefix removed; everything else goes to "main"
    unchanged.
    """
    prefix = "/research "
    if not message_text.startswith(prefix):
        return "main", message_text
    return "researcher", message_text[len(prefix):]
def setup_heartbeats():
    """Register the daily heartbeat job and start the scheduler thread.

    A job runs every day at 07:30 and sends a fixed prompt to the main agent
    in the "cron:morning-check" session. Pending jobs are polled once a
    minute from a daemon thread.
    """

    def morning_check():
        print("\nHeartbeat: morning check")
        try:
            result = run_agent_turn(
                "cron:morning-check",
                "Good morning! Check today's date and give me a motivational quote.",
                AGENTS["main"],
            )
        except Exception as exc:
            # An exception escaping the job would propagate out of
            # run_pending() and end the scheduler thread for good.
            print(f"[Claw] Heartbeat failed: {exc}\n")
            return
        print(f"[Claw] {result}\n")

    schedule.every().day.at("07:30").do(morning_check)

    def scheduler_loop():
        while True:
            schedule.run_pending()
            time.sleep(60)

    threading.Thread(target=scheduler_loop, daemon=True).start()
def main():
    """Run the interactive REPL.

    Creates the workspace directories, starts the heartbeat scheduler, then
    reads user input until /quit, /exit, /q, EOF or Ctrl-C at the prompt.
    "/new" switches the main agent to a fresh timestamped session;
    "/research <query>" routes the query to the researcher agent.
    """
    for directory in [WORKSPACE, SESSIONS_DIR, MEMORY_DIR]:
        os.makedirs(directory, exist_ok=True)
    setup_heartbeats()
    session_key = "agent:main:repl"
    print("nanoclaw")
    print(f" Agents: {', '.join(agent['name'] for agent in AGENTS.values())}")
    print(f" Workspace: {WORKSPACE}")
    print(" Commands: /new (reset), /research <query>, /quit\n")
    while True:
        try:
            user_input = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nGoodbye!")
            break
        if not user_input:
            continue
        if user_input.lower() in ["/quit", "/exit", "/q"]:
            print("Goodbye!")
            break
        if user_input.lower() == "/new":
            session_key = f"agent:main:repl:{datetime.now().strftime('%Y%m%d%H%M%S')}"
            print(" Session reset.\n")
            continue
        agent_id, message_text = resolve_agent(user_input)
        agent_config = AGENTS[agent_id]
        active_session = (
            f"{agent_config['session_prefix']}:repl"
            if agent_id != "main"
            else session_key
        )
        try:
            response = run_agent_turn(active_session, message_text, agent_config)
        except Exception as exc:
            # A failed turn (e.g. API or network error) should not end the
            # whole REPL; report it and wait for the next input.
            print(f"\n Error: {exc}\n")
            continue
        print(f"\n[{agent_config['name']}] {response}\n")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment