Skip to content

Instantly share code, notes, and snippets.

@swombat
Last active February 20, 2026 07:40
Show Gist options
  • Select an option

  • Save swombat/6f9f25d54710fdb94aa17840155c749b to your computer and use it in GitHub Desktop.

Select an option

Save swombat/6f9f25d54710fdb94aa17840155c749b to your computer and use it in GitHub Desktop.
Gmail CLI for Claude Code - multi-account Gmail access via command line
#!/usr/bin/env python3
"""
Gmail CLI tool for Claude Code skill.
Supports multiple Gmail accounts via separate credential files.
Uses the Google Gmail API directly (no third-party wrappers).
"""
import argparse
import base64
import json
import os
import sys
from pathlib import Path
# Directory holding the per-account OAuth client-secret and token files.
CONFIG_DIR = Path.home().joinpath(".claude", "skills", "gmail", "config")

# Short alias -> full email address. Credential file names are derived from
# the address, so edit these values to match your own accounts.
ACCOUNTS = dict(
    personal="you@gmail.com",
    work="you@company.com",
    business="you@business.io",
)
def _resolve_account(account: str):
    """Map an alias (or a literal address) to ``(email, safe_name)``.

    An ``account`` that is not a key of ``ACCOUNTS`` is passed through
    unchanged and treated as the email address itself. ``safe_name`` is the
    address with ``@`` replaced by ``_at_`` and every ``.`` replaced by ``_``
    so it can be embedded in a file name.
    """
    address = ACCOUNTS[account] if account in ACCOUNTS else account
    filename_stem = "_".join(address.replace("@", "_at_").split("."))
    return address, filename_stem
def _run_oauth_flow(client_secret_file: str, token_file: str, email: str):
    """Run the interactive OAuth flow and save the resulting token as JSON.

    Args:
        client_secret_file: Path to the OAuth client-secret JSON file.
        token_file: Path the token JSON is written to.
        email: Address shown in the prompts so the user signs in with the
            right account.

    The saved file uses the keys that ``get_service`` reads back
    (``access_token``, ``refresh_token``, ``token_uri``, ``client_id``,
    ``client_secret``). Because it contains a refresh token and the client
    secret, it is created with owner-only permissions instead of the
    umask default.
    """
    from google_auth_oauthlib.flow import InstalledAppFlow

    scopes = ['https://mail.google.com/']
    print(f"\nAuthenticating: {email}", file=sys.stderr)
    print(f"A browser window will open — sign in with: {email}\n", file=sys.stderr)

    flow = InstalledAppFlow.from_client_secrets_file(client_secret_file, scopes)
    creds = flow.run_local_server(port=0)

    # Save as plain JSON (compatible with get_service)
    token_data = {
        "access_token": creds.token,
        "refresh_token": creds.refresh_token,
        "token_uri": creds.token_uri or "https://oauth2.googleapis.com/token",
        "client_id": creds.client_id,
        "client_secret": creds.client_secret,
    }

    # The 0o600 mode is applied when os.open creates the file, so the secret
    # never exists on disk with wider permissions.
    fd = os.open(token_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(token_data, f, indent=2)
    print(f"Token saved for {email}\n", file=sys.stderr)
def get_service(account: str):
    """Build a Gmail API (v1) client for ``account``.

    ``account`` may be an alias or a full address. If the account's
    client-secret file is missing, an explanation is printed to stderr and
    the process exits with status 1. If no token file exists yet, the OAuth
    flow is run first to create it.
    """
    from google.oauth2.credentials import Credentials
    from googleapiclient.discovery import build

    email, safe_name = _resolve_account(account)
    secret_file = CONFIG_DIR / f"client_secret_{safe_name}.json"
    token_file = CONFIG_DIR / f"token_{safe_name}.json"

    if not secret_file.exists():
        complaint = (
            f"ERROR: No credentials found for {email}",
            f"Expected: {secret_file}",
            "\nRun 'gmail_cli.py setup' for instructions.",
        )
        for line in complaint:
            print(line, file=sys.stderr)
        sys.exit(1)

    if not token_file.exists():
        _run_oauth_flow(str(secret_file), str(token_file), email)

    with open(token_file) as handle:
        stored = json.load(handle)

    credential_kwargs = {
        "token": stored.get("access_token"),
        "refresh_token": stored.get("refresh_token"),
        "token_uri": stored.get("token_uri", "https://oauth2.googleapis.com/token"),
        "client_id": stored.get("client_id"),
        "client_secret": stored.get("client_secret"),
    }
    return build(
        "gmail",
        "v1",
        credentials=Credentials(**credential_kwargs),
        cache_discovery=False,
    )
def _extract_body(payload):
"""Extract text/plain body from a Gmail API message payload."""
if payload.get("mimeType") == "text/plain":
data = payload["body"].get("data", "")
if data:
return base64.urlsafe_b64decode(data).decode("utf-8", errors="replace")
for part in payload.get("parts", []):
result = _extract_body(part)
if result:
return result
return ""
def _extract_html(payload):
"""Extract text/html body from a Gmail API message payload."""
if payload.get("mimeType") == "text/html":
data = payload["body"].get("data", "")
if data:
return base64.urlsafe_b64decode(data).decode("utf-8", errors="replace")
for part in payload.get("parts", []):
result = _extract_html(part)
if result:
return result
return ""
def _extract_attachments(payload, attachments=None):
"""Extract attachment filenames from a Gmail API message payload."""
if attachments is None:
attachments = []
filename = payload.get("filename")
if filename:
attachments.append(filename)
for part in payload.get("parts", []):
_extract_attachments(part, attachments)
return attachments
def cmd_list(args):
    """List emails from the last 30 days for the selected account.

    Scope is chosen by ``--label`` and/or ``--starred``; with neither, the
    inbox is listed. ``--unread`` narrows whichever scope is active.
    (Previously ``--starred`` and ``--label`` replaced the whole query, so
    ``--unread`` was silently dropped and ``--label`` discarded ``--starred``.)

    With ``--json`` a JSON array is printed; otherwise a readable listing.
    Any Gmail API failure, including a per-message metadata fetch, prints an
    error to stderr and exits with status 1.
    """
    service = get_service(args.account)

    # Build query
    query_parts = []
    if args.label:
        query_parts.append(f"label:{args.label}")
    if args.starred:
        query_parts.append("is:starred")
    if not query_parts:
        query_parts.append("in:inbox")
    if args.unread:
        query_parts.append("is:unread")
    query_parts.append("newer_than:30d")
    query = " ".join(query_parts)

    try:
        result = service.users().messages().list(userId="me", q=query, maxResults=args.limit).execute()
        messages = [
            service.users().messages().get(
                userId="me", id=ref["id"], format="metadata",
                metadataHeaders=["From", "To", "Subject", "Date"],
            ).execute()
            for ref in result.get("messages", [])
        ]
    except Exception as e:
        print(f"ERROR: List failed: {e}", file=sys.stderr)
        sys.exit(1)

    rows = []
    for msg in messages:
        # Header names are case-insensitive; normalise them before lookup.
        headers = {h["name"].lower(): h["value"] for h in msg.get("payload", {}).get("headers", [])}
        rows.append((msg, headers, msg.get("labelIds", [])))

    if args.json:
        output = [
            {
                "id": msg["id"],
                "thread_id": msg["threadId"],
                "from": headers.get("from", ""),
                "to": headers.get("to", ""),
                "subject": headers.get("subject", ""),
                "date": headers.get("date", ""),
                "snippet": msg.get("snippet", ""),
                "labels": labels,
                "unread": "UNREAD" in labels,
            }
            for msg, headers, labels in rows
        ]
        print(json.dumps(output, indent=2, default=str))
        return

    for msg, headers, labels in rows:
        status = "[UNREAD]" if "UNREAD" in labels else ""
        print(f"{status} {headers.get('date', '?')} | From: {headers.get('from', '?')}")
        print(f" Subject: {headers.get('subject', '?')}")
        print(f" ID: {msg['id']}")
        print()
def cmd_read(args):
    """Print one message, selected by ``args.message_id``.

    With ``--json`` a JSON object is printed containing the headers, both
    body variants and the attachment file names. Otherwise the headers are
    printed, followed by the plain-text body (falling back to HTML, then to
    ``(no body)``) and any attachment names.

    Header names are matched case-insensitively, so spellings such as ``CC``
    are found. A failed fetch prints an error to stderr and exits with
    status 1.
    """
    service = get_service(args.account)
    try:
        msg = service.users().messages().get(userId="me", id=args.message_id, format="full").execute()
    except Exception as e:
        print(f"ERROR: Could not read message {args.message_id}: {e}", file=sys.stderr)
        sys.exit(1)

    payload = msg.get("payload", {})
    headers = {h["name"].lower(): h["value"] for h in payload.get("headers", [])}
    body_plain = _extract_body(payload)
    body_html = _extract_html(payload)
    attachments = _extract_attachments(payload)

    if args.json:
        output = {
            "id": msg["id"],
            "thread_id": msg["threadId"],
            "from": headers.get("from", ""),
            "to": headers.get("to", ""),
            "cc": headers.get("cc", ""),
            "subject": headers.get("subject", ""),
            "date": headers.get("date", ""),
            "body_plain": body_plain,
            "body_html": body_html,
            "attachments": attachments,
        }
        print(json.dumps(output, indent=2, default=str))
        return

    print(f"From: {headers.get('from', '?')}")
    print(f"To: {headers.get('to', '?')}")
    cc = headers.get("cc", "")
    if cc:
        print(f"CC: {cc}")
    print(f"Date: {headers.get('date', '?')}")
    print(f"Subject: {headers.get('subject', '?')}")
    print("-" * 60)
    print(body_plain or body_html or "(no body)")
    if attachments:
        print("-" * 60)
        print(f"Attachments: {', '.join(attachments)}")
def cmd_search(args):
    """Search messages using Gmail query syntax (``args.query``).

    At most ``args.limit`` results are requested. With ``--json`` a JSON
    array is printed; otherwise a count line followed by one entry per
    message.

    Any Gmail API failure, including a per-message metadata fetch, prints an
    error to stderr and exits with status 1 instead of surfacing a traceback.
    Header names are matched case-insensitively.
    """
    service = get_service(args.account)
    try:
        result = service.users().messages().list(userId="me", q=args.query, maxResults=args.limit).execute()
        msg_refs = result.get("messages", [])
        messages = [
            service.users().messages().get(
                userId="me", id=ref["id"], format="metadata",
                metadataHeaders=["From", "Subject", "Date"],
            ).execute()
            for ref in msg_refs
        ]
    except Exception as e:
        print(f"ERROR: Search failed: {e}", file=sys.stderr)
        sys.exit(1)

    rows = [
        (msg, {h["name"].lower(): h["value"] for h in msg.get("payload", {}).get("headers", [])})
        for msg in messages
    ]

    if args.json:
        output = [
            {
                "id": msg["id"],
                "from": headers.get("from", ""),
                "subject": headers.get("subject", ""),
                "date": headers.get("date", ""),
                "snippet": msg.get("snippet", ""),
            }
            for msg, headers in rows
        ]
        print(json.dumps(output, indent=2, default=str))
        return

    print(f"Found {len(msg_refs)} messages matching: {args.query}\n")
    for msg, headers in rows:
        print(f"{headers.get('date', '?')} | From: {headers.get('from', '?')}")
        print(f" Subject: {headers.get('subject', '?')}")
        print(f" ID: {msg['id']}")
        print()
def cmd_send(args):
    """Compose and send an email from the selected account.

    The body comes from ``--body``, or from stdin when it is ``-``. It is
    sent as HTML when ``--html`` is set, otherwise as plain text. When
    ``--attachments`` lists files, a multipart message is built with each
    file base64-encoded.

    An attachment that cannot be read, or a failed send, prints an error to
    stderr and exits with status 1. A file whose guessed type carries a
    content encoding (e.g. ``.tar.gz``) is attached as
    ``application/octet-stream`` so it is not mislabelled as its
    uncompressed type.
    """
    import email.encoders
    import email.mime.base
    import email.mime.multipart
    import email.mime.text
    import mimetypes

    service = get_service(args.account)
    sender = ACCOUNTS.get(args.account, args.account)

    # Read body from stdin if needed
    body = args.body
    if body == "-":
        body = sys.stdin.read()

    text_part = email.mime.text.MIMEText(body, "html" if args.html else "plain")

    if args.attachments:
        msg = email.mime.multipart.MIMEMultipart()
        msg.attach(text_part)
        for filepath in args.attachments:
            try:
                with open(filepath, "rb") as f:
                    file_bytes = f.read()
            except OSError as e:
                print(f"ERROR: Could not read attachment {filepath}: {e}", file=sys.stderr)
                sys.exit(1)

            content_type, content_encoding = mimetypes.guess_type(filepath)
            if content_type is None or content_encoding is not None:
                # Unknown type, or a compressed file: use the generic binary type.
                content_type = "application/octet-stream"
            main_type, sub_type = content_type.split("/", 1)

            attachment = email.mime.base.MIMEBase(main_type, sub_type)
            attachment.set_payload(file_bytes)
            email.encoders.encode_base64(attachment)
            attachment.add_header("Content-Disposition", "attachment", filename=os.path.basename(filepath))
            msg.attach(attachment)
    else:
        msg = text_part

    msg["to"] = args.to
    msg["from"] = sender
    msg["subject"] = args.subject
    if args.cc:
        msg["cc"] = args.cc
    if args.bcc:
        msg["bcc"] = args.bcc

    raw = base64.urlsafe_b64encode(msg.as_bytes()).decode()
    try:
        sent = service.users().messages().send(userId="me", body={"raw": raw}).execute()
    except Exception as e:
        print(f"ERROR: Send failed: {e}", file=sys.stderr)
        sys.exit(1)

    if args.json:
        print(json.dumps({"status": "sent", "id": sent.get("id", "unknown")}, indent=2))
    else:
        print(f"Email sent successfully. ID: {sent.get('id', '?')}")
def cmd_reply(args):
    """Send a plain-text reply to the message ``args.message_id``.

    The reply is addressed to the original's ``Reply-To`` header when
    present, otherwise to its ``From``. It is sent in the original's thread,
    with ``Re:`` prefixed to the subject unless already there, and with
    ``In-Reply-To`` / ``References`` built from the original's message ID.

    Header names are matched case-insensitively, so an original that spells
    the header ``Message-Id`` is still threaded correctly. The body comes
    from ``--body``, or from stdin when it is ``-``.

    A failed lookup, a missing reply address, or a failed send prints an
    error to stderr and exits with status 1.
    """
    import email.mime.text

    service = get_service(args.account)
    try:
        original = service.users().messages().get(
            userId="me", id=args.message_id, format="metadata",
            metadataHeaders=["From", "Reply-To", "To", "Subject", "Message-ID", "References", "In-Reply-To"],
        ).execute()
    except Exception as e:
        print(f"ERROR: Could not find message {args.message_id}: {e}", file=sys.stderr)
        sys.exit(1)

    headers = {h["name"].lower(): h["value"] for h in original.get("payload", {}).get("headers", [])}
    thread_id = original["threadId"]

    body = args.body
    if body == "-":
        body = sys.stdin.read()

    reply_to = headers.get("reply-to") or headers.get("from", "")
    if not reply_to:
        print(f"ERROR: Message {args.message_id} has no From/Reply-To address to reply to", file=sys.stderr)
        sys.exit(1)

    subject = headers.get("subject", "")
    if not subject.lower().startswith("re:"):
        subject = f"Re: {subject}"

    message_id = headers.get("message-id", "")
    references = headers.get("references", "")
    if message_id:
        references = f"{references} {message_id}".strip()

    sender = ACCOUNTS.get(args.account, args.account)
    msg = email.mime.text.MIMEText(body)
    msg["to"] = reply_to
    msg["from"] = sender
    msg["subject"] = subject
    if message_id:
        msg["In-Reply-To"] = message_id
    if references:
        msg["References"] = references

    raw = base64.urlsafe_b64encode(msg.as_bytes()).decode()
    try:
        sent = service.users().messages().send(userId="me", body={"raw": raw, "threadId": thread_id}).execute()
    except Exception as e:
        print(f"ERROR: Reply failed: {e}", file=sys.stderr)
        sys.exit(1)

    if args.json:
        print(json.dumps({"status": "sent", "id": sent.get("id", "unknown")}, indent=2))
    else:
        print(f"Reply sent successfully. ID: {sent.get('id', '?')}")
def cmd_labels(args):
    """Print every label of the selected account.

    With ``--json`` the labels are printed as a JSON array of ``id``/``name``
    objects in the order the API returned them; otherwise one line per
    label, sorted by name. A failed API call prints an error to stderr and
    exits with status 1.
    """
    service = get_service(args.account)
    try:
        response = service.users().labels().list(userId="me").execute()
    except Exception as e:
        print(f"ERROR: Failed to list labels: {e}", file=sys.stderr)
        sys.exit(1)

    found = response.get("labels", [])
    if not args.json:
        by_name = list(found)
        by_name.sort(key=lambda entry: entry["name"])
        for entry in by_name:
            print(f"{entry['name']} (ID: {entry['id']})")
        return

    summary = [{key: entry[key] for key in ("id", "name")} for entry in found]
    print(json.dumps(summary, indent=2))
def cmd_mark_read(args):
    """Mark message(s) as read by removing the ``UNREAD`` label.

    Every ID in ``args.message_ids`` is attempted even if an earlier one
    fails. Failures are reported on stderr and make the command exit with
    status 1 once all IDs have been processed, so callers can detect them
    (previously the exit status was always 0).
    """
    service = get_service(args.account)
    failed = False
    for msg_id in args.message_ids:
        try:
            service.users().messages().modify(
                userId='me', id=msg_id,
                body={'removeLabelIds': ['UNREAD']}
            ).execute()
        except Exception as e:
            print(f"ERROR marking {msg_id} as read: {e}", file=sys.stderr)
            failed = True
        else:
            print(f"Marked as read: {msg_id}")
    if failed:
        sys.exit(1)
def cmd_mark_unread(args):
    """Mark message(s) as unread by adding the ``UNREAD`` label.

    Every ID in ``args.message_ids`` is attempted even if an earlier one
    fails. Failures are reported on stderr and make the command exit with
    status 1 once all IDs have been processed, so callers can detect them
    (previously the exit status was always 0).
    """
    service = get_service(args.account)
    failed = False
    for msg_id in args.message_ids:
        try:
            service.users().messages().modify(
                userId='me', id=msg_id,
                body={'addLabelIds': ['UNREAD']}
            ).execute()
        except Exception as e:
            print(f"ERROR marking {msg_id} as unread: {e}", file=sys.stderr)
            failed = True
        else:
            print(f"Marked as unread: {msg_id}")
    if failed:
        sys.exit(1)
def cmd_archive(args):
    """Archive message(s) by removing the ``INBOX`` label.

    Every ID in ``args.message_ids`` is attempted even if an earlier one
    fails. Failures are reported on stderr and make the command exit with
    status 1 once all IDs have been processed, so callers can detect them
    (previously the exit status was always 0).
    """
    service = get_service(args.account)
    failed = False
    for msg_id in args.message_ids:
        try:
            service.users().messages().modify(
                userId='me', id=msg_id,
                body={'removeLabelIds': ['INBOX']}
            ).execute()
        except Exception as e:
            print(f"ERROR archiving {msg_id}: {e}", file=sys.stderr)
            failed = True
        else:
            print(f"Archived: {msg_id}")
    if failed:
        sys.exit(1)
def cmd_star(args):
    """Star message(s) by adding the ``STARRED`` label.

    Every ID in ``args.message_ids`` is attempted even if an earlier one
    fails. Failures are reported on stderr and make the command exit with
    status 1 once all IDs have been processed, so callers can detect them
    (previously the exit status was always 0).
    """
    service = get_service(args.account)
    failed = False
    for msg_id in args.message_ids:
        try:
            service.users().messages().modify(
                userId='me', id=msg_id,
                body={'addLabelIds': ['STARRED']}
            ).execute()
        except Exception as e:
            print(f"ERROR starring {msg_id}: {e}", file=sys.stderr)
            failed = True
        else:
            print(f"Starred: {msg_id}")
    if failed:
        sys.exit(1)
def cmd_unstar(args):
    """Remove the star from message(s) by removing the ``STARRED`` label.

    Every ID in ``args.message_ids`` is attempted even if an earlier one
    fails. Failures are reported on stderr and make the command exit with
    status 1 once all IDs have been processed, so callers can detect them
    (previously the exit status was always 0).
    """
    service = get_service(args.account)
    failed = False
    for msg_id in args.message_ids:
        try:
            service.users().messages().modify(
                userId='me', id=msg_id,
                body={'removeLabelIds': ['STARRED']}
            ).execute()
        except Exception as e:
            print(f"ERROR unstarring {msg_id}: {e}", file=sys.stderr)
            failed = True
        else:
            print(f"Unstarred: {msg_id}")
    if failed:
        sys.exit(1)
def cmd_trash(args):
    """Move message(s) to trash via the API's ``trash`` call.

    Every ID in ``args.message_ids`` is attempted even if an earlier one
    fails. Failures are reported on stderr and make the command exit with
    status 1 once all IDs have been processed, so callers can detect them
    (previously the exit status was always 0).
    """
    service = get_service(args.account)
    failed = False
    for msg_id in args.message_ids:
        try:
            service.users().messages().trash(userId='me', id=msg_id).execute()
        except Exception as e:
            print(f"ERROR trashing {msg_id}: {e}", file=sys.stderr)
            failed = True
        else:
            print(f"Trashed: {msg_id}")
    if failed:
        sys.exit(1)
def cmd_accounts(args):
    """Print each configured alias, its address, and its token status.

    An account is shown as ``authenticated`` when its token file exists in
    ``CONFIG_DIR``, otherwise as ``NOT SET UP``. ``args`` is unused.
    """
    print("Configured account aliases:")
    for alias in ACCOUNTS:
        address, safe_name = _resolve_account(alias)
        has_token = (CONFIG_DIR / f"token_{safe_name}.json").exists()
        if has_token:
            status = "authenticated"
        else:
            status = "NOT SET UP"
        print(f" {alias}: {address} [{status}]")
def cmd_setup(args):
    """Print the one-time Google Cloud / OAuth setup walkthrough.

    The text is emitted in three pieces: the steps up to copying the client
    secret, one ``cp`` command per configured alias, and the closing
    authorization steps. ``args`` is unused.
    """
    opening_steps = """
Gmail Skill Setup
=================
You only need ONE Google Cloud project for all accounts.
1. Go to https://console.cloud.google.com/
2. Create a project (e.g. "Claude Gmail Access")
3. Enable the Gmail API (APIs & Services > Library)
4. Configure OAuth consent screen:
- External type
- Add ALL email addresses as test users
- Add scope: https://mail.google.com/
5. Create OAuth credentials:
- APIs & Services > Credentials > Create > OAuth client ID > Desktop app
- Download the JSON file
6. Copy the SAME JSON file for each account (just rename):
"""
    closing_steps = """
7. Authorize each account (browser opens for OAuth):
python3 gmail_cli.py --account personal list --limit 1
python3 gmail_cli.py --account work list --limit 1
(etc.)
8. IMPORTANT: Publish your OAuth app (APIs & Services > OAuth consent screen
> Publish App) or refresh tokens expire after 7 days!
"""
    print(opening_steps)
    for alias in ACCOUNTS:
        safe_name = _resolve_account(alias)[1]
        print(f" cp client_secret.json ~/.claude/skills/gmail/config/client_secret_{safe_name}.json")
    print(closing_steps)
def main():
    """Parse the command line and dispatch to the matching ``cmd_*`` handler.

    ``--account`` and ``--json`` are defined on the top-level parser; each
    subcommand registers its handler through ``set_defaults(func=...)``.
    When no subcommand is given, the help text is printed and the process
    exits with status 1.
    """
    cli = argparse.ArgumentParser(
        description="Gmail CLI for Claude Code",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    cli.add_argument("--account", "-a", default="personal",
                     help="Account to use: personal, work, business (or full email)")
    cli.add_argument("--json", "-j", action="store_true", help="Output in JSON format")
    commands = cli.add_subparsers(dest="command", help="Commands")

    # list
    sub = commands.add_parser("list", help="List recent emails")
    sub.add_argument("--limit", "-n", type=int, default=10, help="Number of emails")
    sub.add_argument("--unread", "-u", action="store_true", help="Only unread")
    sub.add_argument("--starred", "-s", action="store_true", help="Only starred")
    sub.add_argument("--label", "-l", help="Filter by label")
    sub.set_defaults(func=cmd_list)

    # read
    sub = commands.add_parser("read", help="Read a specific email")
    sub.add_argument("message_id", help="Message ID")
    sub.set_defaults(func=cmd_read)

    # search
    sub = commands.add_parser("search", help="Search emails")
    sub.add_argument("query", help="Gmail search query")
    sub.add_argument("--limit", "-n", type=int, default=20, help="Max results")
    sub.set_defaults(func=cmd_search)

    # send
    sub = commands.add_parser("send", help="Send an email")
    sub.add_argument("--to", "-t", required=True, help="Recipient")
    sub.add_argument("--subject", "-s", required=True, help="Subject")
    sub.add_argument("--body", "-b", default="-", help="Body (use - for stdin)")
    sub.add_argument("--cc", help="CC recipients")
    sub.add_argument("--bcc", help="BCC recipients")
    sub.add_argument("--html", action="store_true", help="Body is HTML")
    sub.add_argument("--attachments", nargs="*", help="File paths to attach")
    # NOTE(review): this flag is parsed, but no handler in this file reads it.
    sub.add_argument("--no-signature", action="store_true", help="Omit signature")
    sub.set_defaults(func=cmd_send)

    # reply
    sub = commands.add_parser("reply", help="Reply to an email")
    sub.add_argument("message_id", help="Message ID to reply to")
    sub.add_argument("--body", "-b", default="-", help="Body (use - for stdin)")
    sub.set_defaults(func=cmd_reply)

    # labels
    sub = commands.add_parser("labels", help="List all labels")
    sub.set_defaults(func=cmd_labels)

    # Commands that all take one or more message IDs and nothing else.
    id_commands = (
        ("mark-read", "Mark messages as read", cmd_mark_read),
        ("mark-unread", "Mark messages as unread", cmd_mark_unread),
        ("archive", "Archive messages (remove from inbox)", cmd_archive),
        ("star", "Star messages", cmd_star),
        ("unstar", "Remove star from messages", cmd_unstar),
        ("trash", "Move messages to trash", cmd_trash),
    )
    for command_name, summary, handler in id_commands:
        sub = commands.add_parser(command_name, help=summary)
        sub.add_argument("message_ids", nargs="+", help="Message IDs")
        sub.set_defaults(func=handler)

    # accounts / setup take no arguments of their own
    commands.add_parser("accounts", help="List configured accounts").set_defaults(func=cmd_accounts)
    commands.add_parser("setup", help="Show setup instructions").set_defaults(func=cmd_setup)

    args = cli.parse_args()
    if args.command:
        args.func(args)
        return
    cli.print_help()
    sys.exit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment