Created
February 20, 2026 23:07
-
-
Save itseffi/7eeb3a0dd800569d048ca6d54bbfa0ac to your computer and use it in GitHub Desktop.
nanoclaw
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # nanoclaw.py - A minimal OpenClaw | |
| # Multi-agent CLI with role-based agents | |
| # Run: uv run --with anthropic --with schedule python nanoclaw.py | |
| import json | |
| import os | |
| import shlex | |
| import subprocess | |
| import threading | |
| import time | |
| from collections import defaultdict | |
| from datetime import datetime | |
| import anthropic | |
| import schedule | |
| client = anthropic.Anthropic() | |
| # Configuration | |
| WORKSPACE = os.path.expanduser("~/.nanoclaw") | |
| SESSIONS_DIR = os.path.join(WORKSPACE, "sessions") | |
| MEMORY_DIR = os.path.join(WORKSPACE, "memory") | |
| APPROVALS_FILE = os.path.join(WORKSPACE, "exec-approvals.json") | |
| DEFAULT_MODEL = "claude-opus-4-1-20250805" | |
| # Agents | |
| AGENTS = { | |
| "main": { | |
| "name": "Claw", | |
| "model": DEFAULT_MODEL, | |
| "soul": ( | |
| "You are Claw, a personal AI assistant.\n" | |
| "Be genuinely helpful (not performatively), have opinions, be resourceful before asking, earn trust through competence.\n" | |
| "You have tools - use them proactively.\n\n" | |
| "## Memory\n" | |
| f"Your workspace is {WORKSPACE}.\n" | |
| "Use save_memory to store important information across sessions.\n" | |
| "Use memory_search at the start of conversations to recall context." | |
| ), | |
| "session_prefix": "agent:main", | |
| }, | |
| "researcher": { | |
| "name": "Researcher", | |
| "model": DEFAULT_MODEL, | |
| "soul": ( | |
| "You are Researcher, a product-and-research hybrid.\n" | |
| "Your job: deliver evidence-backed findings and clear product recommendations.\n" | |
| "Use tools to gather data, cite sources for key claims, and separate facts from assumptions.\n" | |
| "Save important findings with save_memory for other agents to reference." | |
| ), | |
| "session_prefix": "agent:researcher", | |
| }, | |
| } | |
| # Tools | |
| TOOLS = [ | |
| { | |
| "name": "run_command", | |
| "description": "Run a shell command", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "command": {"type": "string", "description": "The command to run"} | |
| }, | |
| "required": ["command"], | |
| }, | |
| }, | |
| { | |
| "name": "read_file", | |
| "description": "Read a file from the filesystem", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": {"path": {"type": "string", "description": "Path to the file"}}, | |
| "required": ["path"], | |
| }, | |
| }, | |
| { | |
| "name": "write_file", | |
| "description": "Write content to a file (creates directories if needed)", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "path": {"type": "string", "description": "Path to the file"}, | |
| "content": {"type": "string", "description": "Content to write"}, | |
| }, | |
| "required": ["path", "content"], | |
| }, | |
| }, | |
| { | |
| "name": "save_memory", | |
| "description": "Save important information to long-term memory", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "key": { | |
| "type": "string", | |
| "description": "Short label (e.g. 'user-preferences')", | |
| }, | |
| "content": {"type": "string", "description": "The information to remember"}, | |
| }, | |
| "required": ["key", "content"], | |
| }, | |
| }, | |
| { | |
| "name": "memory_search", | |
| "description": "Search long-term memory for relevant information", | |
| "input_schema": { | |
| "type": "object", | |
| "properties": { | |
| "query": {"type": "string", "description": "What to search for"} | |
| }, | |
| "required": ["query"], | |
| }, | |
| }, | |
| ] | |
| # Permission controls | |
| SAFE_COMMANDS = { | |
| "ls", | |
| "cat", | |
| "head", | |
| "tail", | |
| "wc", | |
| "date", | |
| "whoami", | |
| "echo", | |
| "pwd", | |
| "which", | |
| } | |
| DISALLOWED_TOKENS = {"|", "||", "&", "&&", ";", ">", ">>", "<", "<<"} | |
| def load_approvals(): | |
| if os.path.exists(APPROVALS_FILE): | |
| with open(APPROVALS_FILE) as f: | |
| return json.load(f) | |
| return {"allowed": [], "denied": []} | |
| def save_approval(command, approved): | |
| approvals = load_approvals() | |
| key = "allowed" if approved else "denied" | |
| if command not in approvals[key]: | |
| approvals[key].append(command) | |
| with open(APPROVALS_FILE, "w") as f: | |
| json.dump(approvals, f, indent=2) | |
| def parse_command(command): | |
| command = command.strip() | |
| if not command: | |
| return None, "Empty command" | |
| try: | |
| args = shlex.split(command) | |
| except ValueError as e: | |
| return None, f"Invalid command syntax: {e}" | |
| if not args: | |
| return None, "Empty command" | |
| if any(token in DISALLOWED_TOKENS for token in args): | |
| return None, "Pipes/redirection/chaining are disabled for safety" | |
| return args, None | |
| def check_command_safety(command): | |
| args, err = parse_command(command) | |
| if err: | |
| return "invalid" | |
| base_cmd = args[0] | |
| if base_cmd in SAFE_COMMANDS: | |
| return "safe" | |
| approvals = load_approvals() | |
| if command in approvals["allowed"]: | |
| return "approved" | |
| return "needs_approval" | |
| # Tool execution | |
| def execute_tool(name, tool_input): | |
| if name == "run_command": | |
| cmd = tool_input["command"] | |
| args, parse_err = parse_command(cmd) | |
| if parse_err: | |
| return parse_err | |
| safety = check_command_safety(cmd) | |
| if safety == "invalid": | |
| return "Invalid or blocked command" | |
| if safety == "needs_approval": | |
| print(f"\n WARNING Command: {cmd}") | |
| confirm = input(" Allow? (y/n): ").strip().lower() | |
| if confirm != "y": | |
| save_approval(cmd, False) | |
| return "Permission denied by user." | |
| save_approval(cmd, True) | |
| try: | |
| result = subprocess.run( | |
| args, | |
| shell=False, | |
| capture_output=True, | |
| text=True, | |
| timeout=30, | |
| ) | |
| output = result.stdout + result.stderr | |
| return output if output else "(no output)" | |
| except subprocess.TimeoutExpired: | |
| return "Command timed out after 30 seconds" | |
| except Exception as e: | |
| return f"Error: {e}" | |
| if name == "read_file": | |
| try: | |
| with open(tool_input["path"], "r") as f: | |
| return f.read()[:10000] | |
| except Exception as e: | |
| return f"Error: {e}" | |
| if name == "write_file": | |
| try: | |
| os.makedirs(os.path.dirname(tool_input["path"]) or ".", exist_ok=True) | |
| with open(tool_input["path"], "w") as f: | |
| f.write(tool_input["content"]) | |
| return f"Wrote to {tool_input['path']}" | |
| except Exception as e: | |
| return f"Error: {e}" | |
| if name == "save_memory": | |
| os.makedirs(MEMORY_DIR, exist_ok=True) | |
| filepath = os.path.join(MEMORY_DIR, f"{tool_input['key']}.md") | |
| with open(filepath, "w") as f: | |
| f.write(tool_input["content"]) | |
| return f"Saved to memory: {tool_input['key']}" | |
| if name == "memory_search": | |
| query = tool_input["query"].lower() | |
| results = [] | |
| if os.path.exists(MEMORY_DIR): | |
| for fname in os.listdir(MEMORY_DIR): | |
| if fname.endswith(".md"): | |
| with open(os.path.join(MEMORY_DIR, fname), "r") as f: | |
| content = f.read() | |
| if any(word in content.lower() for word in query.split()): | |
| results.append(f"--- {fname} ---\n{content}") | |
| return "\n\n".join(results) if results else "No matching memories found." | |
| return f"Unknown tool: {name}" | |
| # Session management | |
| def get_session_path(session_key): | |
| os.makedirs(SESSIONS_DIR, exist_ok=True) | |
| safe_key = session_key.replace(":", "_").replace("/", "_") | |
| return os.path.join(SESSIONS_DIR, f"{safe_key}.jsonl") | |
| def load_session(session_key): | |
| path = get_session_path(session_key) | |
| messages = [] | |
| if os.path.exists(path): | |
| with open(path, "r") as f: | |
| for line in f: | |
| if line.strip(): | |
| try: | |
| messages.append(json.loads(line)) | |
| except json.JSONDecodeError: | |
| continue | |
| return messages | |
| def append_message(session_key, message): | |
| with open(get_session_path(session_key), "a") as f: | |
| f.write(json.dumps(message) + "\n") | |
| def save_session(session_key, messages): | |
| with open(get_session_path(session_key), "w") as f: | |
| for msg in messages: | |
| f.write(json.dumps(msg) + "\n") | |
| # Compaction | |
| def estimate_tokens(messages): | |
| return sum(len(json.dumps(msg)) for msg in messages) // 4 | |
| def compact_session(session_key, messages): | |
| if estimate_tokens(messages) < 100_000: | |
| return messages | |
| split = len(messages) // 2 | |
| old, recent = messages[:split], messages[split:] | |
| print("\n Compacting session history...") | |
| summary = client.messages.create( | |
| model=DEFAULT_MODEL, | |
| max_tokens=2000, | |
| messages=[ | |
| { | |
| "role": "user", | |
| "content": ( | |
| "Summarize this conversation concisely. Preserve key facts, " | |
| "decisions, and open tasks:\n\n" | |
| f"{json.dumps(old, indent=2)}" | |
| ), | |
| } | |
| ], | |
| ) | |
| compacted = [ | |
| {"role": "user", "content": f"[Conversation summary]\n{summary.content[0].text}"} | |
| ] + recent | |
| save_session(session_key, compacted) | |
| return compacted | |
| session_locks = defaultdict(threading.Lock) | |
| def serialize_content(content): | |
| serialized = [] | |
| for block in content: | |
| if hasattr(block, "text"): | |
| serialized.append({"type": "text", "text": block.text}) | |
| elif block.type == "tool_use": | |
| serialized.append( | |
| { | |
| "type": "tool_use", | |
| "id": block.id, | |
| "name": block.name, | |
| "input": block.input, | |
| } | |
| ) | |
| return serialized | |
| def run_agent_turn(session_key, user_text, agent_config): | |
| with session_locks[session_key]: | |
| messages = load_session(session_key) | |
| messages = compact_session(session_key, messages) | |
| user_msg = {"role": "user", "content": user_text} | |
| messages.append(user_msg) | |
| append_message(session_key, user_msg) | |
| for _ in range(20): | |
| response = client.messages.create( | |
| model=agent_config["model"], | |
| max_tokens=4096, | |
| system=agent_config["soul"], | |
| tools=TOOLS, | |
| messages=messages, | |
| ) | |
| content = serialize_content(response.content) | |
| assistant_msg = {"role": "assistant", "content": content} | |
| messages.append(assistant_msg) | |
| append_message(session_key, assistant_msg) | |
| if response.stop_reason == "end_turn": | |
| return "".join( | |
| block.text for block in response.content if hasattr(block, "text") | |
| ) | |
| if response.stop_reason == "tool_use": | |
| tool_results = [] | |
| for block in response.content: | |
| if block.type == "tool_use": | |
| print(f" Tool {block.name}: {json.dumps(block.input)[:100]}") | |
| result = execute_tool(block.name, block.input) | |
| print(f" -> {str(result)[:150]}") | |
| tool_results.append( | |
| { | |
| "type": "tool_result", | |
| "tool_use_id": block.id, | |
| "content": str(result), | |
| } | |
| ) | |
| results_msg = {"role": "user", "content": tool_results} | |
| messages.append(results_msg) | |
| append_message(session_key, results_msg) | |
| return "(max turns reached)" | |
| def resolve_agent(message_text): | |
| if message_text.startswith("/research "): | |
| return "researcher", message_text[len("/research ") :] | |
| return "main", message_text | |
| def setup_heartbeats(): | |
| def morning_check(): | |
| print("\nHeartbeat: morning check") | |
| result = run_agent_turn( | |
| "cron:morning-check", | |
| "Good morning! Check today's date and give me a motivational quote.", | |
| AGENTS["main"], | |
| ) | |
| print(f"[Claw] {result}\n") | |
| schedule.every().day.at("07:30").do(morning_check) | |
| def scheduler_loop(): | |
| while True: | |
| schedule.run_pending() | |
| time.sleep(60) | |
| threading.Thread(target=scheduler_loop, daemon=True).start() | |
| def main(): | |
| for directory in [WORKSPACE, SESSIONS_DIR, MEMORY_DIR]: | |
| os.makedirs(directory, exist_ok=True) | |
| setup_heartbeats() | |
| session_key = "agent:main:repl" | |
| print("nanoclaw") | |
| print(f" Agents: {', '.join(agent['name'] for agent in AGENTS.values())}") | |
| print(f" Workspace: {WORKSPACE}") | |
| print(" Commands: /new (reset), /research <query>, /quit\n") | |
| while True: | |
| try: | |
| user_input = input("You: ").strip() | |
| except (EOFError, KeyboardInterrupt): | |
| print("\nGoodbye!") | |
| break | |
| if not user_input: | |
| continue | |
| if user_input.lower() in ["/quit", "/exit", "/q"]: | |
| print("Goodbye!") | |
| break | |
| if user_input.lower() == "/new": | |
| session_key = f"agent:main:repl:{datetime.now().strftime('%Y%m%d%H%M%S')}" | |
| print(" Session reset.\n") | |
| continue | |
| agent_id, message_text = resolve_agent(user_input) | |
| agent_config = AGENTS[agent_id] | |
| active_session = ( | |
| f"{agent_config['session_prefix']}:repl" | |
| if agent_id != "main" | |
| else session_key | |
| ) | |
| response = run_agent_turn(active_session, message_text, agent_config) | |
| print(f"\n[{agent_config['name']}] {response}\n") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment