Last active
February 20, 2026 07:40
-
-
Save swombat/6f9f25d54710fdb94aa17840155c749b to your computer and use it in GitHub Desktop.
Gmail CLI for Claude Code - multi-account Gmail access via command line
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Gmail CLI tool for Claude Code skill. | |
| Supports multiple Gmail accounts via separate credential files. | |
| Uses the Google Gmail API directly (no third-party wrappers). | |
| """ | |
| import argparse | |
| import base64 | |
| import json | |
| import os | |
| import sys | |
| from pathlib import Path | |
| # Config directory for storing credentials | |
| CONFIG_DIR = Path.home() / ".claude" / "skills" / "gmail" / "config" | |
| # Account aliases mapping to credential files | |
| # Customize these with your own email addresses | |
| ACCOUNTS = { | |
| "personal": "you@gmail.com", | |
| "work": "you@company.com", | |
| "business": "you@business.io", | |
| } | |
| def _resolve_account(account: str): | |
| """Resolve account alias to email and safe filename.""" | |
| email = ACCOUNTS.get(account, account) | |
| safe_name = email.replace("@", "_at_").replace(".", "_") | |
| return email, safe_name | |
| def _run_oauth_flow(client_secret_file: str, token_file: str, email: str): | |
| """Perform OAuth flow and save token as plain JSON.""" | |
| from google_auth_oauthlib.flow import InstalledAppFlow | |
| SCOPES = ['https://mail.google.com/'] | |
| print(f"\nAuthenticating: {email}", file=sys.stderr) | |
| print(f"A browser window will open — sign in with: {email}\n", file=sys.stderr) | |
| flow = InstalledAppFlow.from_client_secrets_file(client_secret_file, SCOPES) | |
| creds = flow.run_local_server(port=0) | |
| # Save as plain JSON (compatible with get_service) | |
| token_data = { | |
| "access_token": creds.token, | |
| "refresh_token": creds.refresh_token, | |
| "token_uri": creds.token_uri or "https://oauth2.googleapis.com/token", | |
| "client_id": creds.client_id, | |
| "client_secret": creds.client_secret, | |
| } | |
| with open(token_file, "w") as f: | |
| json.dump(token_data, f, indent=2) | |
| print(f"Token saved for {email}\n", file=sys.stderr) | |
| def get_service(account: str): | |
| """Get a Gmail API service client for the given account. | |
| Triggers OAuth flow if no token exists yet.""" | |
| from google.oauth2.credentials import Credentials | |
| from googleapiclient.discovery import build | |
| email, safe_name = _resolve_account(account) | |
| client_secret = CONFIG_DIR / f"client_secret_{safe_name}.json" | |
| token_path = CONFIG_DIR / f"token_{safe_name}.json" | |
| if not client_secret.exists(): | |
| print(f"ERROR: No credentials found for {email}", file=sys.stderr) | |
| print(f"Expected: {client_secret}", file=sys.stderr) | |
| print(f"\nRun 'gmail_cli.py setup' for instructions.", file=sys.stderr) | |
| sys.exit(1) | |
| if not token_path.exists(): | |
| _run_oauth_flow(str(client_secret), str(token_path), email) | |
| with open(token_path) as f: | |
| td = json.load(f) | |
| creds = Credentials( | |
| token=td.get("access_token"), | |
| refresh_token=td.get("refresh_token"), | |
| token_uri=td.get("token_uri", "https://oauth2.googleapis.com/token"), | |
| client_id=td.get("client_id"), | |
| client_secret=td.get("client_secret"), | |
| ) | |
| return build("gmail", "v1", credentials=creds, cache_discovery=False) | |
| def _extract_body(payload): | |
| """Extract text/plain body from a Gmail API message payload.""" | |
| if payload.get("mimeType") == "text/plain": | |
| data = payload["body"].get("data", "") | |
| if data: | |
| return base64.urlsafe_b64decode(data).decode("utf-8", errors="replace") | |
| for part in payload.get("parts", []): | |
| result = _extract_body(part) | |
| if result: | |
| return result | |
| return "" | |
| def _extract_html(payload): | |
| """Extract text/html body from a Gmail API message payload.""" | |
| if payload.get("mimeType") == "text/html": | |
| data = payload["body"].get("data", "") | |
| if data: | |
| return base64.urlsafe_b64decode(data).decode("utf-8", errors="replace") | |
| for part in payload.get("parts", []): | |
| result = _extract_html(part) | |
| if result: | |
| return result | |
| return "" | |
| def _extract_attachments(payload, attachments=None): | |
| """Extract attachment filenames from a Gmail API message payload.""" | |
| if attachments is None: | |
| attachments = [] | |
| filename = payload.get("filename") | |
| if filename: | |
| attachments.append(filename) | |
| for part in payload.get("parts", []): | |
| _extract_attachments(part, attachments) | |
| return attachments | |
| def cmd_list(args): | |
| """List recent emails.""" | |
| service = get_service(args.account) | |
| # Build query | |
| query_parts = ["in:inbox"] | |
| if args.unread: | |
| query_parts.append("is:unread") | |
| if args.starred: | |
| query_parts = ["is:starred"] | |
| if args.label: | |
| query_parts = [f"label:{args.label}"] | |
| query_parts.append("newer_than:30d") | |
| query = " ".join(query_parts) | |
| try: | |
| result = service.users().messages().list(userId="me", q=query, maxResults=args.limit).execute() | |
| except Exception as e: | |
| print(f"ERROR: List failed: {e}", file=sys.stderr) | |
| sys.exit(1) | |
| msg_refs = result.get("messages", []) | |
| if args.json: | |
| output = [] | |
| for ref in msg_refs: | |
| msg = service.users().messages().get(userId="me", id=ref["id"], format="metadata", | |
| metadataHeaders=["From", "To", "Subject", "Date"]).execute() | |
| headers = {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])} | |
| labels = msg.get("labelIds", []) | |
| output.append({ | |
| "id": msg["id"], | |
| "thread_id": msg["threadId"], | |
| "from": headers.get("From", ""), | |
| "to": headers.get("To", ""), | |
| "subject": headers.get("Subject", ""), | |
| "date": headers.get("Date", ""), | |
| "snippet": msg.get("snippet", ""), | |
| "labels": labels, | |
| "unread": "UNREAD" in labels, | |
| }) | |
| print(json.dumps(output, indent=2, default=str)) | |
| else: | |
| for ref in msg_refs: | |
| msg = service.users().messages().get(userId="me", id=ref["id"], format="metadata", | |
| metadataHeaders=["From", "Subject", "Date"]).execute() | |
| headers = {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])} | |
| labels = msg.get("labelIds", []) | |
| status = "[UNREAD]" if "UNREAD" in labels else "" | |
| print(f"{status} {headers.get('Date', '?')} | From: {headers.get('From', '?')}") | |
| print(f" Subject: {headers.get('Subject', '?')}") | |
| print(f" ID: {msg['id']}") | |
| print() | |
| def cmd_read(args): | |
| """Read a specific email by ID.""" | |
| service = get_service(args.account) | |
| try: | |
| msg = service.users().messages().get(userId="me", id=args.message_id, format="full").execute() | |
| except Exception as e: | |
| print(f"ERROR: Could not read message {args.message_id}: {e}", file=sys.stderr) | |
| sys.exit(1) | |
| headers = {h["name"]: h["value"] for h in msg["payload"]["headers"]} | |
| body_plain = _extract_body(msg["payload"]) | |
| body_html = _extract_html(msg["payload"]) | |
| attachments = _extract_attachments(msg["payload"]) | |
| if args.json: | |
| output = { | |
| "id": msg["id"], | |
| "thread_id": msg["threadId"], | |
| "from": headers.get("From", ""), | |
| "to": headers.get("To", ""), | |
| "cc": headers.get("Cc", ""), | |
| "subject": headers.get("Subject", ""), | |
| "date": headers.get("Date", ""), | |
| "body_plain": body_plain, | |
| "body_html": body_html, | |
| "attachments": attachments, | |
| } | |
| print(json.dumps(output, indent=2, default=str)) | |
| else: | |
| print(f"From: {headers.get('From', '?')}") | |
| print(f"To: {headers.get('To', '?')}") | |
| cc = headers.get("Cc", "") | |
| if cc: | |
| print(f"CC: {cc}") | |
| print(f"Date: {headers.get('Date', '?')}") | |
| print(f"Subject: {headers.get('Subject', '?')}") | |
| print("-" * 60) | |
| print(body_plain or body_html or "(no body)") | |
| if attachments: | |
| print("-" * 60) | |
| print(f"Attachments: {', '.join(attachments)}") | |
| def cmd_search(args): | |
| """Search emails with Gmail query syntax.""" | |
| service = get_service(args.account) | |
| try: | |
| result = service.users().messages().list(userId="me", q=args.query, maxResults=args.limit).execute() | |
| except Exception as e: | |
| print(f"ERROR: Search failed: {e}", file=sys.stderr) | |
| sys.exit(1) | |
| msg_refs = result.get("messages", []) | |
| if args.json: | |
| output = [] | |
| for ref in msg_refs: | |
| msg = service.users().messages().get(userId="me", id=ref["id"], format="metadata", | |
| metadataHeaders=["From", "Subject", "Date"]).execute() | |
| headers = {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])} | |
| output.append({ | |
| "id": msg["id"], | |
| "from": headers.get("From", ""), | |
| "subject": headers.get("Subject", ""), | |
| "date": headers.get("Date", ""), | |
| "snippet": msg.get("snippet", ""), | |
| }) | |
| print(json.dumps(output, indent=2, default=str)) | |
| else: | |
| print(f"Found {len(msg_refs)} messages matching: {args.query}\n") | |
| for ref in msg_refs: | |
| msg = service.users().messages().get(userId="me", id=ref["id"], format="metadata", | |
| metadataHeaders=["From", "Subject", "Date"]).execute() | |
| headers = {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])} | |
| print(f"{headers.get('Date', '?')} | From: {headers.get('From', '?')}") | |
| print(f" Subject: {headers.get('Subject', '?')}") | |
| print(f" ID: {msg['id']}") | |
| print() | |
| def cmd_send(args): | |
| """Send an email.""" | |
| import email.mime.text | |
| import email.mime.multipart | |
| import email.mime.base | |
| import mimetypes | |
| service = get_service(args.account) | |
| sender = ACCOUNTS.get(args.account, args.account) | |
| # Read body from stdin if needed | |
| body = args.body | |
| if body == "-": | |
| body = sys.stdin.read() | |
| if args.attachments: | |
| msg = email.mime.multipart.MIMEMultipart() | |
| if args.html: | |
| msg.attach(email.mime.text.MIMEText(body, "html")) | |
| else: | |
| msg.attach(email.mime.text.MIMEText(body, "plain")) | |
| for filepath in args.attachments: | |
| content_type, _ = mimetypes.guess_type(filepath) | |
| if content_type is None: | |
| content_type = "application/octet-stream" | |
| main_type, sub_type = content_type.split("/", 1) | |
| with open(filepath, "rb") as f: | |
| attachment = email.mime.base.MIMEBase(main_type, sub_type) | |
| attachment.set_payload(f.read()) | |
| import email.encoders | |
| email.encoders.encode_base64(attachment) | |
| attachment.add_header("Content-Disposition", "attachment", filename=os.path.basename(filepath)) | |
| msg.attach(attachment) | |
| else: | |
| if args.html: | |
| msg = email.mime.text.MIMEText(body, "html") | |
| else: | |
| msg = email.mime.text.MIMEText(body, "plain") | |
| msg["to"] = args.to | |
| msg["from"] = sender | |
| msg["subject"] = args.subject | |
| if args.cc: | |
| msg["cc"] = args.cc | |
| if args.bcc: | |
| msg["bcc"] = args.bcc | |
| raw = base64.urlsafe_b64encode(msg.as_bytes()).decode() | |
| try: | |
| sent = service.users().messages().send(userId="me", body={"raw": raw}).execute() | |
| except Exception as e: | |
| print(f"ERROR: Send failed: {e}", file=sys.stderr) | |
| sys.exit(1) | |
| if args.json: | |
| print(json.dumps({"status": "sent", "id": sent.get("id", "unknown")}, indent=2)) | |
| else: | |
| print(f"Email sent successfully. ID: {sent.get('id', '?')}") | |
| def cmd_reply(args): | |
| """Reply to an email.""" | |
| import email.mime.text | |
| service = get_service(args.account) | |
| try: | |
| original = service.users().messages().get(userId="me", id=args.message_id, format="metadata", | |
| metadataHeaders=["From", "To", "Subject", "Message-ID", "References", "In-Reply-To"]).execute() | |
| except Exception as e: | |
| print(f"ERROR: Could not find message {args.message_id}: {e}", file=sys.stderr) | |
| sys.exit(1) | |
| headers = {h["name"]: h["value"] for h in original.get("payload", {}).get("headers", [])} | |
| thread_id = original["threadId"] | |
| body = args.body | |
| if body == "-": | |
| body = sys.stdin.read() | |
| reply_to = headers.get("From", "") | |
| subject = headers.get("Subject", "") | |
| if not subject.lower().startswith("re:"): | |
| subject = f"Re: {subject}" | |
| message_id = headers.get("Message-ID", "") | |
| references = headers.get("References", "") | |
| if message_id: | |
| references = f"{references} {message_id}".strip() | |
| sender = ACCOUNTS.get(args.account, args.account) | |
| msg = email.mime.text.MIMEText(body) | |
| msg["to"] = reply_to | |
| msg["from"] = sender | |
| msg["subject"] = subject | |
| if message_id: | |
| msg["In-Reply-To"] = message_id | |
| if references: | |
| msg["References"] = references | |
| raw = base64.urlsafe_b64encode(msg.as_bytes()).decode() | |
| try: | |
| sent = service.users().messages().send(userId="me", body={"raw": raw, "threadId": thread_id}).execute() | |
| except Exception as e: | |
| print(f"ERROR: Reply failed: {e}", file=sys.stderr) | |
| sys.exit(1) | |
| if args.json: | |
| print(json.dumps({"status": "sent", "id": sent.get("id", "unknown")}, indent=2)) | |
| else: | |
| print(f"Reply sent successfully. ID: {sent.get('id', '?')}") | |
| def cmd_labels(args): | |
| """List all labels.""" | |
| service = get_service(args.account) | |
| try: | |
| result = service.users().labels().list(userId="me").execute() | |
| except Exception as e: | |
| print(f"ERROR: Failed to list labels: {e}", file=sys.stderr) | |
| sys.exit(1) | |
| labels = result.get("labels", []) | |
| if args.json: | |
| output = [{"id": l["id"], "name": l["name"]} for l in labels] | |
| print(json.dumps(output, indent=2)) | |
| else: | |
| for label in sorted(labels, key=lambda l: l["name"]): | |
| print(f"{label['name']} (ID: {label['id']})") | |
| def cmd_mark_read(args): | |
| """Mark message(s) as read.""" | |
| service = get_service(args.account) | |
| for msg_id in args.message_ids: | |
| try: | |
| service.users().messages().modify( | |
| userId='me', id=msg_id, | |
| body={'removeLabelIds': ['UNREAD']} | |
| ).execute() | |
| print(f"Marked as read: {msg_id}") | |
| except Exception as e: | |
| print(f"ERROR marking {msg_id} as read: {e}", file=sys.stderr) | |
| def cmd_mark_unread(args): | |
| """Mark message(s) as unread.""" | |
| service = get_service(args.account) | |
| for msg_id in args.message_ids: | |
| try: | |
| service.users().messages().modify( | |
| userId='me', id=msg_id, | |
| body={'addLabelIds': ['UNREAD']} | |
| ).execute() | |
| print(f"Marked as unread: {msg_id}") | |
| except Exception as e: | |
| print(f"ERROR marking {msg_id} as unread: {e}", file=sys.stderr) | |
| def cmd_archive(args): | |
| """Archive message(s) by removing from inbox.""" | |
| service = get_service(args.account) | |
| for msg_id in args.message_ids: | |
| try: | |
| service.users().messages().modify( | |
| userId='me', id=msg_id, | |
| body={'removeLabelIds': ['INBOX']} | |
| ).execute() | |
| print(f"Archived: {msg_id}") | |
| except Exception as e: | |
| print(f"ERROR archiving {msg_id}: {e}", file=sys.stderr) | |
| def cmd_star(args): | |
| """Star message(s).""" | |
| service = get_service(args.account) | |
| for msg_id in args.message_ids: | |
| try: | |
| service.users().messages().modify( | |
| userId='me', id=msg_id, | |
| body={'addLabelIds': ['STARRED']} | |
| ).execute() | |
| print(f"Starred: {msg_id}") | |
| except Exception as e: | |
| print(f"ERROR starring {msg_id}: {e}", file=sys.stderr) | |
| def cmd_unstar(args): | |
| """Remove star from message(s).""" | |
| service = get_service(args.account) | |
| for msg_id in args.message_ids: | |
| try: | |
| service.users().messages().modify( | |
| userId='me', id=msg_id, | |
| body={'removeLabelIds': ['STARRED']} | |
| ).execute() | |
| print(f"Unstarred: {msg_id}") | |
| except Exception as e: | |
| print(f"ERROR unstarring {msg_id}: {e}", file=sys.stderr) | |
| def cmd_trash(args): | |
| """Move message(s) to trash.""" | |
| service = get_service(args.account) | |
| for msg_id in args.message_ids: | |
| try: | |
| service.users().messages().trash(userId='me', id=msg_id).execute() | |
| print(f"Trashed: {msg_id}") | |
| except Exception as e: | |
| print(f"ERROR trashing {msg_id}: {e}", file=sys.stderr) | |
| def cmd_accounts(args): | |
| """List configured accounts.""" | |
| print("Configured account aliases:") | |
| for alias, email in ACCOUNTS.items(): | |
| _, safe_name = _resolve_account(alias) | |
| token_file = CONFIG_DIR / f"token_{safe_name}.json" | |
| status = "authenticated" if token_file.exists() else "NOT SET UP" | |
| print(f" {alias}: {email} [{status}]") | |
| def cmd_setup(args): | |
| """Print setup instructions.""" | |
| print(""" | |
| Gmail Skill Setup | |
| ================= | |
| You only need ONE Google Cloud project for all accounts. | |
| 1. Go to https://console.cloud.google.com/ | |
| 2. Create a project (e.g. "Claude Gmail Access") | |
| 3. Enable the Gmail API (APIs & Services > Library) | |
| 4. Configure OAuth consent screen: | |
| - External type | |
| - Add ALL email addresses as test users | |
| - Add scope: https://mail.google.com/ | |
| 5. Create OAuth credentials: | |
| - APIs & Services > Credentials > Create > OAuth client ID > Desktop app | |
| - Download the JSON file | |
| 6. Copy the SAME JSON file for each account (just rename): | |
| """) | |
| for alias, email in ACCOUNTS.items(): | |
| _, safe_name = _resolve_account(alias) | |
| print(f" cp client_secret.json ~/.claude/skills/gmail/config/client_secret_{safe_name}.json") | |
| print(""" | |
| 7. Authorize each account (browser opens for OAuth): | |
| python3 gmail_cli.py --account personal list --limit 1 | |
| python3 gmail_cli.py --account work list --limit 1 | |
| (etc.) | |
| 8. IMPORTANT: Publish your OAuth app (APIs & Services > OAuth consent screen | |
| > Publish App) or refresh tokens expire after 7 days! | |
| """) | |
| def main(): | |
| parser = argparse.ArgumentParser( | |
| description="Gmail CLI for Claude Code", | |
| formatter_class=argparse.RawDescriptionHelpFormatter | |
| ) | |
| parser.add_argument( | |
| "--account", "-a", | |
| default="personal", | |
| help="Account to use: personal, work, business (or full email)" | |
| ) | |
| parser.add_argument( | |
| "--json", "-j", | |
| action="store_true", | |
| help="Output in JSON format" | |
| ) | |
| subparsers = parser.add_subparsers(dest="command", help="Commands") | |
| # List command | |
| list_parser = subparsers.add_parser("list", help="List recent emails") | |
| list_parser.add_argument("--limit", "-n", type=int, default=10, help="Number of emails") | |
| list_parser.add_argument("--unread", "-u", action="store_true", help="Only unread") | |
| list_parser.add_argument("--starred", "-s", action="store_true", help="Only starred") | |
| list_parser.add_argument("--label", "-l", help="Filter by label") | |
| list_parser.set_defaults(func=cmd_list) | |
| # Read command | |
| read_parser = subparsers.add_parser("read", help="Read a specific email") | |
| read_parser.add_argument("message_id", help="Message ID") | |
| read_parser.set_defaults(func=cmd_read) | |
| # Search command | |
| search_parser = subparsers.add_parser("search", help="Search emails") | |
| search_parser.add_argument("query", help="Gmail search query") | |
| search_parser.add_argument("--limit", "-n", type=int, default=20, help="Max results") | |
| search_parser.set_defaults(func=cmd_search) | |
| # Send command | |
| send_parser = subparsers.add_parser("send", help="Send an email") | |
| send_parser.add_argument("--to", "-t", required=True, help="Recipient") | |
| send_parser.add_argument("--subject", "-s", required=True, help="Subject") | |
| send_parser.add_argument("--body", "-b", default="-", help="Body (use - for stdin)") | |
| send_parser.add_argument("--cc", help="CC recipients") | |
| send_parser.add_argument("--bcc", help="BCC recipients") | |
| send_parser.add_argument("--html", action="store_true", help="Body is HTML") | |
| send_parser.add_argument("--attachments", nargs="*", help="File paths to attach") | |
| send_parser.add_argument("--no-signature", action="store_true", help="Omit signature") | |
| send_parser.set_defaults(func=cmd_send) | |
| # Reply command | |
| reply_parser = subparsers.add_parser("reply", help="Reply to an email") | |
| reply_parser.add_argument("message_id", help="Message ID to reply to") | |
| reply_parser.add_argument("--body", "-b", default="-", help="Body (use - for stdin)") | |
| reply_parser.set_defaults(func=cmd_reply) | |
| # Labels command | |
| labels_parser = subparsers.add_parser("labels", help="List all labels") | |
| labels_parser.set_defaults(func=cmd_labels) | |
| # Mark read/unread commands | |
| mark_read_parser = subparsers.add_parser("mark-read", help="Mark messages as read") | |
| mark_read_parser.add_argument("message_ids", nargs="+", help="Message IDs") | |
| mark_read_parser.set_defaults(func=cmd_mark_read) | |
| mark_unread_parser = subparsers.add_parser("mark-unread", help="Mark messages as unread") | |
| mark_unread_parser.add_argument("message_ids", nargs="+", help="Message IDs") | |
| mark_unread_parser.set_defaults(func=cmd_mark_unread) | |
| # Archive command | |
| archive_parser = subparsers.add_parser("archive", help="Archive messages (remove from inbox)") | |
| archive_parser.add_argument("message_ids", nargs="+", help="Message IDs") | |
| archive_parser.set_defaults(func=cmd_archive) | |
| # Star command | |
| star_parser = subparsers.add_parser("star", help="Star messages") | |
| star_parser.add_argument("message_ids", nargs="+", help="Message IDs") | |
| star_parser.set_defaults(func=cmd_star) | |
| # Unstar command | |
| unstar_parser = subparsers.add_parser("unstar", help="Remove star from messages") | |
| unstar_parser.add_argument("message_ids", nargs="+", help="Message IDs") | |
| unstar_parser.set_defaults(func=cmd_unstar) | |
| # Trash command | |
| trash_parser = subparsers.add_parser("trash", help="Move messages to trash") | |
| trash_parser.add_argument("message_ids", nargs="+", help="Message IDs") | |
| trash_parser.set_defaults(func=cmd_trash) | |
| # Accounts command | |
| accounts_parser = subparsers.add_parser("accounts", help="List configured accounts") | |
| accounts_parser.set_defaults(func=cmd_accounts) | |
| # Setup command | |
| setup_parser = subparsers.add_parser("setup", help="Show setup instructions") | |
| setup_parser.set_defaults(func=cmd_setup) | |
| args = parser.parse_args() | |
| if not args.command: | |
| parser.print_help() | |
| sys.exit(1) | |
| args.func(args) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment