- Capture total_pages before doc.close() in content_analysis, security_analysis, annotations, and misc_tools mixins - Fix invalid PyMuPDF font name "helv-bold" → "helv" in add_stamps - Bump to v2.1.7
360 lines
15 KiB
Python
360 lines
15 KiB
Python
"""
|
|
Security Analysis Mixin - PDF security analysis and watermark detection
|
|
Uses official fastmcp.contrib.mcp_mixin pattern
|
|
"""
|
|
|
|
import asyncio
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Dict, Any, Optional, List
|
|
import logging
|
|
|
|
# PDF processing libraries
|
|
import fitz # PyMuPDF
|
|
from PIL import Image
|
|
import io
|
|
|
|
# Official FastMCP mixin
|
|
from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool
|
|
|
|
from ..security import validate_pdf_path, sanitize_error_message
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SecurityAnalysisMixin(MCPMixin):
|
|
"""
|
|
Handles PDF security analysis including permissions, encryption, and watermark detection.
|
|
Uses the official FastMCP mixin pattern.
|
|
"""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
@mcp_tool(
|
|
name="analyze_pdf_security",
|
|
description="Analyze PDF security features and potential issues"
|
|
)
|
|
async def analyze_pdf_security(self, pdf_path: str) -> Dict[str, Any]:
|
|
"""
|
|
Analyze PDF security features including encryption, permissions, and vulnerabilities.
|
|
|
|
Args:
|
|
pdf_path: Path to PDF file or HTTPS URL
|
|
|
|
Returns:
|
|
Dictionary containing security analysis results
|
|
"""
|
|
start_time = time.time()
|
|
|
|
try:
|
|
path = await validate_pdf_path(pdf_path)
|
|
doc = fitz.open(str(path))
|
|
|
|
# Basic security information
|
|
is_encrypted = doc.needs_pass
|
|
is_linearized = getattr(doc, 'is_linearized', False)
|
|
pdf_version = getattr(doc, 'pdf_version', 'Unknown')
|
|
|
|
# Permission analysis
|
|
permissions = doc.permissions
|
|
permission_details = {
|
|
"print_allowed": bool(permissions & fitz.PDF_PERM_PRINT),
|
|
"copy_allowed": bool(permissions & fitz.PDF_PERM_COPY),
|
|
"modify_allowed": bool(permissions & fitz.PDF_PERM_MODIFY),
|
|
"annotate_allowed": bool(permissions & fitz.PDF_PERM_ANNOTATE),
|
|
"form_fill_allowed": bool(permissions & fitz.PDF_PERM_FORM),
|
|
"extract_allowed": bool(permissions & fitz.PDF_PERM_ACCESSIBILITY),
|
|
"assemble_allowed": bool(permissions & fitz.PDF_PERM_ASSEMBLE),
|
|
"print_high_quality_allowed": bool(permissions & fitz.PDF_PERM_PRINT_HQ)
|
|
}
|
|
|
|
# Security warnings and recommendations
|
|
security_warnings = []
|
|
security_recommendations = []
|
|
|
|
# Check for common security issues
|
|
if not is_encrypted:
|
|
security_warnings.append("Document is not password protected")
|
|
security_recommendations.append("Consider adding password protection for sensitive documents")
|
|
|
|
if permission_details["copy_allowed"] and permission_details["extract_allowed"]:
|
|
security_warnings.append("Text extraction and copying is unrestricted")
|
|
|
|
if permission_details["modify_allowed"]:
|
|
security_warnings.append("Document modification is allowed")
|
|
security_recommendations.append("Consider restricting modification permissions")
|
|
|
|
# Check PDF version for security considerations
|
|
if isinstance(pdf_version, (int, float)) and pdf_version < 1.4:
|
|
security_warnings.append(f"Old PDF version ({pdf_version}) may have security vulnerabilities")
|
|
security_recommendations.append("Consider updating to PDF version 1.7 or newer")
|
|
|
|
# Analyze metadata for potential information disclosure
|
|
metadata = doc.metadata
|
|
metadata_warnings = []
|
|
|
|
potentially_sensitive_fields = ["creator", "producer", "title", "author", "subject"]
|
|
for field in potentially_sensitive_fields:
|
|
if metadata.get(field):
|
|
metadata_warnings.append(f"Metadata contains {field}: {metadata[field][:50]}...")
|
|
|
|
if metadata_warnings:
|
|
security_warnings.append("Document metadata may contain sensitive information")
|
|
security_recommendations.append("Review and sanitize metadata before distribution")
|
|
|
|
# Check for JavaScript (potential security risk)
|
|
has_javascript = False
|
|
javascript_count = 0
|
|
|
|
for page_num in range(min(10, len(doc))): # Check first 10 pages
|
|
page = doc[page_num]
|
|
try:
|
|
# Look for JavaScript annotations
|
|
annotations = page.annots()
|
|
for annot in annotations:
|
|
annot_dict = annot.info
|
|
if 'javascript' in str(annot_dict).lower():
|
|
has_javascript = True
|
|
javascript_count += 1
|
|
except:
|
|
pass
|
|
|
|
if has_javascript:
|
|
security_warnings.append(f"Document contains JavaScript ({javascript_count} instances)")
|
|
security_recommendations.append("JavaScript in PDFs can pose security risks - review content")
|
|
|
|
# Check for embedded files
|
|
embedded_files = []
|
|
try:
|
|
for i in range(doc.embedded_file_count()):
|
|
file_info = doc.embedded_file_info(i)
|
|
embedded_files.append({
|
|
"name": file_info.get("name", f"embedded_file_{i}"),
|
|
"size": file_info.get("size", 0),
|
|
"type": file_info.get("type", "unknown")
|
|
})
|
|
except:
|
|
pass
|
|
|
|
if embedded_files:
|
|
security_warnings.append(f"Document contains {len(embedded_files)} embedded files")
|
|
security_recommendations.append("Embedded files should be scanned for malware")
|
|
|
|
# Calculate security score
|
|
security_score = 100
|
|
security_score -= len(security_warnings) * 10
|
|
if not is_encrypted:
|
|
security_score -= 20
|
|
if has_javascript:
|
|
security_score -= 15
|
|
if embedded_files:
|
|
security_score -= 10
|
|
|
|
security_score = max(0, security_score)
|
|
|
|
# Determine security level
|
|
if security_score >= 80:
|
|
security_level = "High"
|
|
elif security_score >= 60:
|
|
security_level = "Medium"
|
|
elif security_score >= 40:
|
|
security_level = "Low"
|
|
else:
|
|
security_level = "Critical"
|
|
|
|
doc.close()
|
|
|
|
return {
|
|
"success": True,
|
|
"security_score": security_score,
|
|
"security_level": security_level,
|
|
"encryption_info": {
|
|
"is_encrypted": is_encrypted,
|
|
"is_linearized": is_linearized,
|
|
"pdf_version": pdf_version
|
|
},
|
|
"permissions": permission_details,
|
|
"security_features": {
|
|
"has_javascript": has_javascript,
|
|
"javascript_instances": javascript_count,
|
|
"embedded_files_count": len(embedded_files),
|
|
"embedded_files": embedded_files
|
|
},
|
|
"metadata_analysis": {
|
|
"has_metadata": bool(any(metadata.values())),
|
|
"metadata_warnings": metadata_warnings
|
|
},
|
|
"security_assessment": {
|
|
"warnings": security_warnings,
|
|
"recommendations": security_recommendations,
|
|
"total_issues": len(security_warnings)
|
|
},
|
|
"file_info": {
|
|
"path": str(path),
|
|
"file_size": path.stat().st_size
|
|
},
|
|
"analysis_time": round(time.time() - start_time, 2)
|
|
}
|
|
|
|
except Exception as e:
|
|
error_msg = sanitize_error_message(str(e))
|
|
logger.error(f"Security analysis failed: {error_msg}")
|
|
return {
|
|
"success": False,
|
|
"error": error_msg,
|
|
"analysis_time": round(time.time() - start_time, 2)
|
|
}
|
|
|
|
@mcp_tool(
|
|
name="detect_watermarks",
|
|
description="Detect and analyze watermarks in PDF"
|
|
)
|
|
async def detect_watermarks(self, pdf_path: str) -> Dict[str, Any]:
|
|
"""
|
|
Detect and analyze watermarks in PDF document.
|
|
|
|
Args:
|
|
pdf_path: Path to PDF file or HTTPS URL
|
|
|
|
Returns:
|
|
Dictionary containing watermark detection results
|
|
"""
|
|
start_time = time.time()
|
|
|
|
try:
|
|
path = await validate_pdf_path(pdf_path)
|
|
doc = fitz.open(str(path))
|
|
total_pages = len(doc)
|
|
|
|
watermark_analysis = []
|
|
total_watermarks = 0
|
|
watermark_types = {"text": 0, "image": 0, "shape": 0}
|
|
|
|
# Analyze each page for watermarks
|
|
for page_num in range(len(doc)):
|
|
page = doc[page_num]
|
|
page_watermarks = []
|
|
|
|
try:
|
|
# Check for text watermarks (often low opacity or behind content)
|
|
text_dict = page.get_text("dict")
|
|
|
|
for block in text_dict.get("blocks", []):
|
|
if "lines" in block:
|
|
for line in block["lines"]:
|
|
for span in line["spans"]:
|
|
text = span.get("text", "").strip()
|
|
# Common watermark indicators
|
|
if (len(text) > 0 and
|
|
(text.upper() in ["DRAFT", "CONFIDENTIAL", "COPY", "SAMPLE", "WATERMARK"] or
|
|
"watermark" in text.lower() or
|
|
"confidential" in text.lower() or
|
|
"draft" in text.lower())):
|
|
|
|
page_watermarks.append({
|
|
"type": "text",
|
|
"content": text,
|
|
"font_size": span.get("size", 0),
|
|
"coordinates": {
|
|
"x": round(span.get("bbox", [0, 0, 0, 0])[0], 2),
|
|
"y": round(span.get("bbox", [0, 0, 0, 0])[1], 2)
|
|
}
|
|
})
|
|
watermark_types["text"] += 1
|
|
|
|
# Check for image watermarks (semi-transparent images)
|
|
images = page.get_images()
|
|
for img_index, img in enumerate(images):
|
|
try:
|
|
xref = img[0]
|
|
pix = fitz.Pixmap(doc, xref)
|
|
|
|
# Check if image is likely a watermark (small or semi-transparent)
|
|
if pix.width < 200 or pix.height < 200:
|
|
page_watermarks.append({
|
|
"type": "image",
|
|
"size": f"{pix.width}x{pix.height}",
|
|
"image_index": img_index + 1,
|
|
"coordinates": "analysis_required"
|
|
})
|
|
watermark_types["image"] += 1
|
|
|
|
pix = None
|
|
except:
|
|
pass
|
|
|
|
# Check for drawing watermarks (shapes, lines)
|
|
drawings = page.get_drawings()
|
|
for drawing in drawings:
|
|
# Simple heuristic: large shapes that might be watermarks
|
|
if len(drawing.get("items", [])) > 5: # Complex shape
|
|
page_watermarks.append({
|
|
"type": "shape",
|
|
"complexity": len(drawing.get("items", [])),
|
|
"coordinates": "shape_detected"
|
|
})
|
|
watermark_types["shape"] += 1
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to analyze page {page_num + 1} for watermarks: {e}")
|
|
|
|
if page_watermarks:
|
|
watermark_analysis.append({
|
|
"page": page_num + 1,
|
|
"watermarks_found": len(page_watermarks),
|
|
"watermarks": page_watermarks
|
|
})
|
|
total_watermarks += len(page_watermarks)
|
|
|
|
doc.close()
|
|
|
|
# Watermark assessment
|
|
has_watermarks = total_watermarks > 0
|
|
watermark_density = total_watermarks / total_pages if total_pages > 0 else 0
|
|
|
|
# Determine watermark pattern
|
|
if watermark_density > 0.8:
|
|
pattern = "comprehensive" # Most pages have watermarks
|
|
elif watermark_density > 0.3:
|
|
pattern = "selective" # Some pages have watermarks
|
|
elif watermark_density > 0:
|
|
pattern = "minimal" # Few pages have watermarks
|
|
else:
|
|
pattern = "none"
|
|
|
|
return {
|
|
"success": True,
|
|
"watermark_summary": {
|
|
"has_watermarks": has_watermarks,
|
|
"total_watermarks": total_watermarks,
|
|
"watermark_density": round(watermark_density, 2),
|
|
"pattern": pattern,
|
|
"types_found": watermark_types
|
|
},
|
|
"page_analysis": watermark_analysis,
|
|
"watermark_insights": {
|
|
"pages_with_watermarks": len(watermark_analysis),
|
|
"pages_without_watermarks": total_pages - len(watermark_analysis),
|
|
"most_common_type": max(watermark_types, key=watermark_types.get) if any(watermark_types.values()) else "none"
|
|
},
|
|
"recommendations": [
|
|
"Check text watermarks for sensitive information disclosure",
|
|
"Verify image watermarks don't contain hidden data",
|
|
"Consider watermark removal if document is for public distribution"
|
|
] if has_watermarks else ["No watermarks detected"],
|
|
"file_info": {
|
|
"path": str(path),
|
|
"total_pages": total_pages
|
|
},
|
|
"analysis_time": round(time.time() - start_time, 2)
|
|
}
|
|
|
|
except Exception as e:
|
|
error_msg = sanitize_error_message(str(e))
|
|
logger.error(f"Watermark detection failed: {error_msg}")
|
|
return {
|
|
"success": False,
|
|
"error": error_msg,
|
|
"analysis_time": round(time.time() - start_time, 2)
|
|
} |