Every shipped Litmus MCP tool — arguments, return shape, and a runnable Python example.

Tool reference

Every tool below is callable through any MCP client connected to https://litmushiring.com/mcp/ with a valid bearer token (Clerk OAuth or API key — see Authentication). Every call is scoped to the calling org — you cannot read or modify another org's data, and the server returns "not found" rather than a permission error, so cross-tenant existence is never leaked.

The Python examples assume you already have an mcp.ClientSession open and initialized; see Quickstart for the connection boilerplate.

Submissions

Read-only tools for working with the submissions your candidates send back.

list_submissions#
python
async def list_submissions(
    role_id: str | None = None,
    candidate_id: str | None = None,
    status: str | None = None,
    limit: int = 50,
    cursor: str | None = None,
) -> dict
Paginated list of submissions in your org, ordered by submitted-at descending. Filter by role, candidate, or status.

Arguments

role_id
str | None
Limit to one role.
candidate_id
str | None
Limit to one candidate.
status
str | None
One of pending, advance, reject. Invalid values raise an error.
limit
int
Page size, capped server-side at 100. Default 50.
cursor
str | None
Opaque page token from a prior response's nextCursor.

Returns

{ items: [...], nextCursor: str | null }

Each item has id, candidateId, candidateName, candidateEmail, assessmentId, assessmentName, roleId, roleName, status, submittedAt, score. nextCursor is null on the last page.

Example

python
# Walk every pending submission for one role.
cursor = None
while True:
  resp = await session.call_tool("list_submissions", {
      "role_id": "role_abc123",
      "status": "pending",
      "cursor": cursor,
  })
  page = resp.structuredContent
  for item in page["items"]:
      print(item["candidateName"], item["score"])
  cursor = page["nextCursor"]
  if cursor is None:
      break

Pagination is keyset-based — the cursor encodes (submittedAt, id) internally. Don't try to parse or fabricate a cursor; just round-trip whatever the server hands back.

get_submission#
python
async def get_submission(submission_id: str) -> dict
Full detail for one submission: candidate, assessment, role, latest report summary, and absolute URLs to the submission zip and walkthrough video.

Arguments

submission_id (required)
str
Submission ID from list_submissions.

Returns

All fields from list_submissions plus zipUrl, videoUrl, videoTranscript, and a report object (status, overallScore, verdict, summary, generatedAt) when one has been generated.

zipUrl and videoUrl are absolute, point at /mcp/artifacts/submission/{id}/zip|video on the same host, and accept the same bearer token used to call this tool. Either may be null if the artifact doesn't exist.

Example

python
import httpx

resp = await session.call_tool("get_submission", {
  "submission_id": "sub_abc123",
})
sub = resp.structuredContent

if sub["zipUrl"]:
  headers = {"Authorization": "Bearer litmus_sk_..."}
  async with httpx.AsyncClient() as client:
      r = await client.get(sub["zipUrl"], headers=headers)
      r.raise_for_status()
      with open("submission.zip", "wb") as fh:
          fh.write(r.content)

Submissions created before 2026 may have file artifacts but no zip blob — those return zipUrl: null rather than a broken link.

Pipeline

Read-only tools for understanding pipeline state across roles and candidates.

get_pipeline_status#
python
async def get_pipeline_status(role_id: str) -> dict
Disjoint stage counts for one role. Useful for dashboards and 'how many candidates are stuck at X' summaries.

Arguments

role_id (required)
str
Role to count.

Returns

{ roleId, roleTitle, total, stages: { invited, in_progress, pending_review, advanced, rejected, expired } }

All six stage keys are always present (zero when empty), and the counts sum to total.

Example

python
resp = await session.call_tool("get_pipeline_status", {
  "role_id": "role_abc123",
})
status = resp.structuredContent
print(f"{status['roleTitle']}: {status['total']} candidates")
for stage, count in status["stages"].items():
  print(f"  {stage}: {count}")

Each candidate appears in exactly one stage. Decisions (advance / reject) win over candidate status; submissions without a candidate decision land in pending_review.

list_candidates#
python
async def list_candidates(
    role_id: str | None = None,
    stage: str | None = None,
    limit: int = 50,
    cursor: str | None = None,
) -> dict
Paginated list of candidates in your org. Includes candidates who haven't submitted yet — use this when list_submissions would miss the people you want.

Arguments

role_id
str | None
Limit to one role.
stage
str | None
One of invited, in_progress, pending_review, advanced, rejected, expired. Invalid values raise an error.
limit
int
Page size, capped server-side at 100. Default 50.
cursor
str | None
Opaque page token from a prior response.

Returns

{ items: [...], nextCursor: str | null }

Each item has id, name, email, assessmentId, assessmentName, roleId, roleName, stage, source, invitedAt, submittedAt. Items are ordered by (invitedAt DESC, id DESC).

Example

python
resp = await session.call_tool("list_candidates", {
  "role_id": "role_abc123",
  "stage": "pending_review",
})
for c in resp.structuredContent["items"]:
  print(c["name"], c["email"], c["submittedAt"])

get_candidate_status#
python
async def get_candidate_status(candidate_id: str) -> dict
Full status detail for one candidate: current stage, lifecycle timestamps, source, ATS provider, and the linked assessment + role.

Arguments

candidate_id (required)
str
Candidate ID from list_candidates.

Returns

{ id, name, email, currentStage, lifecycle: { invitedAt, acceptedAt, startedAt, submittedAt }, source, atsProvider, assessment: { id, name }, role: { id, title } | null }

invitedAt is always populated; the other lifecycle timestamps are null until that event occurs.

Example

python
resp = await session.call_tool("get_candidate_status", {
  "candidate_id": "cand_abc123",
})
c = resp.structuredContent
print(f"{c['name']}: {c['currentStage']}")
print(f"  invited:   {c['lifecycle']['invitedAt']}")
print(f"  submitted: {c['lifecycle']['submittedAt']}")

Assessment files

Write tools for building and iterating on assessments programmatically. They do not run the assessment generation agent — you bring the files, Litmus stores and serves them.

upload_assessment#
python
async def upload_assessment(
    name: str,
    files: list[dict],
    language: str | None = None,
    time_limit: int | None = None,
    role_id: str | None = None,
) -> dict
Create a new assessment from caller-supplied files. The new assessment is immediately available as a draft in your dashboard; nothing is sent to candidates until you wire it up to a role.

Arguments

name (required)
str
Display title for the assessment.
files (required)
list[dict]
List of { "path": str, "content": str } objects. Paths must be unique within the assessment.
language
str | None
Primary language hint (e.g. python, typescript) for the candidate UI.
time_limit
int | None
Suggested completion time in minutes.
role_id
str | None
Existing role to attach the assessment to.

Returns

{ assessmentId: str }

Example

python
resp = await session.call_tool("upload_assessment", {
  "name": "Backend take-home v2",
  "language": "python",
  "time_limit": 120,
  "files": [
      {"path": "README.md", "content": "# Build a small queue worker..."},
      {"path": "src/worker.py", "content": "# starter code\n"},
  ],
})
print("created", resp.structuredContent["assessmentId"])

Uploading via this tool does not consume credits — only AI-generated assessments do. Iterate by calling update_assessment_files rather than uploading new copies.

get_assessment_files#
python
async def get_assessment_files(assessment_id: str) -> dict
Pull the current files and version of an assessment. Call this before update_assessment_files to get the version you'll need for the optimistic-concurrency check.

Arguments

assessment_id (required)
str
Assessment ID.

Returns

{ assessmentId, name, version, files: [{ path, content }, ...] }

version increments on every successful update.

Example

python
resp = await session.call_tool("get_assessment_files", {
  "assessment_id": "asmt_abc123",
})
asmt = resp.structuredContent
for f in asmt["files"]:
  print(f["path"])
print("version:", asmt["version"])

update_assessment_files#
python
async def update_assessment_files(
    assessment_id: str,
    files: list[dict],
    expected_version: int | None = None,
) -> dict
Replace the file set for an existing assessment. This is a full replace, not a patch — pass every file you want present after the call.

Arguments

assessment_id (required)
str
Assessment to update.
files (required)
list[dict]
Complete { "path", "content" } list. Anything not in this list is removed.
expected_version
int | None
Version returned by your last get_assessment_files. If supplied, the write is refused (with a clear error) when another writer has bumped the version in between. Omit only if you genuinely want last-write-wins.

Returns

{ assessmentId, version } — the new version after the write.

Example

python
# Pull, edit, push back with a version guard.
resp = await session.call_tool("get_assessment_files", {
  "assessment_id": "asmt_abc123",
})
current = resp.structuredContent
files = current["files"]

# ...edit files locally...

resp = await session.call_tool("update_assessment_files", {
  "assessment_id": "asmt_abc123",
  "files": files,
  "expected_version": current["version"],
})
print("now at version", resp.structuredContent["version"])

On a version mismatch the error message includes the current version. The right move is to re-fetch with get_assessment_files, merge your changes against the new state, and retry.
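That re-fetch, merge, retry flow can be sketched as a small helper. This is a hedged sketch, not part of the API: fetch_files, update_files, and merge are stand-ins you wire to your own session calls, and treating the mismatch as a raised RuntimeError is an assumption about how your client surfaces tool errors.

```python
# Sketch of the mismatch-retry loop. fetch_files / update_files stand in
# for get_assessment_files / update_assessment_files calls; merge is a
# function you supply that reapplies your local edits on top of the
# freshly fetched file list.
def update_with_retry(fetch_files, update_files, merge, attempts: int = 3) -> int:
    last_error = None
    for _ in range(attempts):
        current = fetch_files()                 # pull latest files + version
        files = merge(current["files"])         # reapply local edits
        try:
            result = update_files(files, current["version"])
            return result["version"]            # write accepted
        except RuntimeError as exc:             # version bumped underneath us
            last_error = exc
    raise RuntimeError(f"still conflicting after {attempts} attempts") from last_error
```

Bounding the attempts matters: if another writer is updating the same assessment in a tight loop, unbounded retries would spin forever.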