Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions .github/workflows/mcp-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: MCP integration tests

on:
push:
branches: ["**"]
pull_request:

jobs:
mcp-tests:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Install Nix
uses: cachix/install-nix-action@v27
with:
nix_path: nixpkgs=channel:nixos-unstable

- name: Build echidna
run: nix build .#echidna --out-link result

- name: Add echidna to PATH
run: echo "$GITHUB_WORKSPACE/result/bin" >> "$GITHUB_PATH"

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"

- name: Install test dependencies
run: pip install pytest httpx

- name: Run MCP integration tests
run: pytest tests/mcp/test_mcp.py -v
52 changes: 52 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Echidna MCP Agent Example

An AI agent that connects to a live Echidna fuzzing campaign via the [MCP](https://spec.modelcontextprotocol.io/) server and autonomously guides the fuzzer.

## Requirements

```
pip install langchain-anthropic langgraph httpx
```

## Usage

**1. Start Echidna with the MCP server enabled:**

```bash
echidna MyContract.sol --server 8080 --format text
```

The `--format text` flag is required — it disables the interactive TUI so the MCP server thread can run.

**2. Run the agent:**

```bash
export ANTHROPIC_API_KEY=your_key_here
python examples/mcp_agent.py
```

## What the agent does

The agent runs in a loop at a configurable interval. Each step it:

1. **Observes** — calls `status` to read coverage, iterations, and corpus size.
2. **Injects** — when coverage stagnates, asks Claude to suggest targeted transaction sequences and injects them via `inject_fuzz_transactions`.
3. **Resets** — periodically calls `clear_fuzz_priorities` to prevent the fuzzer from getting stuck on a single function.

## Available MCP tools

| Tool | Description |
|------|-------------|
| `status` | Campaign metrics: coverage, iterations, corpus size, last log lines |
| `target` | ABI of the target contract |
| `show_coverage` | Per-contract source coverage with line annotations |
| `dump_lcov` | Export coverage in LCOV format |
| `inject_fuzz_transactions` | Inject a semicolon-separated sequence of function calls |
| `clear_fuzz_priorities` | Reset function call weighting on all workers |
| `reload_corpus` | Reload transaction sequences from the corpus directory |

Transactions passed to `inject_fuzz_transactions` use Solidity call syntax and support `?` as a fuzzer wildcard:

```
transfer(0xdeadbeef00000000000000000000000000000001, 100);approve(?, ?)
```
166 changes: 166 additions & 0 deletions examples/mcp_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
#!/usr/bin/env python3
"""
LangGraph agent for Echidna MCP integration.

Connects to a running Echidna campaign via its MCP server and autonomously
guides the fuzzer: analyzes coverage, injects targeted transactions, and
resets priorities when coverage stagnates.

Requirements:
pip install langchain-anthropic langgraph httpx

Usage:
# Start Echidna with MCP enabled:
echidna MyContract.sol --server 8080 --format text

# Run the agent:
ANTHROPIC_API_KEY=... python examples/mcp_agent.py
"""

import os
import time
import httpx
from typing import TypedDict, List
from langchain_anthropic import ChatAnthropic
from langgraph.graph import StateGraph, END


# ---------------------------------------------------------------------------
# MCP client
# ---------------------------------------------------------------------------

def call_tool(tool: str, args: dict = None) -> str:
"""Call an MCP tool and return the text response."""
payload = {
"jsonrpc": "2.0", "id": 1,
"method": "tools/call",
"params": {"name": tool, "arguments": args or {}},
}
resp = httpx.post("http://localhost:8080/mcp", json=payload, timeout=30)
result = resp.json().get("result", {})
return result.get("content", [{}])[0].get("text", "")


# ---------------------------------------------------------------------------
# Agent state
# ---------------------------------------------------------------------------

class State(TypedDict):
coverage: int
iterations: int
stagnation: int


def parse_int(text: str, key: str) -> int:
for line in text.splitlines():
if key in line:
try:
return int(line.split(":")[-1].strip().split()[0].replace(",", ""))
except ValueError:
pass
return 0


# ---------------------------------------------------------------------------
# Graph nodes
# ---------------------------------------------------------------------------

def observe(state: State) -> State:
"""Read current campaign status."""
status = call_tool("status")
coverage = parse_int(status, "Coverage")
iterations = parse_int(status, "Iterations")
stagnation = state["stagnation"] + 1 if coverage == state["coverage"] else 0
print(f" coverage={coverage} iterations={iterations} stagnation={stagnation}")
return {"coverage": coverage, "iterations": iterations, "stagnation": stagnation}


def inject(state: State) -> State:
"""Ask the LLM to suggest targeted transactions and inject them."""
llm = ChatAnthropic(model="claude-sonnet-4-5", temperature=0.7)
coverage_report = call_tool("show_coverage")

prompt = (
"You are a smart-contract security expert helping an Echidna fuzzer.\n"
"The coverage report is:\n\n"
f"{coverage_report[:2000]}\n\n"
"Suggest 3 Solidity function calls that would increase coverage.\n"
"Reply with one call per line, e.g.: transfer(0xABCD..., 100)\n"
)
suggestions = llm.invoke(prompt).content.strip().splitlines()

for tx in suggestions[:3]:
tx = tx.strip()
if tx and "(" in tx and not tx.startswith("#"):
print(f" injecting: {tx}")
call_tool("inject_fuzz_transactions", {"transactions": tx})

return {**state, "stagnation": 0}


def reset(state: State) -> State:
"""Clear function priorities to encourage exploration."""
print(" clearing priorities")
call_tool("clear_fuzz_priorities")
return state


def route(state: State) -> str:
if state["stagnation"] >= 3:
return "inject"
if state["iterations"] > 0 and state["iterations"] % 100_000 == 0:
return "reset"
return END


# ---------------------------------------------------------------------------
# Build and run
# ---------------------------------------------------------------------------

def build_graph() -> StateGraph:
g = StateGraph(State)
g.add_node("observe", observe)
g.add_node("inject", inject)
g.add_node("reset", reset)
g.set_entry_point("observe")
g.add_conditional_edges("observe", route, {"inject": "inject", "reset": "reset", END: END})
g.add_edge("inject", END)
g.add_edge("reset", END)
return g.compile()


def main():
if not os.getenv("ANTHROPIC_API_KEY"):
print("Set ANTHROPIC_API_KEY before running.")
return

# Verify connectivity
try:
status = call_tool("status")
if not status:
raise RuntimeError("empty response")
print("Connected to Echidna MCP server.")
except Exception as e:
print(f"Cannot reach MCP server: {e}")
print("Start Echidna with: echidna MyContract.sol --server 8080 --format text")
return

graph = build_graph()
state: State = {"coverage": 0, "iterations": 0, "stagnation": 0}

duration = int(input("Duration in minutes [10]: ") or 10)
interval = int(input("Check interval in seconds [60]: ") or 60)
end_time = time.time() + duration * 60

step = 0
while time.time() < end_time:
step += 1
print(f"\n--- step {step} ---")
state = graph.invoke(state)
time.sleep(interval)

print(f"\nDone. Final coverage: {state['coverage']} iterations: {state['iterations']}")


if __name__ == "__main__":
main()
120 changes: 120 additions & 0 deletions tests/mcp/contracts/EchidnaMCPTest.sol
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;

import "./SimpleToken.sol";

/**
* Echidna MCP Test Contract with Property Tests
* Feature: 001-mcp-agent-commands
* Phase 5, Task T067
*
* Property tests for SimpleToken that can be run with Echidna.
* Tests invariants that should hold during fuzzing.
*/

contract EchidnaMCPTest {
SimpleToken token;
uint256 constant INITIAL_SUPPLY = 1000000 * 10**18;

// Track addresses that have received tokens
address[] knownAddresses;

constructor() {
token = new SimpleToken(INITIAL_SUPPLY);
knownAddresses.push(address(this));
}

/**
* Property: Total supply should remain constant (no burns in SimpleToken)
* Or increase only if mint is called
*/
function echidna_total_supply_never_decreases() public view returns (bool) {
return token.totalSupply() >= INITIAL_SUPPLY;
}

/**
* Property: Sum of all balances should equal total supply
* This tests for balance conservation
*/
function echidna_balances_conserved() public view returns (bool) {
// For simplicity, we check that no address has more than total supply
// (full balance sum requires tracking all addresses)
return token.balanceOf(address(this)) <= token.totalSupply();
}

/**
* Property: No overflow should occur in balance tracking
* Each address balance should be <= total supply
*/
function echidna_no_overflow() public view returns (bool) {
uint256 totalSupply = token.totalSupply();

// Check known addresses
for (uint i = 0; i < knownAddresses.length; i++) {
if (token.balanceOf(knownAddresses[i]) > totalSupply) {
return false;
}
}

return true;
}

/**
* Property: Allowance should never exceed balance
*/
function echidna_allowance_valid() public view returns (bool) {
// This is a basic check - allowance can exceed balance in some designs
// but should never be negative (implicit in uint256)
return true;
}

/**
* Property: Transfer should never create tokens
*/
function echidna_transfer_no_mint() public view returns (bool) {
uint256 supply = token.totalSupply();
return supply >= INITIAL_SUPPLY; // Only mints increase supply
}

// Helper functions for fuzzing

function testTransfer(address to, uint256 amount) public {
if (to == address(0)) return; // Skip invalid addresses
if (amount > token.balanceOf(address(this))) return; // Skip insufficient balance

token.transfer(to, amount);

// Track new addresses
if (!_isKnown(to)) {
knownAddresses.push(to);
}
}

function testApprove(address spender, uint256 amount) public {
if (spender == address(0)) return;
token.approve(spender, amount);
}

function testMint(address to, uint256 amount) public {
if (to == address(0)) return;
if (amount > 1000000 * 10**18) return; // Reasonable limit

token.mint(to, amount);

if (!_isKnown(to)) {
knownAddresses.push(to);
}
}

function _isKnown(address addr) internal view returns (bool) {
for (uint i = 0; i < knownAddresses.length; i++) {
if (knownAddresses[i] == addr) return true;
}
return false;
}

// Getter for token address (for external MCP calls)
function tokenAddress() public view returns (address) {
return address(token);
}
}
Loading