Coverage for app/ingestion/pubmed_ingest.py: 69%
77 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-24 13:18 +0530
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-24 13:18 +0530
1import uuid
2from typing import Iterable
4from qdrant_client.models import PointStruct
5from qdrant_client.http.exceptions import UnexpectedResponse
7from app.fetchers.pubmed_fetcher import fetch_all_pmc_articles
8from app.processing.chunker import simple_chunk
9from app.processing.embedding import embed_texts
10from app.processing.entity_extractor import extract_medical_entities
11from app.schema.schema_builder import build_payload
12from app.vector_store.qdrant_store import (
13 get_client,
14 create_collection_if_not_exists,
15 create_indexes,
16 COLLECTION,
17)
19from app.config import settings
20from app.utils.logger import get_logger
22logger = get_logger(__name__)
25# ---------------------------------------------------------------------
26# Utilities
27# ---------------------------------------------------------------------
29def _batch(iterable: Iterable, size: int):
30 """Yield items in fixed-size batches."""
31 batch = []
32 for item in iterable:
33 batch.append(item)
34 if len(batch) == size:
35 yield batch
36 batch = []
37 if batch:
38 yield batch
41# ---------------------------------------------------------------------
42# Main ingestion pipeline (QDRANT ONLY)
43# ---------------------------------------------------------------------
def ingest_from_pubmed(query: str, max_results: int = 5) -> None:
    """
    Fetch PubMed Central articles, chunk them, embed them,
    and store ONLY in Qdrant.

    Neo4j is NOT used here by design.

    Args:
        query: PubMed search string passed to the PMC fetcher.
        max_results: Upper bound on the number of articles fetched.

    Returns:
        None. Failures are logged; the function aborts early (without
        raising) if Qdrant setup or the fetch step fails.
    """
    logger.info("Starting PubMed ingestion", extra={"query": query})

    # ---- Qdrant setup ----
    try:
        client = get_client()
        create_collection_if_not_exists(client)
        create_indexes(client)
    except Exception:
        logger.exception("Failed to initialize Qdrant")
        return

    # ---- Fetch papers ----
    try:
        papers = fetch_all_pmc_articles(query, max_results=max_results)
    except Exception:
        logger.exception("Failed to fetch PMC articles")
        return

    if not papers:
        logger.warning("No PMC articles found", extra={"query": query})
        return

    # ---- Process papers into Qdrant points ----
    points_buffer = []
    for paper in papers:
        points_buffer.extend(_points_for_paper(paper, query))

    # ---- Batch upload ----
    _upload_points(client, points_buffer)

    logger.info("PubMed ingestion completed", extra={"query": query})


def _points_for_paper(paper: dict, query: str) -> list:
    """Chunk, embed, and package one paper into Qdrant PointStructs.

    Returns an empty list when the paper has no abstract text, chunking
    produces nothing, or the embedder returns a mismatched vector count.
    Per-chunk failures are logged and skipped rather than aborting the paper.
    """
    pmid = paper.get("pmid", "unknown")
    title = paper.get("title", "No Title")
    # NOTE(review): only the abstract is embedded here, even though payload
    # metadata labels the section "Full Text" — confirm this is intended.
    text = paper.get("abstract")

    if not text:
        logger.warning("Skipping paper: no text", extra={"pmid": pmid})
        return []

    logger.info(
        "Processing paper",
        extra={"pmid": pmid, "title": title[:50]},
    )

    chunks = simple_chunk(text)
    if not chunks:
        logger.warning("No chunks produced", extra={"pmid": pmid})
        return []

    vectors = embed_texts(chunks)
    if len(vectors) != len(chunks):
        logger.error("Chunk/vector mismatch", extra={"pmid": pmid})
        return []

    points = []
    for i, (chunk, vector) in enumerate(zip(chunks, vectors)):
        try:
            # ---- Entity extraction (metadata only) ----
            entities = extract_medical_entities(chunk)

            payload = build_payload(
                text=chunk,
                pmid=pmid,
                title=title,
                journal=paper.get("journal", "Unknown"),
                year=paper.get("year", 0),
                authors=paper.get("authors", ["PMC Full Text"]),
                section="Full Text",
                chunk_index=i,
                api_query=query,
                entities=entities,
            )

            points.append(
                PointStruct(
                    id=str(uuid.uuid4()),
                    vector=vector,
                    payload=payload,
                )
            )

        except Exception:
            logger.exception(
                "Failed to process chunk",
                extra={"pmid": pmid, "chunk_index": i},
            )

    return points


def _upload_points(client, points: list) -> None:
    """Upsert *points* into the Qdrant collection in fixed-size batches.

    Upload errors are logged per batch and do not stop later batches.
    """
    for batch in _batch(points, settings.QDRANT_BATCH_SIZE):
        try:
            client.upsert(
                collection_name=COLLECTION,
                points=batch,
            )
            logger.info(
                "Batch uploaded",
                extra={"batch_size": len(batch)},
            )
        except UnexpectedResponse:
            logger.exception("Qdrant upsert failed")
        except Exception:
            logger.exception("Unexpected error during Qdrant upsert")
154# ---------------------------------------------------------------------
155# CLI entry
156# ---------------------------------------------------------------------
if __name__ == "__main__":
    # Seed the vector store with a handful of common chronic conditions.
    default_queries = (
        "type 2 diabetes",
        "hypertension",
        "heart disease",
        "asthma",
        "chronic kidney disease",
    )
    for search_term in default_queries:
        ingest_from_pubmed(query=search_term, max_results=5)