Source code for ewoksutils.uri_utils

import re
import sys
import urllib.parse
from pathlib import Path
from typing import Iterable
from typing import Optional
from typing import Tuple
from typing import Union

_WIN32 = sys.platform == "win32"


[docs] def parse_uri( uri: Union[str, Path], default_scheme: str = "file", default_port: Optional[int] = None, ) -> urllib.parse.ParseResult: """ Parse a URI following RFC 3986 rules. The scheme and path components are required, though the path may be empty. When authority is present, the path must either be empty or begin with "/". When authority is not present, the path cannot begin with "//". """ uri, query_paths = _normalize(uri) parsed = urllib.parse.urlparse(uri) scheme, netloc, path, params, query, fragment = parsed query = _merge_query(query_paths, query) if not scheme and default_scheme: scheme = default_scheme if default_port and not parsed.port: netloc = f"{parsed.hostname}:{default_port}" return urllib.parse.ParseResult(scheme, netloc, path, params, query, fragment)
[docs] def path_from_uri( uri: Union[str, Path, urllib.parse.ParseResult], **parse_options ) -> Path: if not isinstance(uri, urllib.parse.ParseResult): uri = parse_uri(uri, **parse_options) return Path(_file_path_from_parsed(uri))
[docs] def parse_query( uri: Union[str, Path, urllib.parse.ParseResult], **parse_options ) -> dict: if not isinstance(uri, urllib.parse.ParseResult): uri = parse_uri(uri, **parse_options) return _split_query(uri.query)
[docs] def join_uri( root: Union[str, Path, urllib.parse.ParseResult], relative: Union[str, Path, urllib.parse.ParseResult], **parse_options, ) -> urllib.parse.ParseResult: if not isinstance(root, urllib.parse.ParseResult): root = parse_uri(root, **parse_options) if not isinstance(relative, urllib.parse.ParseResult): relative = parse_uri(relative, **parse_options) if root.params or relative.params: raise NotImplementedError("URI params are not supported") relative_path = _file_path_from_parsed(relative) path = f"{root.path}/{relative_path}" query = _merge_query(root.query, relative.query) return urllib.parse.ParseResult(root.scheme, root.netloc, path, "", query, "")
[docs] def uri_as_string(uri: Union[str, Path, urllib.parse.ParseResult]) -> str: if isinstance(uri, str): return uri if isinstance(uri, Path): uri = parse_uri(uri) if not isinstance(uri, urllib.parse.ParseResult): raise TypeError(type(uri)) if not uri.netloc: tmp_uri = urllib.parse.ParseResult( "file", uri.netloc, uri.path, uri.params, uri.query, uri.fragment ) uri_str = tmp_uri.geturl() return uri_str.replace("file://", f"{uri.scheme}://", 1) return uri.geturl()
def _normalize(uri: Union[str, Path]) -> Tuple[str, str]: """Handle URI's that do not follow the RFC 3986 rules.""" uri = str(uri).replace("\\", "/") # Handle Windows paths # Windows drive-letter path handling (C:/..., D:/...) if _WIN32 and re.match(r"^[A-Za-z]:/", uri): uri = f"file:///{uri}" # Non-standard notation: # "/some/path::/another/path" # becomes: # "/some/path?path=/another/path" query = "" query_paths = re.findall(r"::([^;?#]*)", uri) if query_paths: for path in query_paths: uri = uri.replace("::" + path, "") query = _join_query(("path", path) for path in query_paths) return uri, query def _split_query(query: str) -> dict: result = dict() for s in query.split("&"): if not s: continue name, _, value = s.partition("=") prev_value = result.get(name) if prev_value: value = _join_string(prev_value, value, "/") result[name] = value return result def _join_query(query_items: Iterable[Tuple[str, str]]) -> str: return "&".join(f"{k}={v}" for k, v in query_items) def _join_string(a: str, b: str, sep: str) -> str: aslash = a.endswith(sep) bslash = b.startswith(sep) if aslash and bslash: return a[:-1] + b if aslash or bslash: return a + b return a + sep + b def _merge_query(query1: str, query2: str) -> str: query1 = _split_query(query1) query2 = _split_query(query2) merged = [] for name in list(query1) + list(query2): value1 = query1.pop(name, None) value2 = query2.pop(name, None) if value1 and value2: merged.append((name, _join_string(value1, value2, "/"))) elif value1: merged.append((name, value1)) elif value2: merged.append((name, value2)) return _join_query(merged) def _file_path_from_parsed(uri: urllib.parse.ParseResult) -> str: if _WIN32 and uri.path.startswith("/"): return uri.path[1:] return uri.path