Coverage for app \ knowledge_graph \ wearables_graph.py: 46%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-24 13:18 +0530

1""" 

2Wearables Graph Reader 

3 

4Purpose: 

5- Read wearable / vitals FACTS from Neo4j 

6- Compute deterministic statistical summaries 

7- NO clinical interpretation 

8- NO causal reasoning 

9 

10Design Principles: 

11- Deterministic 

12- Auditable 

13- LLM-safe (facts + computed observations only) 

14 

15Neo4j Domain: 

16Patient → WearableMetric → Reading 

17""" 

18 

19from typing import Dict, Any, List, Optional 

20from neo4j import GraphDatabase 

21import os 

22from statistics import mean 

23 

24 

25# ------------------------------------------------------------------ 

26# Neo4j connection 

27# ------------------------------------------------------------------ 

28 

def _get_driver():
    """
    Build a Neo4j driver from environment configuration.

    Reads NEO4J_URI, NEO4J_USER and NEO4J_PASSWORD, falling back to the
    local-development defaults below when a variable is unset. Every call
    constructs a new driver via GraphDatabase.driver(); this function does
    not close it.
    """
    env = os.environ
    endpoint = env.get("NEO4J_URI", "bolt://localhost:7687")
    credentials = (
        env.get("NEO4J_USER", "neo4j"),
        env.get("NEO4J_PASSWORD", "password"),
    )
    return GraphDatabase.driver(endpoint, auth=credentials)

34 

35 

36# ------------------------------------------------------------------ 

37# READ: Wearable FACT summaries 

38# ------------------------------------------------------------------ 

39 

def get_wearable_summary(user_id: str) -> Dict[str, Any]:
    """
    Fetch wearable metrics for a patient and return computed FACT summaries.

    No medical or behavioral interpretation is performed here.

    Args:
        user_id: Value matched against the ``id`` property of the
            ``Patient`` node.

    Returns:
        Block compatible with prompt_builder._format_wearables():
        ``{"available": bool, "metrics": [...]}``. ``metrics`` holds one
        entry per query row for which ``_summarize_metric`` returned a
        summary; ``available`` is False when that list is empty.

    Raises:
        Any exception raised while opening the session or running the
        query propagates to the caller. The driver is closed first.
    """
    cypher = """
    MATCH (p:Patient {id: $user_id})
    OPTIONAL MATCH (p)-[:HAS_METRIC]->(wm:WearableMetric)
    OPTIONAL MATCH (wm)-[:RECORDED_AS]->(r:Reading)
    RETURN
        wm.type        AS metric_type,
        wm.name        AS metric_name,
        wm.unit        AS unit,
        wm.normalRange AS normal_range,
        collect({
            value:     toString(r.value),
            timestamp: toString(r.timestamp)
        }) AS readings
    ORDER BY wm.name
    """

    metrics: List[Dict[str, Any]] = []

    driver = _get_driver()
    try:
        with driver.session() as session:
            for record in session.run(cypher, user_id=user_id):
                summary = _summarize_metric(
                    metric_type=record["metric_type"],
                    metric_name=record["metric_name"],
                    unit=record["unit"],
                    normal_range=record["normal_range"],
                    readings=record["readings"],
                )
                if summary:
                    metrics.append(summary)
    finally:
        # Close the driver even when the session or query raises; closing
        # only on the success path leaked it on every failure.
        driver.close()

    return {
        "available": bool(metrics),
        "metrics": metrics,
    }

88 

89 

90# ------------------------------------------------------------------ 

91# Helpers — COMPUTED OBSERVATIONS ONLY 

92# ------------------------------------------------------------------ 

93 

94def _summarize_metric( 

95 metric_type: str, 

96 metric_name: str, 

97 unit: str, 

98 normal_range: str, 

99 readings: List[Any], 

100) -> Optional[Dict[str, Any]]: 

101 """ 

102 Compute deterministic statistical summaries for a wearable metric. 

103 

104 Handles: 

105 - Numeric values e.g. 72, 156, 8234 

106 - Blood pressure e.g. "138/88" → uses systolic for stats 

107 - String values e.g. "NSR" → trend by equality check 

108 """ 

109 

110 if not metric_type or not readings: 

111 return None 

112 

113 # Filter out null/empty readings 

114 valid_readings = [ 

115 r for r in readings 

116 if r and r.get("value") not in (None, "None", "", "null") 

117 ] 

118 

119 if not valid_readings: 

120 return None 

121 

122 # Sort by timestamp ascending so latest is last 

123 valid_readings.sort(key=lambda r: r.get("timestamp", "")) 

124 

125 raw_values = [r["value"] for r in valid_readings] 

126 timestamps = [r["timestamp"] for r in valid_readings] 

127 

128 # Build clean dated readings list for prompt_builder 

129 dated_readings = [ 

130 { 

131 "date": _clean_timestamp(r.get("timestamp", "")), 

132 "value": f"{r.get('value')} {unit or ''}".strip(), 

133 } 

134 for r in valid_readings 

135 ] 

136 

137 # Compute stats 

138 numeric_vals = _extract_numeric_values(raw_values) 

139 

140 # ── Numeric metric ─────────────────────────────────────────── 

141 if numeric_vals: 

142 avg = round(mean(numeric_vals), 1) 

143 

144 latest_value = dated_readings[-1]["value"] if dated_readings else "not recorded" 

145 previous_value = dated_readings[-2]["value"] if len(dated_readings) >= 2 else "not recorded" 

146 average_value = f"{avg} {unit or ''}".strip() 

147 trend = _compute_numeric_trend(numeric_vals) 

148 

149 return { 

150 "metric": metric_name or metric_type, 

151 "type": metric_type, 

152 "unit": unit, 

153 "normal_range": normal_range or "N/A", 

154 "latest_value": latest_value, 

155 "previous_value": previous_value, 

156 "average_value": average_value, 

157 "min_value": f"{min(numeric_vals)} {unit or ''}".strip(), 

158 "max_value": f"{max(numeric_vals)} {unit or ''}".strip(), 

159 "trend": trend, 

160 "readings_count": len(valid_readings), 

161 "readings": dated_readings, 

162 "time_range": { 

163 "start": _clean_timestamp(timestamps[0]) if timestamps else None, 

164 "end": _clean_timestamp(timestamps[-1]) if timestamps else None, 

165 }, 

166 } 

167 

168 # ── Blood pressure metric — parse systolic for stats ──────── 

169 bp_systolic = _extract_bp_systolic(raw_values) 

170 if bp_systolic: 

171 avg = round(mean(bp_systolic), 1) 

172 latest_value = dated_readings[-1]["value"] if dated_readings else "not recorded" 

173 previous_value = dated_readings[-2]["value"] if len(dated_readings) >= 2 else "not recorded" 

174 average_value = f"{avg}/{round(mean(_extract_bp_diastolic(raw_values)), 1)} {unit or ''}".strip() 

175 trend = _compute_numeric_trend(bp_systolic) 

176 

177 return { 

178 "metric": metric_name or metric_type, 

179 "type": metric_type, 

180 "unit": unit, 

181 "normal_range": normal_range or "N/A", 

182 "latest_value": latest_value, 

183 "previous_value": previous_value, 

184 "average_value": average_value, 

185 "min_value": f"{min(bp_systolic)} systolic", 

186 "max_value": f"{max(bp_systolic)} systolic", 

187 "trend": trend, 

188 "readings_count": len(valid_readings), 

189 "readings": dated_readings, 

190 "time_range": { 

191 "start": _clean_timestamp(timestamps[0]) if timestamps else None, 

192 "end": _clean_timestamp(timestamps[-1]) if timestamps else None, 

193 }, 

194 } 

195 

196 # ── String metric (ECG "NSR", etc.) ───────────────────────── 

197 latest_value = dated_readings[-1]["value"] if dated_readings else "not recorded" 

198 previous_value = dated_readings[-2]["value"] if len(dated_readings) >= 2 else "not recorded" 

199 trend = _compute_string_trend(raw_values) 

200 

201 return { 

202 "metric": metric_name or metric_type, 

203 "type": metric_type, 

204 "unit": unit, 

205 "normal_range": normal_range or "N/A", 

206 "latest_value": latest_value, 

207 "previous_value": previous_value, 

208 "average_value": "consistent readings" if trend == "stable" else "variable readings", 

209 "min_value": "N/A", 

210 "max_value": "N/A", 

211 "trend": trend, 

212 "readings_count": len(valid_readings), 

213 "readings": dated_readings, 

214 "time_range": { 

215 "start": _clean_timestamp(timestamps[0]) if timestamps else None, 

216 "end": _clean_timestamp(timestamps[-1]) if timestamps else None, 

217 }, 

218 } 

219 

220 

221# ------------------------------------------------------------------ 

222# Numeric helpers 

223# ------------------------------------------------------------------ 

224 

225def _extract_numeric_values(raw_values: List[str]) -> List[float]: 

226 """ 

227 Extract plain numeric values (non-BP, non-string). 

228 Skips "138/88" and "NSR" cleanly. 

229 """ 

230 values = [] 

231 for v in raw_values: 

232 v = str(v).strip() 

233 if "/" in v: 

234 continue # BP format — handled separately 

235 try: 

236 values.append(float(v)) 

237 except ValueError: 

238 pass # String like "NSR" — skip cleanly 

239 return values 

240 

241 

242def _extract_bp_systolic(raw_values: List[str]) -> List[float]: 

243 """Extract systolic (first number) from BP readings like '138/88'.""" 

244 values = [] 

245 for v in raw_values: 

246 v = str(v).strip() 

247 if "/" in v: 

248 try: 

249 values.append(float(v.split("/")[0])) 

250 except ValueError: 

251 pass 

252 return values 

253 

254 

255def _extract_bp_diastolic(raw_values: List[str]) -> List[float]: 

256 """Extract diastolic (second number) from BP readings like '138/88'.""" 

257 values = [] 

258 for v in raw_values: 

259 v = str(v).strip() 

260 if "/" in v: 

261 try: 

262 values.append(float(v.split("/")[1])) 

263 except ValueError: 

264 pass 

265 return values 

266 

267 

268def _compute_numeric_trend(values: List[float]) -> str: 

269 """ 

270 Clean human-readable trend — never exposes raw internal labels. 

271 Uses 5% threshold to avoid noise on small fluctuations. 

272 """ 

273 if len(values) < 2: 

274 return "monitoring ongoing — more readings needed" 

275 

276 first = values[0] 

277 last = values[-1] 

278 

279 if first == 0: 

280 return "monitoring ongoing — more readings needed" 

281 

282 diff = last - first 

283 pct = abs(diff / first) * 100 

284 

285 if pct < 2: 

286 return "stable" 

287 elif diff > 0: 

288 return f"increasing ({pct:.1f}% rise over recorded period)" 

289 else: 

290 return f"decreasing ({pct:.1f}% drop over recorded period)" 

291 

292 

293def _compute_string_trend(values: List[str]) -> str: 

294 """ 

295 For non-numeric readings — check if value changed. 

296 Never exposes raw internal labels. 

297 """ 

298 if len(values) < 2: 

299 return "monitoring ongoing — more readings needed" 

300 return "stable" if values[0] == values[-1] else "changed between readings" 

301 

302 

303# ------------------------------------------------------------------ 

304# Timestamp helper 

305# ------------------------------------------------------------------ 

306 

307def _clean_timestamp(ts: str) -> str: 

308 """ 

309 Convert Neo4j timestamp to readable date. 

310 "2026-02-08T08:00:00Z" → "2026-02-08" 

311 """ 

312 if not ts or ts in ("None", "null", ""): 

313 return "unknown date" 

314 return str(ts)[:10]