Coverage for app \ knowledge_graph \ wearables_graph.py: 46%
104 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-24 13:18 +0530
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-24 13:18 +0530
1"""
2Wearables Graph Reader
4Purpose:
5- Read wearable / vitals FACTS from Neo4j
6- Compute deterministic statistical summaries
7- NO clinical interpretation
8- NO causal reasoning
10Design Principles:
11- Deterministic
12- Auditable
13- LLM-safe (facts + computed observations only)
15Neo4j Domain:
16Patient → WearableMetric → Reading
17"""
19from typing import Dict, Any, List, Optional
20from neo4j import GraphDatabase
21import os
22from statistics import mean
25# ------------------------------------------------------------------
26# Neo4j connection
27# ------------------------------------------------------------------
29def _get_driver():
30 uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
31 user = os.getenv("NEO4J_USER", "neo4j")
32 password = os.getenv("NEO4J_PASSWORD", "password")
33 return GraphDatabase.driver(uri, auth=(user, password))
36# ------------------------------------------------------------------
37# READ: Wearable FACT summaries
38# ------------------------------------------------------------------
40def get_wearable_summary(user_id: str) -> Dict[str, Any]:
41 """
42 Fetch wearable metrics and return computed FACT summaries.
43 No medical or behavioral interpretation is performed here.
45 Returns block compatible with prompt_builder._format_wearables()
46 """
48 driver = _get_driver()
50 cypher = """
51 MATCH (p:Patient {id: $user_id})
52 OPTIONAL MATCH (p)-[:HAS_METRIC]->(wm:WearableMetric)
53 OPTIONAL MATCH (wm)-[:RECORDED_AS]->(r:Reading)
54 RETURN
55 wm.type AS metric_type,
56 wm.name AS metric_name,
57 wm.unit AS unit,
58 wm.normalRange AS normal_range,
59 collect({
60 value: toString(r.value),
61 timestamp: toString(r.timestamp)
62 }) AS readings
63 ORDER BY wm.name
64 """
66 metrics = []
68 with driver.session() as session:
69 results = session.run(cypher, user_id=user_id)
71 for record in results:
72 summary = _summarize_metric(
73 metric_type = record["metric_type"],
74 metric_name = record["metric_name"],
75 unit = record["unit"],
76 normal_range = record["normal_range"],
77 readings = record["readings"],
78 )
79 if summary:
80 metrics.append(summary)
82 driver.close()
84 return {
85 "available": bool(metrics),
86 "metrics": metrics,
87 }
90# ------------------------------------------------------------------
91# Helpers — COMPUTED OBSERVATIONS ONLY
92# ------------------------------------------------------------------
94def _summarize_metric(
95 metric_type: str,
96 metric_name: str,
97 unit: str,
98 normal_range: str,
99 readings: List[Any],
100) -> Optional[Dict[str, Any]]:
101 """
102 Compute deterministic statistical summaries for a wearable metric.
104 Handles:
105 - Numeric values e.g. 72, 156, 8234
106 - Blood pressure e.g. "138/88" → uses systolic for stats
107 - String values e.g. "NSR" → trend by equality check
108 """
110 if not metric_type or not readings:
111 return None
113 # Filter out null/empty readings
114 valid_readings = [
115 r for r in readings
116 if r and r.get("value") not in (None, "None", "", "null")
117 ]
119 if not valid_readings:
120 return None
122 # Sort by timestamp ascending so latest is last
123 valid_readings.sort(key=lambda r: r.get("timestamp", ""))
125 raw_values = [r["value"] for r in valid_readings]
126 timestamps = [r["timestamp"] for r in valid_readings]
128 # Build clean dated readings list for prompt_builder
129 dated_readings = [
130 {
131 "date": _clean_timestamp(r.get("timestamp", "")),
132 "value": f"{r.get('value')} {unit or ''}".strip(),
133 }
134 for r in valid_readings
135 ]
137 # Compute stats
138 numeric_vals = _extract_numeric_values(raw_values)
140 # ── Numeric metric ───────────────────────────────────────────
141 if numeric_vals:
142 avg = round(mean(numeric_vals), 1)
144 latest_value = dated_readings[-1]["value"] if dated_readings else "not recorded"
145 previous_value = dated_readings[-2]["value"] if len(dated_readings) >= 2 else "not recorded"
146 average_value = f"{avg} {unit or ''}".strip()
147 trend = _compute_numeric_trend(numeric_vals)
149 return {
150 "metric": metric_name or metric_type,
151 "type": metric_type,
152 "unit": unit,
153 "normal_range": normal_range or "N/A",
154 "latest_value": latest_value,
155 "previous_value": previous_value,
156 "average_value": average_value,
157 "min_value": f"{min(numeric_vals)} {unit or ''}".strip(),
158 "max_value": f"{max(numeric_vals)} {unit or ''}".strip(),
159 "trend": trend,
160 "readings_count": len(valid_readings),
161 "readings": dated_readings,
162 "time_range": {
163 "start": _clean_timestamp(timestamps[0]) if timestamps else None,
164 "end": _clean_timestamp(timestamps[-1]) if timestamps else None,
165 },
166 }
168 # ── Blood pressure metric — parse systolic for stats ────────
169 bp_systolic = _extract_bp_systolic(raw_values)
170 if bp_systolic:
171 avg = round(mean(bp_systolic), 1)
172 latest_value = dated_readings[-1]["value"] if dated_readings else "not recorded"
173 previous_value = dated_readings[-2]["value"] if len(dated_readings) >= 2 else "not recorded"
174 average_value = f"{avg}/{round(mean(_extract_bp_diastolic(raw_values)), 1)} {unit or ''}".strip()
175 trend = _compute_numeric_trend(bp_systolic)
177 return {
178 "metric": metric_name or metric_type,
179 "type": metric_type,
180 "unit": unit,
181 "normal_range": normal_range or "N/A",
182 "latest_value": latest_value,
183 "previous_value": previous_value,
184 "average_value": average_value,
185 "min_value": f"{min(bp_systolic)} systolic",
186 "max_value": f"{max(bp_systolic)} systolic",
187 "trend": trend,
188 "readings_count": len(valid_readings),
189 "readings": dated_readings,
190 "time_range": {
191 "start": _clean_timestamp(timestamps[0]) if timestamps else None,
192 "end": _clean_timestamp(timestamps[-1]) if timestamps else None,
193 },
194 }
196 # ── String metric (ECG "NSR", etc.) ─────────────────────────
197 latest_value = dated_readings[-1]["value"] if dated_readings else "not recorded"
198 previous_value = dated_readings[-2]["value"] if len(dated_readings) >= 2 else "not recorded"
199 trend = _compute_string_trend(raw_values)
201 return {
202 "metric": metric_name or metric_type,
203 "type": metric_type,
204 "unit": unit,
205 "normal_range": normal_range or "N/A",
206 "latest_value": latest_value,
207 "previous_value": previous_value,
208 "average_value": "consistent readings" if trend == "stable" else "variable readings",
209 "min_value": "N/A",
210 "max_value": "N/A",
211 "trend": trend,
212 "readings_count": len(valid_readings),
213 "readings": dated_readings,
214 "time_range": {
215 "start": _clean_timestamp(timestamps[0]) if timestamps else None,
216 "end": _clean_timestamp(timestamps[-1]) if timestamps else None,
217 },
218 }
221# ------------------------------------------------------------------
222# Numeric helpers
223# ------------------------------------------------------------------
225def _extract_numeric_values(raw_values: List[str]) -> List[float]:
226 """
227 Extract plain numeric values (non-BP, non-string).
228 Skips "138/88" and "NSR" cleanly.
229 """
230 values = []
231 for v in raw_values:
232 v = str(v).strip()
233 if "/" in v:
234 continue # BP format — handled separately
235 try:
236 values.append(float(v))
237 except ValueError:
238 pass # String like "NSR" — skip cleanly
239 return values
242def _extract_bp_systolic(raw_values: List[str]) -> List[float]:
243 """Extract systolic (first number) from BP readings like '138/88'."""
244 values = []
245 for v in raw_values:
246 v = str(v).strip()
247 if "/" in v:
248 try:
249 values.append(float(v.split("/")[0]))
250 except ValueError:
251 pass
252 return values
255def _extract_bp_diastolic(raw_values: List[str]) -> List[float]:
256 """Extract diastolic (second number) from BP readings like '138/88'."""
257 values = []
258 for v in raw_values:
259 v = str(v).strip()
260 if "/" in v:
261 try:
262 values.append(float(v.split("/")[1]))
263 except ValueError:
264 pass
265 return values
268def _compute_numeric_trend(values: List[float]) -> str:
269 """
270 Clean human-readable trend — never exposes raw internal labels.
271 Uses 5% threshold to avoid noise on small fluctuations.
272 """
273 if len(values) < 2:
274 return "monitoring ongoing — more readings needed"
276 first = values[0]
277 last = values[-1]
279 if first == 0:
280 return "monitoring ongoing — more readings needed"
282 diff = last - first
283 pct = abs(diff / first) * 100
285 if pct < 2:
286 return "stable"
287 elif diff > 0:
288 return f"increasing ({pct:.1f}% rise over recorded period)"
289 else:
290 return f"decreasing ({pct:.1f}% drop over recorded period)"
293def _compute_string_trend(values: List[str]) -> str:
294 """
295 For non-numeric readings — check if value changed.
296 Never exposes raw internal labels.
297 """
298 if len(values) < 2:
299 return "monitoring ongoing — more readings needed"
300 return "stable" if values[0] == values[-1] else "changed between readings"
303# ------------------------------------------------------------------
304# Timestamp helper
305# ------------------------------------------------------------------
307def _clean_timestamp(ts: str) -> str:
308 """
309 Convert Neo4j timestamp to readable date.
310 "2026-02-08T08:00:00Z" → "2026-02-08"
311 """
312 if not ts or ts in ("None", "null", ""):
313 return "unknown date"
314 return str(ts)[:10]