Coverage for src/local_deep_research/citation_handlers/precision_extraction_handler.py: 97%

210 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1""" 

2Precision Extraction Citation Handler 

3 

4This handler focuses on extracting precise, complete answers for SimpleQA-style questions. 

5It includes specialized extractors for: 

6- Full names (including middle names) 

7- Single answers when only one is requested 

8- Dimension-aware measurements 

9- Specific entities without extra information 

10""" 

11 

12import re 

13from datetime import datetime, timezone 

14from typing import Any, Dict, List, Union 

15 

16from loguru import logger 

17 

18from .base_citation_handler import BaseCitationHandler 

19 

20 

21class PrecisionExtractionHandler(BaseCitationHandler): 

22 """Citation handler optimized for precise answer extraction.""" 

23 

def __init__(self, *args, **kwargs):
    """Set up the handler and compile the regexes used by the extractors.

    All positional and keyword arguments are passed straight through to the
    parent class initialiser.
    """
    super().__init__(*args, **kwargs)

    # Answer type patterns, keyed by the name the extractors look them up by.
    self.answer_patterns = {
        # Two to five capitalised words in a row (e.g. "John Michael Smith").
        "full_name": re.compile(
            r"\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,4})\b"
        ),
        # Four-digit years from 1900 through 2099.
        "year": re.compile(r"\b(19\d{2}|20\d{2})\b"),
        # Integers or decimals.
        "number": re.compile(r"\b(\d+(?:\.\d+)?)\b"),
        # A number followed by a unit. The trailing \b keeps short units from
        # matching the start of a longer word: without it "5 minutes" and
        # "3 million" were both captured as a length in metres ("m").
        "dimension": re.compile(
            r"(\d+(?:\.\d+)?)\s*(meters?|feet|inches|cm|km|miles?|m|ft|kg|pounds?|lbs?)\b",
            re.I,
        ),
        # Two integers separated by a hyphen or an en dash.
        "score": re.compile(r"(\d+)\s*[-–]\s*(\d+)"),
        "percentage": re.compile(r"(\d+(?:\.\d+)?)\s*%"),
        # One or more capitalised words in a row.
        "location": re.compile(r"\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)\b"),
    }

42 

def analyze_initial(
    self, query: str, search_results: Union[str, List[Dict]]
) -> Dict[str, Any]:
    """Answer *query* from *search_results* and post-process for precision.

    The search results are turned into documents and formatted as sources,
    the question is classified, the LLM is prompted for a precise cited
    answer, and the reply is passed through the type-specific extractor.

    Returns a dict with the final text under ``"content"`` and the created
    documents under ``"documents"``.
    """
    docs = self._create_documents(search_results)
    sources_text = self._format_sources(docs)

    # The question type selects the extractor applied to the LLM reply.
    kind = self._identify_question_type(query)

    stamp = f"{datetime.now(timezone.utc):%Y-%m-%d %H:%M}"
    prefix = self._get_output_instruction_prefix()

    rules = """PRECISION INSTRUCTIONS:
1. Extract the EXACT answer as it appears in the sources
2. For names: Include FULL names with all parts (first, middle, last)
3. For numbers: Include exact values with units if present
4. For single-answer questions: Provide ONLY ONE answer, not multiple options
5. For dimensions: Specify the exact measurement type (height, length, width)
6. Citations should support the specific answer given

Format: Start with the direct, precise answer, then explain with citations."""

    prompt = f"""{prefix}Analyze the following information and provide a PRECISE answer to the question. Include citations using numbers in square brackets [1], [2], etc.

Question: {query}
Question Type: {kind}

Sources:
{sources_text}

Current time is {stamp} UTC for verifying temporal references in sources.

{rules}"""

    raw_answer = self._invoke_with_streaming(prompt)
    refined = self._apply_precision_extraction(
        raw_answer, query, kind, sources_text
    )
    return {"content": refined, "documents": docs}

87 

def analyze_followup(
    self,
    question: str,
    search_results: Union[str, List[Dict]],
    previous_knowledge: str,
    nr_of_links: int,
) -> Dict[str, Any]:
    """Answer a follow-up *question* using earlier knowledge plus new sources.

    Key facts are first distilled from *previous_knowledge*; the LLM is then
    prompted with those facts and the newly formatted sources, and its reply
    is passed through the type-specific extractor.

    *nr_of_links* is forwarded to the document builder.

    Returns a dict with the final text under ``"content"`` and the created
    documents under ``"documents"``.
    """
    docs = self._create_documents(search_results, nr_of_links=nr_of_links)
    sources_text = self._format_sources(docs)
    kind = self._identify_question_type(question)

    # Condense what is already known so it fits in the prompt.
    facts = self._extract_key_facts(previous_knowledge, kind)

    stamp = f"{datetime.now(timezone.utc):%Y-%m-%d %H:%M}"
    prefix = self._get_output_instruction_prefix()

    requirements = """PRECISION REQUIREMENTS:
1. Build on previous knowledge to provide the MOST COMPLETE answer
2. If a full name was partially found before, complete it now
3. If multiple candidates exist, select the one with the MOST evidence
4. For measurements, ensure units and dimension types match the question
5. Reconcile any conflicts by choosing the most frequently cited answer

Provide the precise answer with citations. Do not create the bibliography, it will be provided automatically."""

    prompt = f"""{prefix}Using the previous knowledge and new sources, provide a PRECISE answer to the question.

Previous Key Facts:
{facts}

Question: {question}
Question Type: {kind}

New Sources:
{sources_text}

Current time is {stamp} UTC for verifying temporal references in sources.

{requirements}"""

    raw_answer = self._invoke_with_streaming(prompt)
    refined = self._apply_precision_extraction(
        raw_answer, question, kind, sources_text
    )
    return {"content": refined, "documents": docs}

142 

143 def _identify_question_type(self, query: str) -> str: 

144 """Identify the type of question for targeted extraction.""" 

145 query_lower = query.lower() 

146 

147 # Name questions 

148 if any( 

149 phrase in query_lower 

150 for phrase in ["full name", "name of", "who was", "who is"] 

151 ): 

152 if "full name" in query_lower: 

153 return "full_name" 

154 return "name" 

155 

156 # Location questions 

157 if any( 

158 phrase in query_lower 

159 for phrase in ["where", "location", "city", "country", "place"] 

160 ): 

161 return "location" 

162 

163 # Temporal questions 

164 if any(phrase in query_lower for phrase in ["when", "year", "date"]): 

165 return "temporal" 

166 

167 # Numerical questions 

168 if any( 

169 phrase in query_lower 

170 for phrase in ["how many", "how much", "number", "count"] 

171 ): 

172 return "number" 

173 

174 # Score/result questions 

175 if any( 

176 phrase in query_lower 

177 for phrase in ["score", "result", "final", "outcome"] 

178 ): 

179 return "score" 

180 

181 # Dimension questions 

182 if any( 

183 phrase in query_lower 

184 for phrase in [ 

185 "height", 

186 "length", 

187 "width", 

188 "size", 

189 "tall", 

190 "long", 

191 "wide", 

192 ] 

193 ): 

194 return "dimension" 

195 

196 # Single answer questions 

197 if query_lower.startswith("which") and "one" in query_lower: 

198 return "single_choice" 

199 

200 return "general" 

201 

202 def _apply_precision_extraction( 

203 self, content: str, query: str, question_type: str, sources: str 

204 ) -> str: 

205 """Apply precision extraction based on question type.""" 

206 

207 if question_type == "full_name": 

208 return self._extract_full_name(content, query, sources) 

209 if question_type == "name": 

210 return self._extract_best_name(content, query, sources) 

211 if question_type == "single_choice": 

212 return self._extract_single_answer(content, query, sources) 

213 if question_type == "dimension": 

214 return self._extract_dimension(content, query, sources) 

215 if question_type == "score": 

216 return self._extract_score(content, query, sources) 

217 if question_type == "temporal": 

218 return self._extract_temporal(content, query, sources) 

219 if question_type == "number": 

220 return self._extract_number(content, query, sources) 

221 

222 return content 

223 

224 def _extract_full_name(self, content: str, query: str, sources: str) -> str: 

225 """Extract complete full names.""" 

226 # First, use LLM to identify all name variations 

227 extraction_prompt = f"""Find ALL variations of the person's name mentioned in the sources. 

228 

229Question: {query} 

230 

231Content: {content[:2000]} 

232Sources: {sources[:2000]} 

233 

234List all name variations found: 

2351. Shortest version: 

2362. Longest/most complete version: 

2373. Most frequently mentioned version: 

238 

239Which is the FULL name (including middle name if present)?""" 

240 

241 try: 

242 extraction = self._invoke_text(extraction_prompt) 

243 extraction_lower = extraction.lower() 

244 

245 # Extract the identified full name 

246 if "full name" in extraction_lower: 246 ↛ 262line 246 didn't jump to line 262 because the condition on line 246 was always true

247 lines = extraction.split("\n") 

248 for line in lines: 

249 if "full name" in line.lower() or "longest" in line.lower(): 249 ↛ 248line 249 didn't jump to line 248 because the condition on line 249 was always true

250 # Extract name from this line 

251 matches = self.answer_patterns["full_name"].findall( 

252 line 

253 ) 

254 if matches: 

255 # Choose the longest match 

256 full_name = max( 

257 matches, key=lambda x: len(x.split()) 

258 ) 

259 return f"{full_name}. {content}" 

260 

261 # Fallback: find all names and pick the longest 

262 all_names = self.answer_patterns["full_name"].findall( 

263 content + " " + sources 

264 ) 

265 if all_names: 265 ↛ 287line 265 didn't jump to line 287 because the condition on line 265 was always true

266 # Group similar names and pick the longest variant 

267 name_groups: Dict[str, List[str]] = {} 

268 for name in all_names: 

269 last_word = name.split()[-1] 

270 if last_word not in name_groups: 

271 name_groups[last_word] = [] 

272 name_groups[last_word].append(name) 

273 

274 # Find the group with the most complete name 

275 best_name = "" 

276 for group in name_groups.values(): 

277 longest_in_group = max(group, key=lambda x: len(x.split())) 

278 if len(longest_in_group.split()) > len(best_name.split()): 278 ↛ 276line 278 didn't jump to line 276 because the condition on line 278 was always true

279 best_name = longest_in_group 

280 

281 if best_name: 281 ↛ 287line 281 didn't jump to line 287 because the condition on line 281 was always true

282 return f"{best_name}. {content}" 

283 

284 except Exception: 

285 logger.exception("Error in full name extraction") 

286 

287 return content 

288 

289 def _extract_single_answer( 

290 self, content: str, query: str, sources: str 

291 ) -> str: 

292 """Extract a single answer when multiple options might be present.""" 

293 extraction_prompt = f"""The question asks for ONE specific answer. Extract ONLY that answer. 

294 

295Question: {query} 

296Content: {content[:1500]} 

297 

298Rules: 

2991. If multiple items are listed, identify which ONE actually answers the question 

3002. Look for the PRIMARY or FIRST mentioned item 

3013. Do not include alternatives or additional options 

302 

303The single answer is:""" 

304 

305 try: 

306 answer = self._invoke_text(extraction_prompt) 

307 if not answer: 

308 return content 

309 

310 # Clean up the answer 

311 answer = answer.split(",")[ 

312 0 

313 ].strip() # Take only first if comma-separated 

314 answer = answer.split(" and ")[ 

315 0 

316 ].strip() # Take only first if "and"-separated 

317 answer = answer.split(" or ")[ 

318 0 

319 ].strip() # Take only first if "or"-separated 

320 

321 return f"{answer}. {content}" 

322 

323 except Exception: 

324 logger.exception("Error in single answer extraction") 

325 

326 return content 

327 

328 def _extract_dimension(self, content: str, query: str, sources: str) -> str: 

329 """Extract specific dimensions with correct units and context awareness.""" 

330 # Enhanced dimension type detection 

331 dimension_types = { 

332 "height": ["height", "tall", "high", "elevation", "altitude"], 

333 "length": ["length", "long", "distance", "reach", "span"], 

334 "width": ["width", "wide", "breadth", "diameter"], 

335 "depth": ["depth", "deep", "thickness"], 

336 "weight": ["weight", "weigh", "heavy", "mass"], 

337 "speed": ["speed", "fast", "velocity", "mph", "kmh"], 

338 "area": ["area", "square"], 

339 "volume": ["volume", "cubic"], 

340 } 

341 

342 query_lower = query.lower() 

343 dimension_type = None 

344 dimension_keywords = [] 

345 

346 # Find the most specific dimension type 

347 for dim_type, keywords in dimension_types.items(): 

348 matching_keywords = [kw for kw in keywords if kw in query_lower] 

349 if matching_keywords: 

350 dimension_type = dim_type 

351 dimension_keywords = matching_keywords 

352 break 

353 

354 extraction_prompt = f"""Extract the EXACT measurement that answers this question. 

355 

356Question: {query} 

357Content: {content[:1500]} 

358 

359Rules: 

3601. Find the specific {dimension_type or "dimension"} measurement 

3612. Return ONLY the number and unit (e.g., "20 meters", "5.5 feet") 

3623. Distinguish between different types of measurements: 

363 - Height/tall: vertical measurements 

364 - Length/long: horizontal distance 

365 - Width/wide: horizontal breadth 

3664. Look for context clues near the measurement 

3675. If multiple measurements, choose the one that matches the question type 

368 

369The exact {dimension_type or "dimension"} is:""" 

370 

371 try: 

372 answer = self._invoke_text(extraction_prompt) 

373 

374 # Clean and validate the answer 

375 import re 

376 

377 measurement_match = re.search( 

378 r"(\d+(?:\.\d+)?)\s*([a-zA-Z/°]+)", answer 

379 ) 

380 if measurement_match: 

381 number, unit = measurement_match.groups() 

382 clean_answer = f"{number} {unit}" 

383 return f"{clean_answer}. {content}" 

384 

385 # Fallback: intelligent pattern matching 

386 all_dimensions = self.answer_patterns["dimension"].findall( 

387 content + " " + sources 

388 ) 

389 if all_dimensions: 

390 # Score dimensions based on context and dimension type 

391 scored_dimensions = [] 

392 

393 for dim in all_dimensions: 

394 number, unit = dim 

395 dim_str = f"{number} {unit}" 

396 score = 0 

397 

398 # Find the dimension in content 

399 pos = content.find(dim_str) 

400 if pos >= 0: 400 ↛ 452line 400 didn't jump to line 452 because the condition on line 400 was always true

401 # Get context around this measurement 

402 context = content[max(0, pos - 100) : pos + 100].lower() 

403 

404 # Score based on dimension keywords in context 

405 for keyword in dimension_keywords: 

406 if keyword in context: 

407 score += 10 

408 

409 # Score based on unit appropriateness 

410 unit_lower = unit.lower() 

411 if ( 

412 ( 

413 dimension_type == "height" 

414 and any( 

415 u in unit_lower 

416 for u in ["m", "meter", "ft", "feet", "cm"] 

417 ) 

418 ) 

419 or ( 

420 dimension_type == "length" 

421 and any( 

422 u in unit_lower 

423 for u in ["m", "meter", "km", "mile", "ft"] 

424 ) 

425 ) 

426 or ( 

427 dimension_type == "weight" 

428 and any( 

429 u in unit_lower 

430 for u in [ 

431 "kg", 

432 "lb", 

433 "pound", 

434 "gram", 

435 "ton", 

436 ] 

437 ) 

438 ) 

439 or ( 

440 dimension_type == "speed" 

441 and any( 

442 u in unit_lower 

443 for u in ["mph", "kmh", "km/h", "m/s"] 

444 ) 

445 ) 

446 ): 

447 score += 5 

448 

449 # Prefer measurements closer to the beginning (more likely to be primary) 

450 score += max(0, 5 - (pos // 100)) 

451 

452 scored_dimensions.append((score, dim_str)) 

453 

454 # Return the highest scoring dimension 

455 if scored_dimensions: 455 ↛ 461line 455 didn't jump to line 461 because the condition on line 455 was always true

456 scored_dimensions.sort(key=lambda x: x[0], reverse=True) 

457 best_dimension = scored_dimensions[0][1] 

458 return f"{best_dimension}. {content}" 

459 

460 # Final fallback: first dimension 

461 return ( 

462 f"{all_dimensions[0][0]} {all_dimensions[0][1]}. {content}" 

463 ) 

464 

465 except Exception: 

466 logger.exception("Error in dimension extraction") 

467 

468 return content 

469 

470 def _extract_score(self, content: str, query: str, sources: str) -> str: 

471 """Extract game scores or results.""" 

472 # Find all score patterns 

473 scores = self.answer_patterns["score"].findall(content + " " + sources) 

474 

475 if scores: 

476 # Use LLM to identify the correct score 

477 extraction_prompt = f"""Which score/result answers this question? 

478 

479Question: {query} 

480Found scores: {scores} 

481Context: {content[:1000]} 

482 

483The answer is:""" 

484 

485 try: 

486 answer = self._invoke_text(extraction_prompt) 

487 if not answer: 

488 return f"{scores[0][0]}-{scores[0][1]}. {content}" 

489 return f"{answer}. {content}" 

490 except Exception: 

491 # Return first score found if LLM extraction fails 

492 return f"{scores[0][0]}-{scores[0][1]}. {content}" 

493 

494 return content 

495 

496 def _extract_temporal(self, content: str, query: str, sources: str) -> str: 

497 """Extract dates or years.""" 

498 # Find all year patterns 

499 years = self.answer_patterns["year"].findall(content + " " + sources) 

500 

501 if years: 

502 # Use LLM to pick the right one 

503 extraction_prompt = f"""Which date/year specifically answers this question? 

504 

505Question: {query} 

506Found years: {set(years)} 

507Context: {content[:1000]} 

508 

509The answer is:""" 

510 

511 try: 

512 answer = self._invoke_text(extraction_prompt) 

513 if not answer: 

514 return f"{years[0]}. {content}" 

515 # Clean to just the year/date 

516 year_match = self.answer_patterns["year"].search(answer) 

517 if year_match: 

518 return f"{year_match.group()}. {content}" 

519 return f"{answer}. {content}" 

520 except Exception: 

521 # Fallback to first found year if LLM extraction fails 

522 return f"{years[0]}. {content}" 

523 

524 return content 

525 

526 def _extract_number(self, content: str, query: str, sources: str) -> str: 

527 """Extract specific numbers.""" 

528 # Find all numbers 

529 numbers = self.answer_patterns["number"].findall( 

530 content + " " + sources 

531 ) 

532 

533 if numbers: 

534 extraction_prompt = f"""Which number specifically answers this question? 

535 

536Question: {query} 

537Found numbers: {numbers[:10]} 

538Context: {content[:1000]} 

539 

540The answer is:""" 

541 

542 try: 

543 answer = self._invoke_text(extraction_prompt) 

544 if not answer: 

545 return f"{numbers[0]}. {content}" 

546 return f"{answer}. {content}" 

547 except Exception: 

548 # Fallback to first found number if LLM extraction fails 

549 return f"{numbers[0]}. {content}" 

550 

551 return content 

552 

553 def _extract_best_name(self, content: str, query: str, sources: str) -> str: 

554 """Extract the best matching name (not necessarily full).""" 

555 # Find all potential names 

556 names = self.answer_patterns["full_name"].findall( 

557 content + " " + sources 

558 ) 

559 

560 if names: 

561 # Count frequency 

562 name_counts: Dict[str, int] = {} 

563 for name in names: 

564 name_counts[name] = name_counts.get(name, 0) + 1 

565 

566 # Get most frequent 

567 best_name = max(name_counts.items(), key=lambda x: x[1])[0] 

568 return f"{best_name}. {content}" 

569 

570 return content 

571 

572 def _extract_key_facts( 

573 self, previous_knowledge: str, question_type: str 

574 ) -> str: 

575 """Extract key facts from previous knowledge.""" 

576 extraction_prompt = f"""Extract key facts related to a {question_type} question from this knowledge: 

577 

578{previous_knowledge[:1500]} 

579 

580List the most important facts (names, numbers, dates) found:""" 

581 

582 try: 

583 facts = self._invoke_text(extraction_prompt) 

584 return facts[:500] 

585 except Exception: 

586 # Fallback to truncated previous knowledge if LLM extraction fails 

587 return previous_knowledge[:500]