Coverage for app \ rag \ claim_extractor.py: 98%

132 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-24 13:18 +0530

1""" 

2Claim Extractor - PRODUCTION-READY VERSION 

3Handles multiple formats and has reliable fallbacks. 

4""" 

5 

6from typing import List, Dict 

7import re 

8 

9 

def extract_claims(text: str) -> List[Dict[str, str]]:
    """
    Extract structured claims from LLM output.

    Tries four strategies in order of decreasing specificity and returns
    the result of the first one that yields any claims:

    1. Dash-based claims (``- RISK: statement``)
    2. Inline section headers (``Key Considerations: ...``) -- the format
       phi3:mini / ollama models typically produce instead of ## headers
    3. Markdown ``##`` section headers with bullet points
    4. Smart sentence extraction with keyword classification

    Args:
        text: Raw LLM response text; may contain markdown formatting.

    Returns:
        List of ``{"type": ..., "statement": ...}`` dicts. Empty list for
        empty or whitespace-only input; otherwise never empty (a final
        fallback wraps the first 200 characters as a "general" claim).
    """
    # Robustness fix: also reject whitespace-only input, so the final
    # fallback can never emit a blank statement.
    if not text or not text.strip():
        return []

    # Always strip markdown bold/italic before processing
    clean_text = _strip_markdown(text)

    # Strategy 1: Dash-based claims (- RISK: ..., - MONITORING: ...)
    claims = _extract_dash_based_claims(clean_text)
    if claims:
        return claims

    # Strategy 2: Inline bold section headers (**Key Considerations:** ...)
    # This is what phi3:mini / ollama models typically output
    claims = _extract_bold_section_claims(clean_text)
    if claims:
        return claims

    # Strategy 3: Markdown ## headers -> bullet points per section
    sections = _split_by_headers(clean_text)
    if sections:
        claims = []
        for section, content in sections.items():
            claim_type = _map_section_to_type(section)
            for line in _extract_bullet_points(content):
                if line.strip():
                    claims.append({
                        "type": claim_type,
                        "statement": line.strip()
                    })
        if claims:
            return claims

    # Strategy 4: Smart sentence extraction (fallback)
    claims = _extract_smart_sentences(clean_text)
    if claims:
        return claims

    # Final fallback: guarantee at least one claim for non-blank input.
    return [{
        "type": "general",
        "statement": clean_text[:200]
    }]

59 

60 

61# ------------------------------------------------------------------ 

62# ✅ NEW: Strip markdown formatting before processing 

63# ------------------------------------------------------------------ 

64 

65def _strip_markdown(text: str) -> str: 

66 """ 

67 Remove markdown bold (**text**), italic (*text*), and header markers. 

68 Preserves the actual content text. 

69 """ 

70 # Remove bold: **text** → text 

71 text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) 

72 # Remove italic: *text* → text 

73 text = re.sub(r'\*(.+?)\*', r'\1', text) 

74 # Remove inline code: `text` → text 

75 text = re.sub(r'`(.+?)`', r'\1', text) 

76 return text 

77 

78 

79# ------------------------------------------------------------------ 

80# ✅ NEW: Handle inline bold section headers 

81# e.g. "Key Considerations: ..." or "What to Monitor: ..." 

82# This is what phi3:mini outputs instead of ## headers 

83# ------------------------------------------------------------------ 

84 

85def _extract_bold_section_claims(text: str) -> List[Dict[str, str]]: 

86 """ 

87 Extract claims from inline section format: 

88 "Key Considerations: blah blah. What to Monitor: blah blah." 

89 

90 After markdown stripping, the bold headers become plain text like: 

91 "Key Considerations: ..." 

92 """ 

93 # Known section headers from prompt_builder.py response format 

94 section_patterns = [ 

95 (r'key\s+considerations?\s*:', 'general'), 

96 (r'what\s+to\s+monitor\s*:', 'monitoring'), 

97 (r'when\s+to\s+seek\s+medical\s+help\s*:', 'warning'), 

98 (r'safety\s+notes?\s*:', 'recommendation'), 

99 ] 

100 

101 # Build a splitter that splits on any known section header 

102 splitter = '|'.join(p for p, _ in section_patterns) 

103 full_pattern = re.compile( 

104 r'(' + splitter + r')', 

105 re.IGNORECASE 

106 ) 

107 

108 parts = full_pattern.split(text) 

109 

110 if len(parts) <= 1: 

111 return [] # No section headers found 

112 

113 claims = [] 

114 i = 1 # parts[0] is preamble before first header 

115 

116 while i < len(parts) - 1: 

117 header = parts[i].strip().lower() 

118 content = parts[i + 1].strip() if i + 1 < len(parts) else "" 

119 i += 2 

120 

121 # Determine type from header 

122 claim_type = "general" 

123 for pattern, ctype in section_patterns: 

124 if re.search(pattern, header, re.IGNORECASE): 

125 claim_type = ctype 

126 break 

127 

128 # Split content into individual sentences 

129 sentences = re.split(r'(?<=[.!?])\s+', content) 

130 for sentence in sentences: 

131 sentence = sentence.strip().rstrip('.') 

132 # Skip very short or empty sentences 

133 if len(sentence) > 20: 

134 claims.append({ 

135 "type": claim_type, 

136 "statement": sentence 

137 }) 

138 

139 return claims 

140 

141 

142def _extract_dash_based_claims(text: str) -> List[Dict[str, str]]: 

143 """ 

144 Extract claims in format: - RISK: statement 

145 """ 

146 claims = [] 

147 pattern = r'^-\s+([A-Z]+):\s+(.+)$' 

148 

149 for line in text.splitlines(): 

150 match = re.match(pattern, line.strip()) 

151 if match: 

152 claim_type = match.group(1).lower() 

153 statement = match.group(2).strip() 

154 if claim_type not in ['risk', 'monitoring', 'warning', 'recommendation']: 

155 claim_type = 'general' 

156 claims.append({"type": claim_type, "statement": statement}) 

157 

158 return claims 

159 

160 

161def _split_by_headers(text: str) -> Dict[str, str]: 

162 """ 

163 Split text into sections by markdown headers (# or ##). 

164 """ 

165 sections = {} 

166 current_section = "unknown" 

167 buffer = [] 

168 

169 for line in text.splitlines(): 

170 header_match = re.match(r'^#+\s+(.+)$', line.strip()) 

171 if header_match: 

172 if buffer: 

173 section_text = "\n".join(buffer).strip() 

174 if section_text: 

175 sections[current_section] = section_text 

176 buffer = [] 

177 current_section = header_match.group(1).lower() 

178 else: 

179 buffer.append(line) 

180 

181 if buffer: 

182 section_text = "\n".join(buffer).strip() 

183 if section_text: 

184 sections[current_section] = section_text 

185 

186 return sections 

187 

188 

189def _extract_bullet_points(text: str) -> List[str]: 

190 """ 

191 Extract bullet points from text. 

192 """ 

193 claims = [] 

194 for line in text.splitlines(): 

195 line = line.strip() 

196 if not line: 

197 continue 

198 if line[0] in ['-', '*', '•']: 

199 claim = line.lstrip('-*•').strip() 

200 if claim: 

201 claims.append(claim) 

202 elif re.match(r'^\d+\.\s+', line): 

203 claim = re.sub(r'^\d+\.\s+', '', line).strip() 

204 if claim: 

205 claims.append(claim) 

206 return claims 

207 

208 

def _extract_smart_sentences(text: str) -> List[Dict[str, str]]:
    """
    Fallback extractor: split *text* into sentences on terminal
    punctuation, keep those of reasonable length (16-499 chars),
    classify each by keyword, and cap the result at 15 claims.
    """
    candidates = (s.strip() for s in re.split(r'(?<=[.!?])\s+', text))
    claims = [
        {"type": _classify_sentence(sentence), "statement": sentence}
        for sentence in candidates
        if 15 < len(sentence) < 500
    ]
    # Cap to avoid flooding downstream verification with minor sentences.
    return claims[:15]

225 

226 

227def _classify_sentence(sentence: str) -> str: 

228 """ 

229 Classify a sentence into a claim type based on keywords. 

230 """ 

231 s = sentence.lower() 

232 

233 if any(w in s for w in ['risk', 'danger', 'avoid', 'contraindicated', 'caution', 'can cause', 'may cause']): 

234 return "risk" 

235 if any(w in s for w in ['monitor', 'track', 'watch', 'check', 'measure', 'test', 'observe']): 

236 return "monitoring" 

237 if any(w in s for w in ['urgent', 'immediately', 'emergency', 'seek', 'call', 'hospital']): 

238 return "warning" 

239 if any(w in s for w in ['recommend', 'suggest', 'consider', 'should', 'important', 'maintain']): 

240 return "recommendation" 

241 

242 return "general" 

243 

244 

245def _map_section_to_type(section: str) -> str: 

246 """ 

247 Map section name to claim type. 

248 """ 

249 s = section.lower() 

250 

251 if any(w in s for w in ['risk', 'concern', 'danger']): 

252 return "risk" 

253 if any(w in s for w in ['monitor', 'watch', 'track']): 

254 return "monitoring" 

255 if any(w in s for w in ['help', 'urgent', 'emergency', 'seek']): 

256 return "warning" 

257 if any(w in s for w in ['recommend', 'consider', 'suggest', 'safety', 'note']): 

258 return "recommendation" 

259 

260 return "general"