"""
Content Analysis Mixin - PDF content classification, summarization, and layout analysis
Uses official fastmcp.contrib.mcp_mixin pattern
"""

import asyncio
import time
from pathlib import Path
from typing import Dict, Any, Optional, List
import logging
import re
from collections import Counter

# PDF processing libraries
import fitz  # PyMuPDF

# Official FastMCP mixin
from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool

from ..security import validate_pdf_path, sanitize_error_message
from .utils import parse_pages_parameter

logger = logging.getLogger(__name__)

# Keywords whose (substring) occurrence counts score a text against a category.
_CONTENT_INDICATORS: Dict[str, List[str]] = {
    "academic": ["abstract", "introduction", "methodology", "conclusion", "references", "bibliography"],
    "business": ["executive summary", "proposal", "budget", "quarterly", "revenue", "profit"],
    "legal": ["whereas", "hereby", "pursuant", "plaintiff", "defendant", "contract", "agreement"],
    "technical": ["algorithm", "implementation", "system", "configuration", "specification", "api"],
    "financial": ["financial", "income", "expense", "balance sheet", "cash flow", "investment"],
    "medical": ["patient", "diagnosis", "treatment", "symptoms", "medical", "clinical"],
    "educational": ["course", "curriculum", "lesson", "assignment", "grade", "student"],
}

# (minimum readability score, label) pairs, checked from highest to lowest.
_READING_LEVELS = (
    (90, "Elementary"),
    (70, "Middle School"),
    (50, "High School"),
    (30, "College"),
)

# Number of sentences to return for each supported summary length.
_SUMMARY_SENTENCE_TARGETS = {"short": 3, "medium": 7, "long": 15}

# Punctuation removed from both ends of a token before keyword counting.
_WORD_PUNCTUATION = '.,!?;:()[]{}'

# Patterns are compiled once at import time instead of on every call.
_SENTENCE_SPLIT_RE = re.compile(r'[.!?]+')
_TOPIC_RE = re.compile(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b')
_DATE_RE = re.compile(r'\b(?:\d{1,2}[/-]\d{1,2}[/-]\d{2,4}|\d{4}[/-]\d{1,2}[/-]\d{1,2})\b')
_NUMBER_RE = re.compile(r'\b\d+(?:,\d{3})*(?:\.\d+)?\b')


def _split_sentences(text: str) -> List[str]:
    """Split text on runs of '.', '!' and '?' and return the non-blank pieces, stripped."""
    return [piece.strip() for piece in _SENTENCE_SPLIT_RE.split(text) if piece.strip()]


def _reading_level(score: float) -> str:
    """Map a 0-100 readability score onto a coarse reading-level label."""
    for threshold, label in _READING_LEVELS:
        if score >= threshold:
            return label
    return "Graduate"


def _tier(value: float, high: float, medium: float) -> str:
    """Return "high", "medium" or "low" depending on which threshold value exceeds."""
    if value > high:
        return "high"
    if value > medium:
        return "medium"
    return "low"


def _scale_to_document(count: int, total_pages: int, sample_size: int) -> int:
    """Extrapolate a count measured on sample_size pages to total_pages (0 if no sample)."""
    if sample_size <= 0:
        return 0
    return int(count * total_pages / sample_size)


def _estimate_columns(x_positions: List[float], min_gap: float = 50) -> int:
    """Estimate the column count as one more than the number of gaps wider than min_gap
    between consecutive sorted left edges of text blocks."""
    ordered = sorted(x_positions)
    breaks = sum(1 for left, right in zip(ordered, ordered[1:]) if right - left > min_gap)
    return breaks + 1


def _classify_layout(estimated_columns: int, text_block_count: int, image_block_count: int) -> str:
    """Pick a layout label; column count takes priority over block counts."""
    if estimated_columns > 2:
        return "multi_column"
    if estimated_columns == 2:
        return "two_column"
    if text_block_count > 10:
        return "complex"
    if image_block_count > 3:
        return "image_heavy"
    return "simple"


def _analyze_page_layout(doc, page_num: int, include_coordinates: bool) -> Dict[str, Any]:
    """Build the layout report for one page of an open PyMuPDF document.

    Args:
        doc: Open fitz document.
        page_num: Zero-based page index into doc.
        include_coordinates: Whether each text block carries its bounding box.

    Returns:
        Dictionary with page size, layout type, content summary and block lists.
    """
    page = doc[page_num]
    page_rect = page.rect

    text_blocks = []
    left_edges = []
    total_text_area = 0
    for block in page.get_text("dict").get("blocks", []):
        if "lines" not in block:  # Only blocks carrying "lines" are text blocks
            continue

        bbox = block.get("bbox", [0, 0, 0, 0])
        x1, y1, x2, y2 = bbox[0], bbox[1], bbox[2], bbox[3]
        block_width = x2 - x1
        block_height = y2 - y1
        block_area = block_width * block_height
        total_text_area += block_area

        # Column detection must not depend on whether coordinates are reported,
        # so the left edge is recorded here rather than read back from the output.
        left_edges.append(round(x1, 2))

        block_info = {
            "type": "text",
            "width": round(block_width, 2),
            "height": round(block_height, 2),
            "area": round(block_area, 2),
            "line_count": len(block["lines"])
        }
        if include_coordinates:
            block_info["coordinates"] = {
                "x1": round(x1, 2),
                "y1": round(y1, 2),
                "x2": round(x2, 2),
                "y2": round(y2, 2)
            }
        text_blocks.append(block_info)

    image_blocks = []
    for img in page.get_images():
        try:
            pix = fitz.Pixmap(doc, img[0])  # img[0] is the image xref
            width, height = pix.width, pix.height
            pix = None  # Release pixel data promptly
        except Exception as exc:
            # Best effort: an image that cannot be decoded is left out of the report.
            logger.debug("Skipping unreadable image %r on page %d: %s", img, page_num + 1, exc)
            continue
        image_blocks.append({
            "type": "image",
            "width": width,
            "height": height,
            "area": width * height
        })

    page_area = page_rect.width * page_rect.height
    text_coverage = (total_text_area / page_area) if page_area > 0 else 0

    estimated_columns = _estimate_columns(left_edges)
    layout_type = _classify_layout(estimated_columns, len(text_blocks), len(image_blocks))

    return {
        "page": page_num + 1,
        "page_size": {
            "width": round(page_rect.width, 2),
            "height": round(page_rect.height, 2)
        },
        "layout_type": layout_type,
        "content_summary": {
            "text_blocks": len(text_blocks),
            "image_blocks": len(image_blocks),
            "estimated_columns": estimated_columns,
            "text_coverage_percent": round(text_coverage * 100, 1)
        },
        "text_blocks": text_blocks[:10],  # Limit for context
        "image_blocks": image_blocks
    }


class ContentAnalysisMixin(MCPMixin):
    """
    Handles PDF content analysis including classification, summarization, and layout analysis.
    Uses the official FastMCP mixin pattern.

    Each tool returns a dictionary with "success": True and its results, or
    "success": False with a sanitized "error" message; exceptions are not propagated.
    """

    def __init__(self):
        super().__init__()

    @mcp_tool(
        name="classify_content",
        description="Classify and analyze PDF content type and structure"
    )
    async def classify_content(self, pdf_path: str) -> Dict[str, Any]:
        """
        Classify PDF content type and analyze document structure.

        Only the first 10 pages (or fewer) are sampled; document-wide figures
        are extrapolated from that sample.

        Args:
            pdf_path: Path to PDF file or HTTPS URL

        Returns:
            Dictionary containing content classification results
        """
        start_time = time.time()

        try:
            path = await validate_pdf_path(pdf_path)

            doc = fitz.open(str(path))
            try:
                # Everything that needs the open document is read here; the
                # page count in particular must be captured before close().
                total_pages = len(doc)
                sample_size = min(10, total_pages)

                page_texts = []
                total_images = 0
                total_links = 0
                for page_num in range(sample_size):
                    page = doc[page_num]
                    page_texts.append(page.get_text())
                    total_images += len(page.get_images())
                    total_links += len(page.get_links())

                toc = doc.get_toc()
            finally:
                doc.close()

            full_text = "".join(text + " " for text in page_texts)
            text_lower = full_text.lower()
            total_words = sum(len(text.split()) for text in page_texts)

            # Count sentences (basic estimation)
            total_sentences = len(_split_sentences(full_text))

            # Analyze document structure
            has_bookmarks = bool(toc)
            bookmark_levels = max(item[0] for item in toc) if toc else 0

            # Content type classification
            content_scores = {
                category: sum(text_lower.count(keyword) for keyword in keywords)
                for category, keywords in _CONTENT_INDICATORS.items()
            }
            total_score = sum(content_scores.values())

            # With no keyword hits there is no evidence for any category, so
            # fall back to "general" instead of whichever category is listed first.
            if total_score > 0:
                primary_type = max(content_scores, key=content_scores.get)
                confidence = content_scores[primary_type] / total_score
            else:
                primary_type = "general"
                confidence = 0.5

            ranked_types = sorted(content_scores.items(), key=lambda item: item[1], reverse=True)
            secondary_types = [item for item in ranked_types if item[0] != primary_type][:3]

            # Analyze text characteristics
            avg_words_per_page = total_words / sample_size if sample_size > 0 else 0

            # Document complexity analysis
            unique_words = len(set(text_lower.split()))
            vocabulary_diversity = unique_words / max(total_words, 1)

            # Reading level estimation (simplified, Flesch-style formula)
            if total_sentences > 0:
                avg_words_per_sentence = total_words / total_sentences
                readability_score = (
                    206.835
                    - (1.015 * avg_words_per_sentence)
                    - (84.6 * (total_sentences / max(total_words, 1)))
                )
                readability_score = max(0, min(100, readability_score))
            else:
                readability_score = 50

            reading_level = _reading_level(readability_score)

            # Estimate multimedia content for the full document
            estimated_total_images = _scale_to_document(total_images, total_pages, sample_size)
            estimated_total_links = _scale_to_document(total_links, total_pages, sample_size)

            return {
                "success": True,
                "classification": {
                    "primary_type": primary_type,
                    "confidence": round(confidence, 2),
                    "secondary_types": secondary_types
                },
                "content_analysis": {
                    "total_pages": total_pages,
                    "estimated_word_count": _scale_to_document(total_words, total_pages, sample_size),
                    "avg_words_per_page": round(avg_words_per_page, 1),
                    "vocabulary_diversity": round(vocabulary_diversity, 2),
                    "reading_level": reading_level,
                    "readability_score": round(readability_score, 1)
                },
                "document_structure": {
                    "has_bookmarks": has_bookmarks,
                    "bookmark_levels": bookmark_levels,
                    "estimated_sections": len([item for item in toc if item[0] <= 2]),
                    "is_structured": has_bookmarks and bookmark_levels > 1
                },
                "multimedia_content": {
                    "estimated_images": estimated_total_images,
                    "estimated_links": estimated_total_links,
                    "is_multimedia_rich": estimated_total_images > 10 or estimated_total_links > 5
                },
                "content_characteristics": {
                    "is_text_heavy": avg_words_per_page > 500,
                    "is_technical": content_scores.get("technical", 0) > 5,
                    "has_formal_language": primary_type in ["legal", "academic", "technical"],
                    "complexity_level": _tier(vocabulary_diversity, 0.7, 0.4)
                },
                "file_info": {
                    "path": str(path),
                    "pages_analyzed": sample_size
                },
                "analysis_time": round(time.time() - start_time, 2)
            }

        except Exception as e:
            error_msg = sanitize_error_message(str(e))
            logger.error("Content classification failed: %s", error_msg)
            return {
                "success": False,
                "error": error_msg,
                "analysis_time": round(time.time() - start_time, 2)
            }

    @mcp_tool(
        name="summarize_content",
        description="Generate summary and key insights from PDF content"
    )
    async def summarize_content(
        self,
        pdf_path: str,
        pages: Optional[str] = None,
        summary_length: str = "medium"
    ) -> Dict[str, Any]:
        """
        Generate summary and extract key insights from PDF content.

        Args:
            pdf_path: Path to PDF file or HTTPS URL
            pages: Page numbers to summarize (comma-separated, 1-based), None for all
            summary_length: Summary length ("short", "medium", "long");
                any other value is treated like "medium"

        Returns:
            Dictionary containing content summary and insights
        """
        start_time = time.time()

        try:
            path = await validate_pdf_path(pdf_path)

            doc = fitz.open(str(path))
            try:
                # Captured before close(); the closed document cannot be queried.
                total_pages = len(doc)

                # NOTE(review): parse_pages_parameter presumably yields zero-based
                # indices (they are used directly to index the document) - confirm.
                parsed_pages = parse_pages_parameter(pages)
                page_numbers = parsed_pages if parsed_pages else list(range(total_pages))
                page_numbers = [p for p in page_numbers if 0 <= p < total_pages]

                # If parsing failed but pages was specified, use all pages
                if pages and not page_numbers:
                    page_numbers = list(range(total_pages))

                full_text = "".join(doc[page_num].get_text() + "\n" for page_num in page_numbers)
            finally:
                doc.close()

            # Basic text processing
            paragraphs = [p.strip() for p in full_text.split('\n\n') if p.strip()]
            sentences = _split_sentences(full_text)
            words = full_text.split()

            # Extract key phrases (simple frequency-based approach). Punctuation is
            # stripped before filtering so that "word," and "word" count together.
            normalized_words = (word.lower().strip(_WORD_PUNCTUATION) for word in words)
            word_freq = Counter(
                word for word in normalized_words if len(word) > 3 and word.isalpha()
            )
            common_words = word_freq.most_common(20)

            # Extract potential key topics (capitalized phrases seen more than once)
            topic_freq = Counter(_TOPIC_RE.findall(full_text))
            topics = [topic for topic, freq in topic_freq.most_common(10) if freq > 1]

            # Extract potential dates and numbers; dates are sorted so the
            # output does not depend on set iteration order.
            dates = sorted(set(_DATE_RE.findall(full_text)))
            numbers = [num for num in _NUMBER_RE.findall(full_text) if len(num) > 2]

            # Simple extractive summarization: rank sentences by keyword frequency
            target_sentences = _SUMMARY_SENTENCE_TARGETS.get(summary_length, 7)
            sentence_scores = []
            for sentence in sentences[:50]:  # Limit to first 50 sentences
                score = sum(
                    word_freq.get(word.lower().strip(_WORD_PUNCTUATION), 0)
                    for word in sentence.split()
                )
                sentence_scores.append((score, sentence))
            sentence_scores.sort(reverse=True)
            summary_sentences = [sent for _, sent in sentence_scores[:target_sentences]]

            # Generate insights
            insights = []
            if len(words) > 1000:
                insights.append(f"This is a substantial document with approximately {len(words):,} words")
            if topics:
                insights.append(f"Key topics include: {', '.join(topics[:5])}")
            if dates:
                insights.append(f"Document references {len(dates)} dates, suggesting time-sensitive content")
            if len(paragraphs) > 20:
                insights.append("Document has extensive content with detailed sections")

            # Document metrics
            reading_time = len(words) // 200  # Assuming 200 words per minute

            # Guard the per-page ratios against an empty page selection.
            page_divisor = max(len(page_numbers), 1)
            words_per_page = len(words) / page_divisor
            paragraphs_per_page = len(paragraphs) / page_divisor

            return {
                "success": True,
                "summary": {
                    "length": summary_length,
                    "sentences": summary_sentences,
                    "key_insights": insights
                },
                "content_metrics": {
                    "total_words": len(words),
                    "total_sentences": len(sentences),
                    "total_paragraphs": len(paragraphs),
                    "estimated_reading_time_minutes": reading_time,
                    "pages_analyzed": len(page_numbers)
                },
                "key_elements": {
                    "top_keywords": [{"word": word, "frequency": freq} for word, freq in common_words[:10]],
                    "identified_topics": topics,
                    "dates_found": dates[:10],  # Limit for context window
                    "significant_numbers": numbers[:10]
                },
                "document_characteristics": {
                    "content_density": _tier(words_per_page, 500, 200),
                    "structure_complexity": _tier(paragraphs_per_page, 10, 5),
                    "topic_diversity": len(topics)
                },
                "file_info": {
                    "path": str(path),
                    "total_pages": total_pages,
                    "pages_processed": pages or "all"
                },
                "analysis_time": round(time.time() - start_time, 2)
            }

        except Exception as e:
            error_msg = sanitize_error_message(str(e))
            logger.error("Content summarization failed: %s", error_msg)
            return {
                "success": False,
                "error": error_msg,
                "analysis_time": round(time.time() - start_time, 2)
            }

    @mcp_tool(
        name="analyze_layout",
        description="Analyze PDF page layout including text blocks, columns, and spacing"
    )
    async def analyze_layout(
        self,
        pdf_path: str,
        pages: Optional[str] = None,
        include_coordinates: bool = True
    ) -> Dict[str, Any]:
        """
        Analyze PDF page layout structure including text blocks and spacing.

        When no usable page selection is given, only the first 5 pages are analyzed.

        Args:
            pdf_path: Path to PDF file or HTTPS URL
            pages: Page numbers to analyze (comma-separated, 1-based), None for all
            include_coordinates: Whether to include detailed coordinate information;
                this affects only the reported fields, not the column estimate

        Returns:
            Dictionary containing layout analysis results
        """
        start_time = time.time()

        try:
            path = await validate_pdf_path(pdf_path)

            doc = fitz.open(str(path))
            try:
                # Captured before close(); the closed document cannot be queried.
                total_pages = len(doc)

                # NOTE(review): parse_pages_parameter presumably yields zero-based
                # indices (reported back as page_num + 1) - confirm.
                parsed_pages = parse_pages_parameter(pages)
                if parsed_pages:
                    page_numbers = [p for p in parsed_pages if 0 <= p < total_pages]
                else:
                    page_numbers = list(range(min(5, total_pages)))  # Limit to 5 pages for performance

                # If parsing failed but pages was specified, default to first 5
                if pages and not page_numbers:
                    page_numbers = list(range(min(5, total_pages)))

                layout_analysis = [
                    _analyze_page_layout(doc, page_num, include_coordinates)
                    for page_num in page_numbers
                ]
            finally:
                doc.close()

            # Overall document layout analysis
            layout_types = [page["layout_type"] for page in layout_analysis]
            distinct_layouts = len(set(layout_types))
            # Counter breaks ties by first occurrence, keeping the result deterministic.
            most_common_layout = (
                Counter(layout_types).most_common(1)[0][0] if layout_types else "unknown"
            )

            if layout_analysis:
                analyzed = len(layout_analysis)
                avg_text_blocks = sum(page["content_summary"]["text_blocks"] for page in layout_analysis) / analyzed
                avg_columns = sum(page["content_summary"]["estimated_columns"] for page in layout_analysis) / analyzed
            else:
                # No pages to analyze: report zero averages instead of dividing by zero.
                avg_text_blocks = 0.0
                avg_columns = 0.0

            if distinct_layouts <= 2:
                layout_consistency = "high"
            elif distinct_layouts <= 3:
                layout_consistency = "medium"
            else:
                layout_consistency = "low"

            return {
                "success": True,
                "layout_summary": {
                    "pages_analyzed": len(page_numbers),
                    "most_common_layout": most_common_layout,
                    "average_text_blocks_per_page": round(avg_text_blocks, 1),
                    "average_columns_per_page": round(avg_columns, 1),
                    "layout_consistency": layout_consistency
                },
                "page_layouts": layout_analysis,
                "layout_insights": [
                    f"Document uses primarily {most_common_layout} layout",
                    f"Average of {avg_text_blocks:.1f} text blocks per page",
                    f"Estimated {avg_columns:.1f} columns per page on average"
                ],
                "analysis_settings": {
                    "include_coordinates": include_coordinates,
                    "pages_processed": pages or f"first_{len(page_numbers)}"
                },
                "file_info": {
                    "path": str(path),
                    "total_pages": total_pages
                },
                "analysis_time": round(time.time() - start_time, 2)
            }

        except Exception as e:
            error_msg = sanitize_error_message(str(e))
            logger.error("Layout analysis failed: %s", error_msg)
            return {
                "success": False,
                "error": error_msg,
                "analysis_time": round(time.time() - start_time, 2)
            }