Coverage for src/local_deep_research/utilities/url_utils.py: 98%

56 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-03 23:15 +0000

1"""URL utility functions for the local deep research application.""" 

2 

3from functools import lru_cache 

4from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit 

5 

6from ..security.network_utils import is_private_ip 

7 

8# Re-export for backwards compatibility 

9__all__ = ["normalize_url", "is_private_ip", "canonical_url_key"] 

10 

# Exact query-parameter names treated as click/campaign trackers.
# Keys are compared after lowercasing, so every entry here is lowercase.
_TRACKING_PARAMS = frozenset(
    (
        "_ga",
        "_gl",
        "dclid",
        "fbclid",
        "gad_source",
        "gclid",
        "igshid",
        "mc_cid",
        "mc_eid",
        "msclkid",
        "ref_src",
        "yclid",
    )
)
# Name prefixes that mark a whole family of tracking parameters
# (also compared against the lowercased key).
_TRACKING_PREFIXES = ("utm_",)

30 

31 

def normalize_url(raw_url: str) -> str:
    """
    Normalize a URL to ensure it has a proper scheme and format.

    Surrounding whitespace is removed first. Input that already starts
    with ``http://`` or ``https://`` is returned as-is (after stripping).
    A scheme that is missing its ``//`` (``http:host``) is repaired, and a
    leading ``//`` is dropped. Anything else is treated as
    ``host[:port][/path]`` and gets ``http`` when ``is_private_ip(host)``
    is truthy, otherwise ``https``.

    Args:
        raw_url: The raw URL string to normalize

    Returns:
        A properly formatted URL string

    Raises:
        ValueError: If ``raw_url`` is empty, contains only whitespace, or
            has nothing after a leading ``//``.

    Examples:
        >>> normalize_url("localhost:11434")
        'http://localhost:11434'
        >>> normalize_url("https://example.com:11434")
        'https://example.com:11434'
        >>> normalize_url("http:example.com")
        'http://example.com'
    """
    # Strip *before* the emptiness check: whitespace-only input used to
    # slip through and come back as a host-less URL such as "https://".
    cleaned = raw_url.strip() if raw_url else ""
    if not cleaned:
        raise ValueError("URL cannot be empty")

    # Already has a proper scheme: nothing to do.
    if cleaned.startswith(("http://", "https://")):
        return cleaned

    # Malformed scheme like "http:hostname" (missing //). The well-formed
    # case returned above, so no second "//" check is needed here.
    if cleaned.startswith(("http:", "https:")):
        scheme, _, rest = cleaned.partition(":")
        return f"{scheme}://{rest}"

    # Protocol-relative form: drop the leading // and pick a scheme below.
    if cleaned.startswith("//"):
        cleaned = cleaned[2:]
        if not cleaned:
            raise ValueError("URL cannot be empty")

    # At this point we expect "hostname[:port][/path]".
    hostname = cleaned.split(":")[0].split("/")[0]

    # A bracketed IPv6 literal contains ':' itself, so the split above
    # yields only "["; take everything up to the closing bracket.
    if hostname.startswith("[") and "]" in cleaned:
        hostname = cleaned.split("]")[0] + "]"

    # Use http for local/private addresses, https for external hosts.
    scheme = "http" if is_private_ip(hostname) else "https"

    return f"{scheme}://{cleaned}"

86 

87 

@lru_cache(maxsize=1024)
def canonical_url_key(url: str) -> str:
    """Return a canonical form of ``url`` suitable for deduplication and
    display in a Sources / citations listing.

    The canonical form:
      - ignores leading/trailing whitespace,
      - lowercases scheme and host, including bracketed IPv6 literals
        (an IPv6 zone id after ``%`` and the path stay case-sensitive),
      - strips userinfo (``user:pass@`` — never leak creds),
      - strips default ports (80/http, 443/https),
      - strips fragments,
      - drops tracking query params (``utm_*``, ``fbclid``, ``gclid``,
        ``msclkid``, ``yclid``, ``dclid``, ``gad_source``, ``mc_eid``,
        ``mc_cid``, ``ref_src``, ``igshid``, ``_ga``, ``_gl``), matching
        keys case-insensitively,
      - trims trailing ``/`` from non-root paths.

    Path percent-encoding is not normalized. A non-empty query is parsed
    with ``parse_qsl`` and re-serialized with ``urlencode``: surviving
    parameters keep their order, but their encoding may change (for
    example ``%20`` becomes ``+`` and a bare ``flag`` becomes ``flag=``).

    Falls back to the stripped input when it is not a recognizable
    absolute URL (e.g. ``mailto:``, ``data:``, or protocol-relative
    ``//host/p``) or when ``urlsplit`` rejects it, since canonicalization
    would be ambiguous.

    Args:
        url: The URL to canonicalize.

    Returns:
        The canonical URL string, the stripped input when it cannot be
        canonicalized, or ``""`` for empty input.
    """
    if not url:
        return ""
    # Strip up front so surrounding whitespace cannot leak into the path
    # or query of the key (previously only the fallback paths stripped).
    url = url.strip()
    try:
        parsed = urlsplit(url)
    except ValueError:
        # urlsplit raises ValueError for malformed netlocs, e.g.
        # unbalanced IPv6 brackets.
        return url
    # Require both a scheme and a netloc; otherwise canonicalization is
    # ambiguous (mailto:, data:, protocol-relative, etc.).
    if not parsed.scheme or not parsed.netloc:
        return url

    scheme = parsed.scheme.lower()

    # Strip userinfo (user:pass@host) from netloc.
    netloc = parsed.netloc.rsplit("@", 1)[-1]

    # Split host/port carefully so IPv6 literals survive.
    if netloc.startswith("["):
        end = netloc.find("]")
        # Hex digits of an IPv6 literal are case-insensitive, so lowercase
        # them like any other host; leave a zone id (after '%') untouched.
        address, percent, zone = netloc[: end + 1].partition("%")
        host = address.lower() + percent + zone
        rest = netloc[end + 1 :]
        port = rest[1:] if rest.startswith(":") else ""
    elif ":" in netloc:
        host, _, port = netloc.rpartition(":")
        host = host.lower()
    else:
        host, port = netloc.lower(), ""

    # Drop the port when it is the scheme's default.
    if (scheme == "https" and port == "443") or (
        scheme == "http" and port == "80"
    ):
        port = ""
    netloc = f"{host}:{port}" if port else host

    # Filter query params case-insensitively on key; preserve order/values.
    query_str = ""
    if parsed.query:
        kept = []
        for key, value in parse_qsl(parsed.query, keep_blank_values=True):
            lowered = key.lower()
            if lowered in _TRACKING_PARAMS or any(
                lowered.startswith(prefix) for prefix in _TRACKING_PREFIXES
            ):
                continue
            kept.append((key, value))
        if kept:
            query_str = urlencode(kept, doseq=True)

    # Trim trailing slashes, but leave the bare root path "/" alone.
    path = parsed.path
    if path and path != "/" and path.endswith("/"):
        path = path.rstrip("/")

    # Empty final component drops the fragment.
    return urlunsplit((scheme, netloc, path, query_str, ""))