Coverage for app \ ingestion \ pubmed_ingest.py: 69%

77 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-24 13:18 +0530

1import uuid 

2from typing import Iterable 

3 

4from qdrant_client.models import PointStruct 

5from qdrant_client.http.exceptions import UnexpectedResponse 

6 

7from app.fetchers.pubmed_fetcher import fetch_all_pmc_articles 

8from app.processing.chunker import simple_chunk 

9from app.processing.embedding import embed_texts 

10from app.processing.entity_extractor import extract_medical_entities 

11from app.schema.schema_builder import build_payload 

12from app.vector_store.qdrant_store import ( 

13 get_client, 

14 create_collection_if_not_exists, 

15 create_indexes, 

16 COLLECTION, 

17) 

18 

19from app.config import settings 

20from app.utils.logger import get_logger 

21 

# Module-level logger shared by every function in this ingestion module.
# NOTE(review): get_logger is a project helper — presumably wraps
# logging.getLogger(__name__) with project formatting; confirm in app.utils.logger.
logger = get_logger(__name__)

23 

24 

25# --------------------------------------------------------------------- 

26# Utilities 

27# --------------------------------------------------------------------- 

28 

29def _batch(iterable: Iterable, size: int): 

30 """Yield items in fixed-size batches.""" 

31 batch = [] 

32 for item in iterable: 

33 batch.append(item) 

34 if len(batch) == size: 

35 yield batch 

36 batch = [] 

37 if batch: 

38 yield batch 

39 

40 

41# --------------------------------------------------------------------- 

42# Main ingestion pipeline (QDRANT ONLY) 

43# --------------------------------------------------------------------- 

44 

def ingest_from_pubmed(query: str, max_results: int = 5) -> None:
    """
    Pull PubMed Central articles matching *query*, split each abstract into
    chunks, embed the chunks, and persist them to Qdrant ONLY.

    Neo4j is deliberately not involved in this pipeline.
    """
    logger.info("Starting PubMed ingestion", extra={"query": query})

    # ---- Qdrant setup ----
    try:
        client = get_client()
        create_collection_if_not_exists(client)
        create_indexes(client)
    except Exception:
        # Without a working vector store there is nothing useful to do.
        logger.exception("Failed to initialize Qdrant")
        return

    # ---- Fetch papers ----
    try:
        articles = fetch_all_pmc_articles(query, max_results=max_results)
    except Exception:
        logger.exception("Failed to fetch PMC articles")
        return

    if not articles:
        logger.warning("No PMC articles found", extra={"query": query})
        return

    pending_points = []

    # ---- Process papers ----
    for article in articles:
        pmid = article.get("pmid", "unknown")
        title = article.get("title", "No Title")
        body = article.get("abstract")

        # Papers without text cannot be chunked or embedded.
        if not body:
            logger.warning("Skipping paper: no text", extra={"pmid": pmid})
            continue

        logger.info(
            "Processing paper",
            extra={"pmid": pmid, "title": title[:50]},
        )

        passages = simple_chunk(body)
        if not passages:
            logger.warning("No chunks produced", extra={"pmid": pmid})
            continue

        embeddings = embed_texts(passages)
        # Every chunk must have exactly one vector; a mismatch means the
        # embedding step misbehaved, so the whole paper is skipped.
        if len(embeddings) != len(passages):
            logger.error("Chunk/vector mismatch", extra={"pmid": pmid})
            continue

        for chunk_idx, (passage, embedding) in enumerate(zip(passages, embeddings)):
            try:
                # ---- Entity extraction (metadata only) ----
                entities = extract_medical_entities(passage)

                payload = build_payload(
                    text=passage,
                    pmid=pmid,
                    title=title,
                    journal=article.get("journal", "Unknown"),
                    year=article.get("year", 0),
                    authors=article.get("authors", ["PMC Full Text"]),
                    section="Full Text",
                    chunk_index=chunk_idx,
                    api_query=query,
                    entities=entities,
                )

                point = PointStruct(
                    id=str(uuid.uuid4()),
                    vector=embedding,
                    payload=payload,
                )
                pending_points.append(point)

            except Exception:
                # A single bad chunk must not sink the rest of the paper.
                logger.exception(
                    "Failed to process chunk",
                    extra={"pmid": pmid, "chunk_index": chunk_idx},
                )

    # ---- Batch upload ----
    for point_batch in _batch(pending_points, settings.QDRANT_BATCH_SIZE):
        try:
            client.upsert(
                collection_name=COLLECTION,
                points=point_batch,
            )
        except UnexpectedResponse:
            logger.exception("Qdrant upsert failed")
        except Exception:
            logger.exception("Unexpected error during Qdrant upsert")
        else:
            # Only log success when the upsert actually went through.
            logger.info(
                "Batch uploaded",
                extra={"batch_size": len(point_batch)},
            )

    pending_points.clear()

    logger.info("PubMed ingestion completed", extra={"query": query})

152 

153 

154# --------------------------------------------------------------------- 

155# CLI entry 

156# --------------------------------------------------------------------- 

157 

if __name__ == "__main__":
    # Seed the vector store with a fixed set of common chronic conditions.
    default_queries = (
        "type 2 diabetes",
        "hypertension",
        "heart disease",
        "asthma",
        "chronic kidney disease",
    )

    for search_term in default_queries:
        ingest_from_pubmed(query=search_term, max_results=5)

169 

170