Coverage for app/rag/claim_extractor.py: 98%
132 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-24 13:18 +0530
1"""
2Claim Extractor - PRODUCTION-READY VERSION
3Handles multiple formats and has reliable fallbacks.
4"""
6from typing import List, Dict
7import re
def extract_claims(text: str) -> List[Dict[str, str]]:
    """
    Extract structured claims from LLM output.

    Tries several parsing strategies from most to least specific and
    returns the first one that yields any claims; a single truncated
    "general" claim is the last resort.
    """
    if not text:
        return []

    # Normalize away markdown bold/italic/inline-code before any parsing.
    cleaned = _strip_markdown(text)

    # Strategy 1: dash-prefixed typed claims ("- RISK: ...").
    dash_claims = _extract_dash_based_claims(cleaned)
    if dash_claims:
        return dash_claims

    # Strategy 2: inline section headers ("Key Considerations: ..."),
    # the format phi3:mini / ollama models typically produce.
    section_claims = _extract_bold_section_claims(cleaned)
    if section_claims:
        return section_claims

    # Strategy 3: markdown "##" headers with bullet lists underneath.
    header_claims = [
        {"type": _map_section_to_type(name), "statement": bullet.strip()}
        for name, body in _split_by_headers(cleaned).items()
        for bullet in _extract_bullet_points(body)
        if bullet.strip()
    ]
    if header_claims:
        return header_claims

    # Strategy 4: plain sentence extraction with keyword typing (fallback).
    sentence_claims = _extract_smart_sentences(cleaned)
    if sentence_claims:
        return sentence_claims

    # Nothing parseable: hand back the (truncated) text as one claim.
    return [{
        "type": "general",
        "statement": cleaned[:200]
    }]
61# ------------------------------------------------------------------
62# ✅ NEW: Strip markdown formatting before processing
63# ------------------------------------------------------------------
65def _strip_markdown(text: str) -> str:
66 """
67 Remove markdown bold (**text**), italic (*text*), and header markers.
68 Preserves the actual content text.
69 """
70 # Remove bold: **text** → text
71 text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
72 # Remove italic: *text* → text
73 text = re.sub(r'\*(.+?)\*', r'\1', text)
74 # Remove inline code: `text` → text
75 text = re.sub(r'`(.+?)`', r'\1', text)
76 return text
79# ------------------------------------------------------------------
80# ✅ NEW: Handle inline bold section headers
81# e.g. "Key Considerations: ..." or "What to Monitor: ..."
82# This is what phi3:mini outputs instead of ## headers
83# ------------------------------------------------------------------
85def _extract_bold_section_claims(text: str) -> List[Dict[str, str]]:
86 """
87 Extract claims from inline section format:
88 "Key Considerations: blah blah. What to Monitor: blah blah."
90 After markdown stripping, the bold headers become plain text like:
91 "Key Considerations: ..."
92 """
93 # Known section headers from prompt_builder.py response format
94 section_patterns = [
95 (r'key\s+considerations?\s*:', 'general'),
96 (r'what\s+to\s+monitor\s*:', 'monitoring'),
97 (r'when\s+to\s+seek\s+medical\s+help\s*:', 'warning'),
98 (r'safety\s+notes?\s*:', 'recommendation'),
99 ]
101 # Build a splitter that splits on any known section header
102 splitter = '|'.join(p for p, _ in section_patterns)
103 full_pattern = re.compile(
104 r'(' + splitter + r')',
105 re.IGNORECASE
106 )
108 parts = full_pattern.split(text)
110 if len(parts) <= 1:
111 return [] # No section headers found
113 claims = []
114 i = 1 # parts[0] is preamble before first header
116 while i < len(parts) - 1:
117 header = parts[i].strip().lower()
118 content = parts[i + 1].strip() if i + 1 < len(parts) else ""
119 i += 2
121 # Determine type from header
122 claim_type = "general"
123 for pattern, ctype in section_patterns:
124 if re.search(pattern, header, re.IGNORECASE):
125 claim_type = ctype
126 break
128 # Split content into individual sentences
129 sentences = re.split(r'(?<=[.!?])\s+', content)
130 for sentence in sentences:
131 sentence = sentence.strip().rstrip('.')
132 # Skip very short or empty sentences
133 if len(sentence) > 20:
134 claims.append({
135 "type": claim_type,
136 "statement": sentence
137 })
139 return claims
142def _extract_dash_based_claims(text: str) -> List[Dict[str, str]]:
143 """
144 Extract claims in format: - RISK: statement
145 """
146 claims = []
147 pattern = r'^-\s+([A-Z]+):\s+(.+)$'
149 for line in text.splitlines():
150 match = re.match(pattern, line.strip())
151 if match:
152 claim_type = match.group(1).lower()
153 statement = match.group(2).strip()
154 if claim_type not in ['risk', 'monitoring', 'warning', 'recommendation']:
155 claim_type = 'general'
156 claims.append({"type": claim_type, "statement": statement})
158 return claims
161def _split_by_headers(text: str) -> Dict[str, str]:
162 """
163 Split text into sections by markdown headers (# or ##).
164 """
165 sections = {}
166 current_section = "unknown"
167 buffer = []
169 for line in text.splitlines():
170 header_match = re.match(r'^#+\s+(.+)$', line.strip())
171 if header_match:
172 if buffer:
173 section_text = "\n".join(buffer).strip()
174 if section_text:
175 sections[current_section] = section_text
176 buffer = []
177 current_section = header_match.group(1).lower()
178 else:
179 buffer.append(line)
181 if buffer:
182 section_text = "\n".join(buffer).strip()
183 if section_text:
184 sections[current_section] = section_text
186 return sections
189def _extract_bullet_points(text: str) -> List[str]:
190 """
191 Extract bullet points from text.
192 """
193 claims = []
194 for line in text.splitlines():
195 line = line.strip()
196 if not line:
197 continue
198 if line[0] in ['-', '*', '•']:
199 claim = line.lstrip('-*•').strip()
200 if claim:
201 claims.append(claim)
202 elif re.match(r'^\d+\.\s+', line):
203 claim = re.sub(r'^\d+\.\s+', '', line).strip()
204 if claim:
205 claims.append(claim)
206 return claims
def _extract_smart_sentences(text: str) -> List[Dict[str, str]]:
    """
    Fallback extractor: split *text* into sentences and keyword-classify
    each one.

    Sentences outside the 16-499 character range are dropped, and at
    most 15 claims are returned.
    """
    candidates = (s.strip() for s in re.split(r'(?<=[.!?])\s+', text))
    claims = [
        {"type": _classify_sentence(sentence), "statement": sentence}
        for sentence in candidates
        if 15 < len(sentence) < 500
    ]
    return claims[:15]
227def _classify_sentence(sentence: str) -> str:
228 """
229 Classify a sentence into a claim type based on keywords.
230 """
231 s = sentence.lower()
233 if any(w in s for w in ['risk', 'danger', 'avoid', 'contraindicated', 'caution', 'can cause', 'may cause']):
234 return "risk"
235 if any(w in s for w in ['monitor', 'track', 'watch', 'check', 'measure', 'test', 'observe']):
236 return "monitoring"
237 if any(w in s for w in ['urgent', 'immediately', 'emergency', 'seek', 'call', 'hospital']):
238 return "warning"
239 if any(w in s for w in ['recommend', 'suggest', 'consider', 'should', 'important', 'maintain']):
240 return "recommendation"
242 return "general"
245def _map_section_to_type(section: str) -> str:
246 """
247 Map section name to claim type.
248 """
249 s = section.lower()
251 if any(w in s for w in ['risk', 'concern', 'danger']):
252 return "risk"
253 if any(w in s for w in ['monitor', 'watch', 'track']):
254 return "monitoring"
255 if any(w in s for w in ['help', 'urgent', 'emergency', 'seek']):
256 return "warning"
257 if any(w in s for w in ['recommend', 'consider', 'suggest', 'safety', 'note']):
258 return "recommendation"
260 return "general"