mcghidra/src/mcghidra/core/http_client.py
Ryan Malloy 14b2b575c8
Some checks failed
Build Ghidra Plugin / build (push) Has been cancelled
Remove AI buzzwords from descriptions and metadata
Describe what the tool does, not how it works — builds trust
and sets accurate expectations.
2026-03-02 05:07:44 -07:00

401 lines
12 KiB
Python

"""HTTP client for Ghidra REST API communication.
Provides safe request methods with error handling, HATEOAS compliance,
and response simplification for MCP tool consumption.
"""
import time
from typing import Any, Dict, Optional, Union
from urllib.parse import urlparse
import requests
from ..config import get_config
# Allowed origins for CORS-like validation
ALLOWED_ORIGINS = {
"http://localhost",
"http://127.0.0.1",
"https://localhost",
"https://127.0.0.1",
}
def validate_origin(headers: Dict[str, str]) -> bool:
"""Validate request origin against allowed origins.
Args:
headers: Request headers dict
Returns:
True if origin is allowed or not present
"""
origin = headers.get("Origin")
if not origin:
# No origin header - allow (browser same-origin policy applies)
return True
try:
parsed = urlparse(origin)
origin_base = f"{parsed.scheme}://{parsed.hostname}"
if parsed.port:
origin_base += f":{parsed.port}"
except Exception:
return False
return origin_base in ALLOWED_ORIGINS
def get_instance_url(port: int, host: Optional[str] = None) -> str:
"""Get URL for a Ghidra instance by port.
Args:
port: Port number
host: Optional host override (defaults to config)
Returns:
Full URL for the Ghidra instance
"""
if host is None:
host = get_config().ghidra_host
return f"http://{host}:{port}"
def _make_request(
method: str,
port: int,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
json_data: Optional[Dict[str, Any]] = None,
data: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
host: Optional[str] = None,
) -> Dict[str, Any]:
"""Make HTTP request to Ghidra instance with error handling.
Args:
method: HTTP method (GET, POST, PUT, PATCH, DELETE)
port: Ghidra instance port
endpoint: API endpoint path
params: Query parameters
json_data: JSON payload for POST/PUT/PATCH
data: Raw text payload
headers: Additional headers
host: Optional host override
Returns:
Response dict with success flag and result or error
"""
config = get_config()
url = f"{get_instance_url(port, host)}/{endpoint}"
# Set up headers for HATEOAS API
request_headers = {
"Accept": "application/json",
"X-Request-ID": f"mcp-bridge-{int(time.time() * 1000)}",
}
if headers:
request_headers.update(headers)
# Validate origin for state-changing requests
is_state_changing = method.upper() in ["POST", "PUT", "PATCH", "DELETE"]
if is_state_changing:
check_headers = (
json_data.get("headers", {})
if isinstance(json_data, dict)
else (headers or {})
)
if not validate_origin(check_headers):
return {
"success": False,
"error": {
"code": "ORIGIN_NOT_ALLOWED",
"message": "Origin not allowed for state-changing request",
},
"status_code": 403,
"timestamp": int(time.time() * 1000),
}
if json_data is not None:
request_headers["Content-Type"] = "application/json"
elif data is not None:
request_headers["Content-Type"] = "text/plain"
try:
response = requests.request(
method,
url,
params=params,
json=json_data,
data=data,
headers=request_headers,
timeout=config.request_timeout,
)
try:
parsed_json = response.json()
# Add timestamp if not present
if isinstance(parsed_json, dict) and "timestamp" not in parsed_json:
parsed_json["timestamp"] = int(time.time() * 1000)
# Normalize error format
if (
not response.ok
and isinstance(parsed_json, dict)
and "success" in parsed_json
and not parsed_json["success"]
):
if "error" in parsed_json and not isinstance(
parsed_json["error"], dict
):
error_message = parsed_json["error"]
parsed_json["error"] = {
"code": f"HTTP_{response.status_code}",
"message": error_message,
}
return parsed_json
except ValueError:
if response.ok:
return {
"success": False,
"error": {
"code": "NON_JSON_RESPONSE",
"message": "Received non-JSON success response",
},
"status_code": response.status_code,
"response_text": response.text[:500],
"timestamp": int(time.time() * 1000),
}
else:
return {
"success": False,
"error": {
"code": f"HTTP_{response.status_code}",
"message": f"Non-JSON error: {response.text[:100]}...",
},
"status_code": response.status_code,
"response_text": response.text[:500],
"timestamp": int(time.time() * 1000),
}
except requests.exceptions.Timeout:
return {
"success": False,
"error": {"code": "REQUEST_TIMEOUT", "message": "Request timed out"},
"status_code": 408,
"timestamp": int(time.time() * 1000),
}
except requests.exceptions.ConnectionError:
return {
"success": False,
"error": {
"code": "CONNECTION_ERROR",
"message": f"Failed to connect to Ghidra instance at {url}",
},
"status_code": 503,
"timestamp": int(time.time() * 1000),
}
except Exception as e:
return {
"success": False,
"error": {
"code": "UNEXPECTED_ERROR",
"message": f"Unexpected error: {str(e)}",
},
"exception": e.__class__.__name__,
"timestamp": int(time.time() * 1000),
}
def safe_get(
port: int, endpoint: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Make GET request to Ghidra instance.
Args:
port: Ghidra instance port
endpoint: API endpoint path
params: Query parameters
Returns:
Response dict
"""
return _make_request("GET", port, endpoint, params=params)
def safe_post(
port: int, endpoint: str, data: Union[Dict[str, Any], str]
) -> Dict[str, Any]:
"""Make POST request to Ghidra instance.
Args:
port: Ghidra instance port
endpoint: API endpoint path
data: JSON dict or raw string payload
Returns:
Response dict
"""
headers = None
json_payload = None
text_payload = None
if isinstance(data, dict):
data = data.copy() # Don't mutate caller's dict
headers = data.pop("headers", None)
json_payload = data
else:
text_payload = data
return _make_request(
"POST", port, endpoint, json_data=json_payload, data=text_payload, headers=headers
)
def safe_put(port: int, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""Make PUT request to Ghidra instance.
Args:
port: Ghidra instance port
endpoint: API endpoint path
data: JSON payload
Returns:
Response dict
"""
if isinstance(data, dict):
data = data.copy() # Don't mutate caller's dict
headers = data.pop("headers", None)
else:
headers = None
return _make_request("PUT", port, endpoint, json_data=data, headers=headers)
def safe_patch(port: int, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""Make PATCH request to Ghidra instance.
Args:
port: Ghidra instance port
endpoint: API endpoint path
data: JSON payload
Returns:
Response dict
"""
if isinstance(data, dict):
data = data.copy() # Don't mutate caller's dict
headers = data.pop("headers", None)
else:
headers = None
return _make_request("PATCH", port, endpoint, json_data=data, headers=headers)
def safe_delete(port: int, endpoint: str) -> Dict[str, Any]:
"""Make DELETE request to Ghidra instance.
Args:
port: Ghidra instance port
endpoint: API endpoint path
Returns:
Response dict
"""
return _make_request("DELETE", port, endpoint)
def simplify_response(response: Dict[str, Any]) -> Dict[str, Any]:
"""Simplify HATEOAS response for MCP tool consumption.
- Removes _links from result entries
- Flattens nested structures
- Preserves important metadata
- Converts structured data to text
Args:
response: Raw API response
Returns:
Simplified response dict
"""
if not isinstance(response, dict):
return response
result = response.copy()
# Store API metadata
api_metadata = {}
for key in ["id", "instance", "timestamp", "size", "offset", "limit"]:
if key in result:
api_metadata[key] = result.get(key)
# Simplify result data
if "result" in result:
if isinstance(result["result"], list):
simplified_items = []
for item in result["result"]:
if isinstance(item, dict):
item_copy = item.copy()
links = item_copy.pop("_links", None)
if isinstance(links, dict):
for link_name, link_data in links.items():
if isinstance(link_data, dict) and "href" in link_data:
item_copy[f"{link_name}_url"] = link_data["href"]
simplified_items.append(item_copy)
else:
simplified_items.append(item)
result["result"] = simplified_items
elif isinstance(result["result"], dict):
result_copy = result["result"].copy()
links = result_copy.pop("_links", None)
if isinstance(links, dict):
for link_name, link_data in links.items():
if isinstance(link_data, dict) and "href" in link_data:
result_copy[f"{link_name}_url"] = link_data["href"]
# Convert disassembly to text
if "instructions" in result_copy and isinstance(
result_copy["instructions"], list
):
disasm_text = ""
for instr in result_copy["instructions"]:
if isinstance(instr, dict):
addr = instr.get("address", "")
mnemonic = instr.get("mnemonic", "")
operands = instr.get("operands", "")
bytes_str = instr.get("bytes", "")
disasm_text += (
f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n"
)
result_copy["disassembly_text"] = disasm_text
result_copy.pop("instructions", None)
# Make decompiled code accessible
if "ccode" in result_copy:
result_copy["decompiled_text"] = result_copy["ccode"]
elif "decompiled" in result_copy:
result_copy["decompiled_text"] = result_copy["decompiled"]
result["result"] = result_copy
# Simplify top-level links
links = result.pop("_links", None)
if isinstance(links, dict):
api_links = {}
for link_name, link_data in links.items():
if isinstance(link_data, dict) and "href" in link_data:
api_links[link_name] = link_data["href"]
if api_links:
result["api_links"] = api_links
# Restore metadata
for key, value in api_metadata.items():
if key not in result:
result[key] = value
return result