Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,20 @@ Exit codes:
- `LTP_DOWNLOAD_SHA256_<VERSION>`: version-specific expected SHA-256 for the downloaded LanguageTool archive, for example `LTP_DOWNLOAD_SHA256_6_9_SNAPSHOT`.
- `LTP_DOWNLOAD_SHA256`: fallback expected SHA-256 for the downloaded LanguageTool archive.
- `LTP_BYPASS_VERIFIED_DOWNLOADS`: set to `true` to skip SHA-256 verification.
- `LTP_MAX_DOWNLOAD_BYTES`: maximum downloaded ZIP size in bytes.
  - default: `536870912` (512 MiB)
- `LTP_SAFE_ZIP_MAX_ARCHIVE_BYTES`: maximum total compressed member size in bytes.
  - default: `536870912` (512 MiB)
- `LTP_SAFE_ZIP_MAX_EXTRACTED_BYTES`: maximum total extracted size in bytes.
  - default: `805306368` (768 MiB)
- `LTP_SAFE_ZIP_MAX_MEMBERS`: maximum ZIP member count.
  - default: `5000`
- `LTP_SAFE_ZIP_MAX_MEMBER_EXTRACTED_BYTES`: maximum extracted size for a single ZIP member in bytes.
  - default: `134217728` (128 MiB)
- `LTP_SAFE_ZIP_MAX_MEMBER_COMPRESSION_RATIO`: maximum compression ratio for a single ZIP member.
  - default: `100.0`
- `LTP_SAFE_ZIP_MAX_TOTAL_COMPRESSION_RATIO`: maximum compression ratio for the whole ZIP archive.
  - default: `10.0`

Downloaded zips are verified with SHA-256 when a checksum is available. Checksums are resolved in this order:
1. `LTP_DOWNLOAD_SHA256_<VERSION>`, where non-alphanumeric characters in the version are replaced with `_` and the name is uppercased.
Expand Down
131 changes: 101 additions & 30 deletions language_tool_python/download_lt.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
from ._deprecated import deprecated
from .config_file import LanguageToolConfig
from .exceptions import JavaError, PathError
from .safe_zip import SafeZipExtractor
from .utils import (
LTP_JAR_DIR_PATH_ENV_VAR,
get_env_int,
get_language_tool_download_path,
)

Expand All @@ -55,6 +57,9 @@
# Snapshot version string.
# NOTE(review): presumably the snapshot this release targets by default — confirm.
LT_SNAPSHOT_CURRENT_VERSION = "6.9-SNAPSHOT"
# Environment variable holding the fallback expected SHA-256 of the downloaded
# LanguageTool archive.
LTP_DOWNLOAD_SHA256_ENV_VAR = "LTP_DOWNLOAD_SHA256"
# Environment variable that, when set to `true`, skips SHA-256 verification.
LTP_BYPASS_VERIFIED_DOWNLOADS_ENV_VAR = "LTP_BYPASS_VERIFIED_DOWNLOADS"
# Environment variable overriding the maximum downloaded ZIP size, in bytes.
LTP_MAX_DOWNLOAD_BYTES_ENV_VAR = "LTP_MAX_DOWNLOAD_BYTES"
# Chunk size (1 MiB) passed to `iter_content` when streaming the archive.
DOWNLOAD_CHUNK_BYTES = 1024 * 1024
# Module-level extractor instance reused for every archive extraction in this
# module, constructed with no arguments.
_SAFE_ZIP_EXTRACTOR = SafeZipExtractor()

with (
importlib.resources.as_file(
Expand All @@ -76,6 +81,12 @@
)


# Upper bound, in bytes, on the size of a downloaded archive. It is checked
# both against the declared Content-Length and against the number of bytes
# actually streamed. The value is resolved once, at import time.
# NOTE(review): get_env_int presumably returns the integer value of the
# environment variable, or the second argument when it is unset — confirm
# against utils.
MAX_DOWNLOAD_BYTES = get_env_int(
LTP_MAX_DOWNLOAD_BYTES_ENV_VAR,
512 * 1024 * 1024,
) # 512 MiB, latest snapshot: 246.58 MiB archive


def _get_zip_hash(version_name: str) -> Optional[str]:
"""Get the expected SHA-256 hash for a given version of LanguageTool.
This function checks for environment variables that may specify the expected hash for the given version. It normalizes the version name to construct the environment variable name. If no specific environment variable is found for the version, it falls back to a general environment variable or a manifest lookup. If the bypass environment variable is set, it will skip verification and return None.
Expand Down Expand Up @@ -109,6 +120,39 @@ def _get_zip_hash(version_name: str) -> Optional[str]:
return None


def _validate_download_size(content_length: Optional[str]) -> Optional[int]:
"""
Validate the HTTP Content-Length header before downloading a ZIP file.

:param content_length: The Content-Length header value, if present.
:type content_length: Optional[str]
:return: The parsed content length, or None when the header is missing.
:rtype: Optional[int]
:raises PathError: If the header is invalid or exceeds the download size limit.
"""
if content_length is None:
return None

try:
total = int(content_length)
except ValueError as e:
err = f"Invalid Content-Length header: {content_length!r}."
raise PathError(err) from e

if total < 0:
err = f"Invalid Content-Length header: {content_length!r}."
raise PathError(err)

if total > MAX_DOWNLOAD_BYTES:
err = (
f"Refusing to download {total} bytes. "
f"Maximum allowed download size is {MAX_DOWNLOAD_BYTES} bytes."
)
raise PathError(err)

return total
Comment thread
mdevolde marked this conversation as resolved.


def parse_java_version(version_text: str) -> Tuple[int, int]:
"""
Parse the Java version from a given version text.
Expand Down Expand Up @@ -261,8 +305,15 @@ def unzip_file(temp_file_name: str, directory_to_extract_to: Path) -> None:
"""

logger.info("Unzipping %s to %s", temp_file_name, directory_to_extract_to)
with zipfile.ZipFile(temp_file_name, "r") as zip_ref:
zip_ref.extractall(directory_to_extract_to)
with (
tempfile.TemporaryDirectory(dir=directory_to_extract_to.parent) as temp_dir,
zipfile.ZipFile(temp_file_name, "r") as zip_ref,
):
_SAFE_ZIP_EXTRACTOR.extractall(
zip_ref,
directory_to_extract_to,
work_dir=Path(temp_dir),
)


@deprecated(
Expand Down Expand Up @@ -419,8 +470,6 @@ def _get_remote_zip(
except requests.exceptions.Timeout as e:
err = f"Request to {self.download_url} timed out."
raise TimeoutError(err) from e
content_length = req.headers.get("Content-Length")
total = int(content_length) if content_length is not None else None
if req.status_code == 404:
err = f"Could not find at URL {self.download_url}. The given version may not exist or is no longer available."
raise PathError(err)
Expand All @@ -430,14 +479,25 @@ def _get_remote_zip(
if req.status_code != 200:
err = f"Failed to download from {self.download_url}. HTTP status code: {req.status_code}."
raise PathError(err)
content_length = req.headers.get("Content-Length")
total = _validate_download_size(content_length)
progress = tqdm.tqdm(
unit="B",
unit_scale=True,
total=total,
desc=f"Downloading LanguageTool {self.version_name}",
)
for chunk in req.iter_content(chunk_size=1024):
downloaded_bytes = 0
for chunk in req.iter_content(chunk_size=DOWNLOAD_CHUNK_BYTES):
if chunk: # filter out keep-alive new chunks
downloaded_bytes += len(chunk)
if downloaded_bytes > MAX_DOWNLOAD_BYTES:
progress.close()
err = (
f"Refusing to download more than {MAX_DOWNLOAD_BYTES} bytes "
f"from {self.download_url}."
)
raise PathError(err)
sha256.update(chunk)
progress.update(len(chunk))
downloaded_file.write(chunk)
Expand Down Expand Up @@ -708,13 +768,17 @@ def download(self) -> None:

if self not in self.get_installed_versions():
with (
tempfile.TemporaryDirectory() as temp_dir,
tempfile.TemporaryDirectory(dir=download_folder) as temp_dir,
tempfile.NamedTemporaryFile(
suffix=".zip", dir=temp_dir
) as downloaded_file,
self._get_remote_zip(downloaded_file) as zip_file,
):
zip_file.extractall(download_folder)
_SAFE_ZIP_EXTRACTOR.extractall(
zip_file,
download_folder,
work_dir=Path(temp_dir),
)

@property
def version_name(self) -> str:
Expand Down Expand Up @@ -790,8 +854,7 @@ def download(self) -> None:
Download and install this snapshot version of LanguageTool.

This method checks Java compatibility, downloads the snapshot ZIP file,
and extracts it to the download folder. For snapshots, the extracted
directory is renamed to match the expected version name if necessary.
and extracts it to the download folder using the requested snapshot name.
"""
confirm_java_compatibility(self._version_name)

Expand All @@ -803,33 +866,41 @@ def download(self) -> None:
return

if self not in self.get_installed_versions():
# For snapshots, pass expected_dirname to rename the extracted folder
with (
tempfile.TemporaryDirectory() as temp_dir,
tempfile.TemporaryDirectory(dir=download_folder) as temp_dir,
tempfile.NamedTemporaryFile(
suffix=".zip", dir=temp_dir
) as downloaded_file,
self._get_remote_zip(downloaded_file) as zip_file,
):
lt_dir = zip_file.infolist()[0].filename
expected_dirname = f"LanguageTool-{self.version_name}/"
if lt_dir != expected_dirname:
with (
tempfile.NamedTemporaryFile(
suffix=".zip", dir=temp_dir
) as temp_file,
zipfile.ZipFile(temp_file, "w") as renamed_zip,
):
for item in zip_file.infolist():
buffer = zip_file.read(item.filename)
new_name = item.filename.replace(
lt_dir, expected_dirname, 1
)
renamed_zip.writestr(new_name, buffer)
temp_file.seek(0)
renamed_zip.extractall(download_folder)
else:
zip_file.extractall(download_folder)
snapshot_extract_dir = Path(temp_dir) / "snapshot"
_SAFE_ZIP_EXTRACTOR.extractall(
zip_file,
snapshot_extract_dir,
work_dir=Path(temp_dir),
)
Comment thread
mdevolde marked this conversation as resolved.
extracted_roots = list(snapshot_extract_dir.iterdir())
if len(extracted_roots) != 1 or not extracted_roots[0].is_dir():
err = (
"Expected snapshot archive to contain exactly one "
"root directory."
)
raise PathError(err)

expected_dir = download_folder / f"LanguageTool-{self.version_name}"
if expected_dir.exists() or expected_dir.is_symlink():
err = (
"Refusing to overwrite existing LanguageTool snapshot "
f"directory: {expected_dir}."
)
raise PathError(err)

logger.debug(
"Renaming extracted snapshot directory %s to %s",
extracted_roots[0],
expected_dir,
)
extracted_roots[0].rename(expected_dir)

@property
def version_name(self) -> str:
Expand Down
Loading