Coverage for src / local_deep_research / advanced_search_system / constraint_checking / intelligent_constraint_relaxer.py: 57%

167 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-01-11 00:51 +0000

1""" 

2Intelligent Constraint Relaxation Strategy 

3 

4This module implements progressive constraint relaxation to improve BrowseComp 

5performance when strict constraint matching fails. 

6 

7Based on BROWSECOMP_IMPROVEMENT_STRATEGY.md recommendations for handling 

8complex multi-constraint queries that may not have perfect matches. 

9""" 

10 

11from loguru import logger 

12from typing import Dict, List 

13 

14 

15class IntelligentConstraintRelaxer: 

16 """ 

17 Progressive constraint relaxation based on search results and constraint reliability. 

18 

19 Features: 

20 1. Maintains essential identifying constraints 

21 2. Relaxes problematic constraint types first 

22 3. Creates multiple search attempts with different constraint sets 

23 4. Preserves constraint importance hierarchy 

24 """ 

25 

26 def __init__(self): 

27 # Constraint priorities (higher = more important, never relax) 

28 self.constraint_priorities = { 

29 "NAME_PATTERN": 10, # Never relax - essential for identification 

30 "EXISTENCE": 9, # Rarely relax - basic entity existence 

31 "LOCATION": 8, # Usually important for identification 

32 "TEMPORAL": 7, # Dates often crucial but sometimes fuzzy 

33 "PROPERTY": 6, # Basic properties, moderately important 

34 "EVENT": 5, # Events can be important but sometimes optional 

35 "STATISTIC": 3, # Often relax - numbers frequently imprecise 

36 "COMPARISON": 1, # Frequently relax - relative comparisons problematic 

37 "RELATIONSHIP": 2, # Often problematic due to complexity 

38 } 

39 

40 # Minimum constraints to keep for meaningful search 

41 self.min_constraints = 2 

42 

43 # Constraint relaxation strategies by type 

44 self.relaxation_strategies = { 

45 "STATISTIC": self._relax_statistical_constraint, 

46 "COMPARISON": self._relax_comparison_constraint, 

47 "TEMPORAL": self._relax_temporal_constraint, 

48 "PROPERTY": self._relax_property_constraint, 

49 } 

50 

51 def relax_constraints_progressively( 

52 self, 

53 constraints: List[object], 

54 candidates_found: List[object], 

55 target_candidates: int = 5, 

56 ) -> List[List[object]]: 

57 """ 

58 Generate progressive constraint relaxation sets based on search results. 

59 

60 Args: 

61 constraints: Original constraint list 

62 candidates_found: Current candidates found 

63 target_candidates: Target number of candidates to find 

64 

65 Returns: 

66 List of relaxed constraint sets to try 

67 """ 

68 if len(candidates_found) >= target_candidates: 

69 logger.debug("Sufficient candidates found, no relaxation needed") 

70 return [constraints] # No relaxation needed 

71 

72 logger.info( 

73 f"Only {len(candidates_found)} candidates found, generating relaxation strategies" 

74 ) 

75 

76 # Sort constraints by relaxation priority (lowest first) 

77 relaxable_constraints = sorted( 

78 constraints, 

79 key=lambda c: self.constraint_priorities.get( 

80 self._get_constraint_type(c), 5 

81 ), 

82 ) 

83 

84 relaxed_sets = [] 

85 

86 # Strategy 1: Remove least important constraints progressively 

87 for i in range(1, min(len(constraints), 4)): # Max 3 relaxation levels 

88 relaxed_set = relaxable_constraints[ 

89 :-i 

90 ] # Remove i lowest priority constraints 

91 

92 if len(relaxed_set) >= self.min_constraints: 92 ↛ 87line 92 didn't jump to line 87 because the condition on line 92 was always true

93 relaxed_sets.append(relaxed_set) 

94 logger.debug( 

95 f"Relaxation level {i}: Removed {i} constraints, {len(relaxed_set)} remaining" 

96 ) 

97 

98 # Strategy 2: Create constraint variations for difficult constraints 

99 variation_sets = self._create_constraint_variations(constraints) 

100 relaxed_sets.extend(variation_sets) 

101 

102 # Strategy 3: Keep only high-priority constraints 

103 high_priority_constraints = [ 

104 c 

105 for c in constraints 

106 if self.constraint_priorities.get(self._get_constraint_type(c), 5) 

107 >= 7 

108 ] 

109 

110 if len(high_priority_constraints) >= self.min_constraints: 110 ↛ 117line 110 didn't jump to line 117 because the condition on line 110 was always true

111 relaxed_sets.append(high_priority_constraints) 

112 logger.debug( 

113 f"High-priority only: {len(high_priority_constraints)} constraints" 

114 ) 

115 

116 # Remove duplicates while preserving order 

117 unique_sets = [] 

118 seen_sets = set() 

119 

120 for constraint_set in relaxed_sets: 

121 # Create a hashable representation 

122 set_signature = tuple(sorted(str(c) for c in constraint_set)) 

123 if set_signature not in seen_sets: 123 ↛ 120line 123 didn't jump to line 120 because the condition on line 123 was always true

124 seen_sets.add(set_signature) 

125 unique_sets.append(constraint_set) 

126 

127 logger.info( 

128 f"Generated {len(unique_sets)} unique relaxation strategies" 

129 ) 

130 return unique_sets 

131 

132 def _create_constraint_variations( 

133 self, constraints: List[object] 

134 ) -> List[List[object]]: 

135 """ 

136 Create variations of difficult constraints to improve matching. 

137 

138 Args: 

139 constraints: Original constraints 

140 

141 Returns: 

142 List of constraint sets with variations 

143 """ 

144 variation_sets = [] 

145 

146 for i, constraint in enumerate(constraints): 

147 constraint_type = self._get_constraint_type(constraint) 

148 

149 if constraint_type in self.relaxation_strategies: 

150 # Create variations for this constraint 

151 variations = self.relaxation_strategies[constraint_type]( 

152 constraint 

153 ) 

154 

155 if variations: 

156 # Replace original constraint with each variation 

157 for variation in variations: 

158 new_set = constraints.copy() 

159 new_set[i] = variation 

160 variation_sets.append(new_set) 

161 

162 return variation_sets 

163 

164 def _relax_statistical_constraint(self, constraint: object) -> List[object]: 

165 """ 

166 Create relaxed variations of statistical constraints. 

167 

168 Statistical constraints often fail due to: 

169 - Outdated numbers 

170 - Rounding differences 

171 - Different measurement units 

172 """ 

173 variations = [] 

174 constraint_text = str(constraint) 

175 

176 # Extract numbers from constraint 

177 import re 

178 

179 numbers = re.findall(r"\d+(?:\.\d+)?", constraint_text) 

180 

181 for number_str in numbers: 

182 try: 

183 number = float(number_str) 

184 

185 # Create range variations (+/- 10%, 20%, 50%) 

186 for tolerance in [0.1, 0.2, 0.5]: 

187 lower = number * (1 - tolerance) 

188 upper = number * (1 + tolerance) 

189 

190 # Replace exact number with range 

191 relaxed_text = constraint_text.replace( 

192 number_str, f"between {lower:.0f} and {upper:.0f}" 

193 ) 

194 

195 variations.append( 

196 self._create_relaxed_constraint( 

197 constraint, relaxed_text 

198 ) 

199 ) 

200 

201 # Create "approximately" version 

202 approx_text = constraint_text.replace( 

203 number_str, f"approximately {number_str}" 

204 ) 

205 variations.append( 

206 self._create_relaxed_constraint(constraint, approx_text) 

207 ) 

208 

209 except ValueError: 

210 continue 

211 

212 return variations[:3] # Limit to avoid too many variations 

213 

214 def _relax_comparison_constraint(self, constraint: object) -> List[object]: 

215 """ 

216 Create relaxed variations of comparison constraints. 

217 

218 Comparison constraints often fail due to: 

219 - Relative terms are context-dependent 

220 - "Times more" calculations are complex 

221 - Baseline comparisons may be unclear 

222 """ 

223 variations = [] 

224 constraint_text = str(constraint).lower() 

225 

226 # Replace strict comparisons with looser ones 

227 relaxation_mappings = { 

228 "times more": "significantly more", 

229 "times larger": "much larger", 

230 "times bigger": "much bigger", 

231 "exactly": "approximately", 

232 "must be": "should be", 

233 "is the": "is among the", 

234 "largest": "one of the largest", 

235 "smallest": "one of the smallest", 

236 "highest": "among the highest", 

237 "lowest": "among the lowest", 

238 } 

239 

240 for strict_term, relaxed_term in relaxation_mappings.items(): 

241 if strict_term in constraint_text: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true

242 relaxed_text = constraint_text.replace( 

243 strict_term, relaxed_term 

244 ) 

245 variations.append( 

246 self._create_relaxed_constraint(constraint, relaxed_text) 

247 ) 

248 

249 # Remove comparison altogether - focus on the main entity/property 

250 comparison_indicators = [ 

251 "more than", 

252 "less than", 

253 "compared to", 

254 "relative to", 

255 ] 

256 for indicator in comparison_indicators: 

257 if indicator in constraint_text: 257 ↛ 259line 257 didn't jump to line 259 because the condition on line 257 was never true

258 # Extract the part before the comparison 

259 parts = constraint_text.split(indicator) 

260 if len(parts) > 1: 

261 main_part = parts[0].strip() 

262 variations.append( 

263 self._create_relaxed_constraint(constraint, main_part) 

264 ) 

265 

266 return variations[:3] 

267 

268 def _relax_temporal_constraint(self, constraint: object) -> List[object]: 

269 """ 

270 Create relaxed variations of temporal constraints. 

271 

272 Temporal constraints often fail due to: 

273 - Exact dates vs approximate dates 

274 - Different calendar systems 

275 - Founding vs incorporation dates 

276 """ 

277 variations = [] 

278 constraint_text = str(constraint) 

279 

280 # Extract years 

281 import re 

282 

283 years = re.findall(r"\b(19\d{2}|20\d{2})\b", constraint_text) 

284 

285 for year_str in years: 

286 year = int(year_str) 

287 

288 # Create decade ranges 

289 decade_start = (year // 10) * 10 

290 decade_text = constraint_text.replace(year_str, f"{decade_start}s") 

291 variations.append( 

292 self._create_relaxed_constraint(constraint, decade_text) 

293 ) 

294 

295 # Create +/- ranges 

296 for range_years in [1, 2, 5]: 

297 range_text = constraint_text.replace( 

298 year_str, 

299 f"between {year - range_years} and {year + range_years}", 

300 ) 

301 variations.append( 

302 self._create_relaxed_constraint(constraint, range_text) 

303 ) 

304 

305 # Replace exact temporal terms with approximate ones 

306 temporal_relaxations = { 

307 "founded in": "founded around", 

308 "established in": "established around", 

309 "created in": "created around", 

310 "started in": "started around", 

311 "exactly": "approximately", 

312 } 

313 

314 for exact_term, relaxed_term in temporal_relaxations.items(): 

315 if exact_term in constraint_text.lower(): 

316 relaxed_text = constraint_text.replace(exact_term, relaxed_term) 

317 variations.append( 

318 self._create_relaxed_constraint(constraint, relaxed_text) 

319 ) 

320 

321 return variations[:3] 

322 

323 def _relax_property_constraint(self, constraint: object) -> List[object]: 

324 """ 

325 Create relaxed variations of property constraints. 

326 

327 Property constraints can be relaxed by: 

328 - Making specific properties more general 

329 - Allowing alternative phrasings 

330 - Focusing on key attributes 

331 """ 

332 variations = [] 

333 constraint_text = str(constraint).lower() 

334 

335 # Make specific properties more general 

336 property_generalizations = { 

337 "multinational": "international", 

338 "conglomerate": "large company", 

339 "corporation": "company", 

340 "subsidiary": "part of", 

341 "headquarters": "based", 

342 "founded": "established", 

343 "specialized": "focused", 

344 "leading": "major", 

345 } 

346 

347 for specific, general in property_generalizations.items(): 

348 if specific in constraint_text: 

349 relaxed_text = constraint_text.replace(specific, general) 

350 variations.append( 

351 self._create_relaxed_constraint(constraint, relaxed_text) 

352 ) 

353 

354 # Remove adjectives to make constraints less specific 

355 adjective_patterns = [ 

356 r"\b(very|extremely|highly|most|largest|biggest|smallest)\s+", 

357 r"\b(major|minor|primary|secondary|main|key)\s+", 

358 ] 

359 

360 for pattern in adjective_patterns: 

361 import re 

362 

363 if re.search(pattern, constraint_text): 

364 relaxed_text = re.sub(pattern, "", constraint_text) 

365 variations.append( 

366 self._create_relaxed_constraint(constraint, relaxed_text) 

367 ) 

368 

369 return variations[:2] 

370 

371 def _create_relaxed_constraint( 

372 self, original_constraint: object, relaxed_text: str 

373 ) -> object: 

374 """ 

375 Create a new constraint object with relaxed text. 

376 

377 This is a helper method that preserves the constraint structure 

378 while updating the constraint value/text. 

379 """ 

380 # Try to create a copy of the constraint with updated text 

381 if hasattr(original_constraint, "__dict__"): 381 ↛ 398line 381 didn't jump to line 398 because the condition on line 381 was always true

382 # Create a copy of the constraint object 

383 import copy 

384 

385 relaxed_constraint = copy.deepcopy(original_constraint) 

386 

387 # Update the constraint value/description 

388 if hasattr(relaxed_constraint, "value"): 388 ↛ 390line 388 didn't jump to line 390 because the condition on line 388 was always true

389 relaxed_constraint.value = relaxed_text 

390 elif hasattr(relaxed_constraint, "description"): 

391 relaxed_constraint.description = relaxed_text 

392 elif hasattr(relaxed_constraint, "text"): 

393 relaxed_constraint.text = relaxed_text 

394 

395 return relaxed_constraint 

396 else: 

397 # If we can't copy the constraint, return a simple string representation 

398 return relaxed_text 

399 

400 def _get_constraint_type(self, constraint: object) -> str: 

401 """Extract constraint type from constraint object.""" 

402 if hasattr(constraint, "type"): 402 ↛ 407line 402 didn't jump to line 407 because the condition on line 402 was always true

403 if hasattr(constraint.type, "value"): 403 ↛ 406line 403 didn't jump to line 406 because the condition on line 403 was always true

404 return constraint.type.value 

405 else: 

406 return str(constraint.type) 

407 elif hasattr(constraint, "constraint_type"): 

408 return constraint.constraint_type 

409 else: 

410 # Try to infer from constraint text 

411 constraint_text = str(constraint).lower() 

412 

413 if any( 

414 word in constraint_text 

415 for word in ["name", "called", "known as"] 

416 ): 

417 return "NAME_PATTERN" 

418 elif any( 

419 word in constraint_text 

420 for word in ["location", "country", "city"] 

421 ): 

422 return "LOCATION" 

423 elif any( 

424 word in constraint_text 

425 for word in ["year", "date", "when", "time"] 

426 ): 

427 return "TEMPORAL" 

428 elif any( 

429 word in constraint_text 

430 for word in ["number", "count", "amount"] 

431 ): 

432 return "STATISTIC" 

433 elif any( 

434 word in constraint_text 

435 for word in ["event", "happened", "occurred"] 

436 ): 

437 return "EVENT" 

438 elif any( 

439 word in constraint_text 

440 for word in ["than", "more", "less", "compared"] 

441 ): 

442 return "COMPARISON" 

443 else: 

444 return "PROPERTY" 

445 

446 def analyze_relaxation_impact( 

447 self, 

448 original_constraints: List[object], 

449 relaxed_constraints: List[object], 

450 ) -> Dict: 

451 """ 

452 Analyze the impact of constraint relaxation. 

453 

454 Returns analysis of what was changed and the expected impact. 

455 """ 

456 analysis = { 

457 "original_count": len(original_constraints), 

458 "relaxed_count": len(relaxed_constraints), 

459 "constraints_removed": len(original_constraints) 

460 - len(relaxed_constraints), 

461 "constraint_changes": [], 

462 "priority_impact": "low", 

463 "recommendation": "", 

464 } 

465 

466 # Check what types of constraints were removed/modified 

467 original_types = [ 

468 self._get_constraint_type(c) for c in original_constraints 

469 ] 

470 relaxed_types = [ 

471 self._get_constraint_type(c) for c in relaxed_constraints 

472 ] 

473 

474 removed_types = [] 

475 for orig_type in original_types: 

476 if orig_type not in relaxed_types: 

477 removed_types.append(orig_type) 

478 

479 # Assess impact based on what was removed 

480 high_impact_types = {"NAME_PATTERN", "EXISTENCE", "LOCATION"} 

481 medium_impact_types = {"TEMPORAL", "EVENT", "PROPERTY"} 

482 

483 if any(t in removed_types for t in high_impact_types): 

484 analysis["priority_impact"] = "high" 

485 analysis["recommendation"] = ( 

486 "High-priority constraints removed. Results may be less accurate." 

487 ) 

488 elif any(t in removed_types for t in medium_impact_types): 

489 analysis["priority_impact"] = "medium" 

490 analysis["recommendation"] = ( 

491 "Medium-priority constraints removed. Check results carefully." 

492 ) 

493 else: 

494 analysis["priority_impact"] = "low" 

495 analysis["recommendation"] = ( 

496 "Low-priority constraints removed. Results should remain accurate." 

497 ) 

498 

499 analysis["removed_constraint_types"] = removed_types 

500 

501 return analysis