# Source code for plom.client.downloader

# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (C) 2019-2021 Andrew Rechnitzer
# Copyright (C) 2019-2024 Colin B. Macdonald
# Copyright (C) 2020 Victoria Schuster

"""The background downloader downloads images using threads."""

from __future__ import annotations

import logging
from pathlib import Path
import random
import sys
import tempfile
import threading
from time import sleep, time
from typing import Any

if sys.version_info >= (3, 9):
    from importlib import resources
else:
    import importlib_resources as resources

# from PyQt6.QtCore import QThread
from PyQt6.QtCore import QObject, QRunnable, QThreadPool, pyqtSignal, pyqtSlot

from plom.messenger import Messenger, BaseMessenger
from plom.plom_exceptions import PlomException, PlomConnectionError
from .pagecache import PageCache

# Module-level logger shared by everything in this file; records are
# emitted under the logger name "Downloader".
log = logging.getLogger("Downloader")


class Downloader(QObject):
    """Downloads and maintains a cache of images.

    Downloader maintains a queue of downloads and emits signals whenever
    downloads succeed or fail.

    Call :meth:`download_in_background_thread` to enqueue an image for
    asynchronous download.  Once enqueued, a download will be
    automatically retried several times, but to prevent endless data
    usage, it will give up after three tries.  That is, clients cannot
    assume that something enqueued will inevitably be downloaded.
    Clients can check by TODO: document how to check if something is
    in the queue and/or currently downloading.

    Synchronous downloads can be performed with :meth:`sync_download`
    and :meth:`sync_downloads`.  These images will also be cached.

    TODO: document how to query the queue size.

    TODO: document how to query the size on disc.

    The current queue can be cleared with :meth:`clear_queue`.  For
    shutting down the queue, see :meth:`stop`.

    The Downloader keeps a clone of the messenger: if you logout (revoke
    the token) in another msgr while this is downloading, you'll get a
    crash.

    The Downloader will emit various **signals**.  You can connect slots
    to these:

    * `download_finished(img_id: int, md5sum: str, filename: str)`:
      emitted when a (background) download finishes.  `filename` is
      newly-downloaded file.
    * `download_failed(img_id: int)`: emitted when a (background)
      download fails.  The job will be automatically restarted up to
      three times.
    * `download_queue_changed(dict)`: the queue length changed (e.g.,
      something enqueued or the queue is cleared).  The signal
      argument is a dict of information about the queue.

    Use :meth:`enable_fail_mode` to artificially fail some download
    attempts and generally take longer.  For debugging.  Disable again
    with :meth:`disable_fail_mode`.
    """

    # emitted anytime a (background) download finishes
    download_finished = pyqtSignal(int, str, str)
    # emitted anytime a (background) download fails
    download_failed = pyqtSignal(int)
    # emitted when queue lengths change (i.e., things enqueued)
    download_queue_changed = pyqtSignal(dict)

    def __init__(
        self, basedir: str | Path, *, msgr: BaseMessenger | None = None
    ) -> None:
        """Initialize a new Downloader.

        Args:
            basedir: a directory for the image cache.

        Keyword Args:
            msgr: used for communication with a Plom server, or None and
                you can later call :meth:`attach_messenger`.
                Note Messenger is not multithreaded and blocks using
                mutexes.  Here we make our own private clone so caller
                can keep using theirs.

        Returns:
            None.
        """
        super().__init__()
        # self.is_download_in_progress = False
        self.msgr: None | BaseMessenger = None
        if msgr:
            self.msgr = Messenger.clone(msgr)
        self.basedir = Path(basedir)
        self.write_lock = threading.Lock()
        self.pagecache = PageCache(basedir)
        # TODO: may want this in the QApp: only have one
        # TODO: just use QThreadPool.globalInstance()?
        self.threadpool = QThreadPool()
        # TODO: will this stop Marker from getting one?  It doesn't seem to...
        self.threadpool.setMaxThreadCount(2)
        # per-image bookkeeping, all keyed by image id
        self._tries: dict[int, int] = {}
        self._total_tries: dict[int, int] = {}
        self._in_progress: dict[int, bool] = {}
        # it still counts as a fail if it eventually retried successfully
        self.number_of_fails = 0
        self.number_of_retries = 0
        # we're trying to stop, so don't retry for example
        self._stopping = False
        self._placeholder_image: Path | None = None
        self.make_placeholder()
        self.simulate_failures = False
        # percentage of download attempts that will fail and an overall
        # delay in seconds in a range (both are i.i.d. per retry).
        # These are ignored unless simulate_failures is True.
        self._simulate_failure_rate = 33.0
        self._simulate_slow_net = (0.5, 3.0)

    def attach_messenger(self, msgr: BaseMessenger) -> None:
        """Add/replace the current messenger with a private clone of `msgr`."""
        self.msgr = Messenger.clone(msgr)

    def detach_messenger(self) -> None:
        """Stop our messenger and forget it (but do not logout)."""
        if self.msgr:
            self.msgr.stop()
        self.msgr = None

    def has_messenger(self) -> bool:
        """Do we have a messenger?"""
        return bool(self.msgr)

    def enable_fail_mode(self) -> None:
        """Start simulating slow and randomly-failing downloads (for debugging)."""
        log.info("fail mode ENABLED")
        self.simulate_failures = True

    def disable_fail_mode(self) -> None:
        """Stop simulating slow and randomly-failing downloads."""
        log.info("fail mode disabled")
        self.simulate_failures = False

    def make_placeholder(self) -> None:
        """Copy the packaged placeholder icon into ``basedir`` and remember its path."""
        # Not imported earlier b/c of some circular import stuff (?)
        import plom.client.icons

        res = resources.files(plom.client.icons) / "manager_unknown.svg"
        placeholder = self.basedir / "placeholder"
        placeholder = placeholder.with_suffix(res.suffix)
        with res.open("rb") as fin, placeholder.open("wb") as fout:
            fout.write(fin.read())
        self._placeholder_image = placeholder

    def get_placeholder_path(self) -> str:
        """A static image that can be used as a placeholder while images are downloading.

        Returns:
            A real path on disc to the image, possibly a cached copy.

        TODO: might prefer a ``pathlib.Path`` but for now its a `str`.
        Currently you may have to make a string (not a Path for example)
        b/c of some Qt limitations in the ExamModel and proxy stuff in Marker.

        TODO: Issue #2357: better image or perhaps an animation?
        """
        return str(self._placeholder_image)

    def get_stats(self) -> dict[str, Any]:
        """Summarize the cache and queue state.

        Returns:
            A dict with keys ``"cache_size"``, ``"fails"``, ``"retries"``,
            ``"queued"`` (number of ids currently flagged in-progress) and
            ``"in_progress_ids"`` (those ids, as a list).
        """
        # TODO: would be nice to know the "gave up after 3 tries" failures...
        # TODO: track retries and fails (more positive!)
        in_progress_ids = [k for k, v in self._in_progress.items() if v is True]
        return {
            "cache_size": self.pagecache.how_many_cached(),
            "fails": self.number_of_fails,
            "retries": self.number_of_retries,
            "queued": len(in_progress_ids),
            "in_progress_ids": in_progress_ids,
        }

    def print_queue(self) -> None:
        """Print every tracked image id with its in-progress flag (debugging aid)."""
        print("enumerating all jobs to check for in progress...")
        for k, v in self._in_progress.items():
            print((k, v))

    def clear_queue(self) -> None:
        """Cancel any enqueued (but not yet started) downloads.

        Any existing downloads will continue, including their (up-to)
        three retries.
        """
        # self.threadpool.cancel()
        self.threadpool.clear()
        # print(f"children: {self.threadpool.children()}")
        # print("forcing in_progress to false...")
        # only values are overwritten (no keys added/removed) so it is
        # safe to do this while iterating
        for k in self._in_progress:
            self._in_progress[k] = False
        self.download_queue_changed.emit(self.get_stats())

    def stop(self, timeout: int = -1) -> bool:
        """Try to stop the downloader, after waiting for threads to clear.

        Args:
            timeout (int): milliseconds to wait before giving up.
                ``-1`` to wait forever.

        Returns:
            bool: True if all threads finished or False if timeout reached.
            In the False case, some cleanup tasks, such as removing files,
            probably did not occur.  Feel free to try again.
        """
        self._stopping = True
        # first we clear the ones that haven't started
        self.clear_queue()
        # then wait for timeout for the in-progress ones
        if not self.threadpool.waitForDone(timeout):
            return False
        # all downloads cancelling/finished, we can start cleaning up
        self.pagecache.wipe_cache()
        if self._placeholder_image:
            self._placeholder_image.unlink()
            self._placeholder_image = None
        self.detach_messenger()
        return True

    def download_in_background_thread(
        self, row: dict[str, Any], priority: bool = False, _is_retry: bool = False
    ) -> None:
        """Enqueue the downloading of particular row of the image database.

        Args:
            row (dict): One image entry in the "page data", has fields
                `id`, `md5` and some others that are used to try to
                choose a reasonable local file name.  Currently the
                local file name is chosen from the ``"server_path"`` key.

        Keyword Args:
            priority (bool): high priority if user requested this (not a
                background download).  Currently accepted but not yet
                honoured: all jobs are started at the default priority.
            _is_retry (bool): default False.  If True, this signifies an
                automatic retry.  Clients should probably not touch this.

        Returns:
            None

        Raises:
            RuntimeError: something unexpected happened.
            NotImplementedError: the row has no ``"server_path"``.
            PlomConnectionError: we do not have a valid Messenger.

        Does not start a new download if the Page Cache already has that
        image.  It also tries to avoid enqueuing another request for the
        same image.
        """
        log.debug(
            "activeThreadCount = %d, maxThreadCount = %d",
            self.threadpool.activeThreadCount(),
            self.threadpool.maxThreadCount(),
        )
        if self.pagecache.has_page_image(row["id"]):
            if _is_retry:
                # A retry is only requested after the previous worker for
                # this id reported failure, so nothing is running for it
                # any more: clear the flag or it would look queued forever.
                self._in_progress[row["id"]] = False
            return
        if not _is_retry and self._in_progress.get(row["id"]):
            # return early if this image id is already in queue
            # TODO but we should reset retries?
            return

        # try some things to get a reasonable local filename
        target_name = row.get("server_path", None)
        # Note: too dangerous: callers are likely to have put placeholder in these!
        # if target_name is None:
        #     target_name = row.get("local_filename", None)
        # if target_name is None:
        #     target_name = row.get("filename", None)
        if target_name is None:
            raise NotImplementedError("TODO: then use a random value")
        if str(target_name) == str(self._placeholder_image):
            raise RuntimeError(
                f"Unexpectedly detected target image as placeholder: {row}"
            )
        # only the final path component is used: files land directly in basedir
        target_name = self.basedir / (Path(target_name).name)

        if not self.msgr:
            raise PlomConnectionError(
                "Cannot download as we don't have an active Messenger"
            )

        worker = DownloadWorker(
            self.msgr,
            row["id"],
            row["md5"],
            target_name,
            basedir=self.basedir,
            simulate_failures=(
                (self._simulate_failure_rate, self._simulate_slow_net)
                if self.simulate_failures
                else False
            ),
        )
        worker.signals.download_succeed.connect(self._worker_delivers)
        worker.signals.download_fail.connect(self._worker_failed)
        # TODO: Getting TypeError, try on more recent PyQt6...?
        # (QRunnable, priority: int = 0): argument 2 has unexpected type 'Priority'
        # Until that is sorted out, `priority` is not passed on: we would like
        # QThread.Priority.HighPriority if priority else LowPriority.
        self.threadpool.start(worker)
        # keep track of which img_ids are in progress
        # todo: semaphore around this and .start?
        self._in_progress[row["id"]] = True
        # bg.finished.connect(thread.quit)
        # bg.finished.connect(bg.deleteLater)

        # keep track of retries: `_tries` restarts at 1 for a fresh request
        # whereas `_total_tries` counts every attempt ever made
        x = self._tries.get(row["id"], 0)
        y = self._total_tries.get(row["id"], 0)
        self._tries[row["id"]] = x + 1 if _is_retry else 1
        self._total_tries[row["id"]] = y + 1
        log.info(
            "image id %d: starting try %d (lifetime try %d)",
            row["id"],
            self._tries[row["id"]],
            self._total_tries[row["id"]],
        )
        # TODO: did it though?  Maybe more when it returns?
        self.download_queue_changed.emit(self.get_stats())

    def _worker_delivers(self, img_id: int, md5: str, tmpfile, targetfile) -> None:
        """A worker has succeeded and delivered a temp file to us.

        Args:
            img_id: integer uniquely tied to an image, probably the DB
                key or similar.
            md5: the md5sum of the file.
            tmpfile (str/pathlib.Path): a temporary path and filename
                where the file is now.
            targetfile (str/pathlib.Path): to where should we save (that
                is, rename) the file.

        Raises:
            RuntimeError: the cache already has this image id but under
                a different filename.

        This will emit a signal that others can listen for.
        In some cases, the worker will deliver something that someone else
        has downloaded in the meantime.  In that case we do not emit a
        signal, and the redundant temp file is removed.
        """
        log.debug("Worker delivery: %s, tmp=%s, target=%s", img_id, tmpfile, targetfile)
        # TODO: maybe pagecache should have the desired filename?
        # TODO: revisit once PageCache decides None/Exception...
        self._in_progress[img_id] = False
        if self.pagecache.has_page_image(img_id):
            cur = self.pagecache.page_image_path(img_id)
        else:
            cur = None
        if cur:
            if cur == targetfile:
                log.info(
                    "Someone else downloaded %d (%s) for us in the meantime, no action",
                    img_id,
                    targetfile,
                )
                # The worker made its temp file with delete=False, and we
                # will not be renaming it: remove it so it does not pile
                # up in the cache directory.
                try:
                    Path(tmpfile).unlink()
                except OSError as e:
                    log.warning(
                        "could not remove redundant temp file %s: %s", tmpfile, e
                    )
                # no emit in this case
                return
            raise RuntimeError(f"downloaded wrong thing? {cur}, {targetfile}, {md5}")
        Path(targetfile).parent.mkdir(exist_ok=True, parents=True)
        with self.write_lock:
            Path(tmpfile).rename(targetfile)
            self.pagecache.set_page_image_path(img_id, targetfile)
        self.download_finished.emit(img_id, md5, targetfile)
        self.download_queue_changed.emit(self.get_stats())

    def _worker_failed(
        self, img_id: int, md5: str, targetfile, err_stuff_tuple
    ) -> None:
        """A worker has failed and called us: retry 3 times.

        Args:
            img_id: which image the worker was trying to get.
            md5: the md5sum that was requested.
            targetfile: where the image was supposed to end up; reused
                as the ``"server_path"`` of the retry.
            err_stuff_tuple: information about the failure, only logged.

        Emits ``download_failed`` on every call.  No retry is made once
        three tries have been used or if we are stopping.
        """
        log.warning("Worker failed: %d, %s", img_id, str(err_stuff_tuple))
        self.number_of_retries += 1
        self.download_failed.emit(img_id)
        x = self._tries[img_id]
        if x >= 3:
            log.warning(
                "We've tried image %d too many times (try %d/3 and %d lifetime failures): giving up",
                img_id,
                self._tries[img_id],
                self._total_tries[img_id],
            )
            self.number_of_fails += 1
            self._in_progress[img_id] = False
            self.download_queue_changed.emit(self.get_stats())
            return
        if self._stopping:
            log.warning("Not retrying image %d b/c we're stopping", img_id)
            self._in_progress[img_id] = False
            self.download_queue_changed.emit(self.get_stats())
            return
        # TODO: does not respect the original priority: high priority failure becomes ordinary
        self.download_in_background_thread(
            {"id": img_id, "md5": md5, "server_path": targetfile},
            _is_retry=True,
        )
        self.download_queue_changed.emit(self.get_stats())

    def sync_downloads(self, pagedata: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Given a block of "pagedata" download all images synchronously and return updated data.

        Args:
            pagedata: a list of dicts, each dict described in
                :meth:`sync_download`.  Warning: we don't make a copy:
                it will be modified (and returned).

        Returns:
            list: a list of dicts which consists of the updated input
            with filenames added/updated for each image.
        """
        for row in pagedata:
            # each row is updated in place; the return value is that same dict
            self.sync_download(row)
        return pagedata

    def sync_download(self, row: dict[str, Any]) -> dict[str, Any]:
        """Given a row of "pagedata", download synchronously and return edited row.

        Args:
            row: one row of the metadata for the set of all pages
                involved in a question.  A dict which must have (at
                least) keys ``id``, ``md5``, ``server_path``.
                TODO: sometimes we seem to accept ``md5sum`` instead:
                should fix that.

        Returns:
            dict: the modified row.  If the file was already downloaded,
            put its name into the ``filename`` key.  If we had to download
            it we also put the filename into ``filename``.

        Raises:
            RuntimeError: the image is not in the cache yet the target
                file already exists on disc.
            PlomConnectionError: a download is needed but we do not have
                a Messenger.
        """
        # read the flag once so the waits below are always defined when used
        simulate = self.simulate_failures
        wait1 = wait2 = 0.0
        if simulate:
            # TODO: simulate failures here too, not just slowdowns?
            # fail = random.random() <= self._simulate_failure_rate / 100
            a, b = self._simulate_slow_net
            # generate wait1 + wait2 \in (a, b)
            wait2 = random.random() * (b - a) + a
            wait1 = random.random() * wait2
            wait2 -= wait1
        # TODO: revisit once PageCache decides None/Exception...
        if self.pagecache.has_page_image(row["id"]):
            cur = self.pagecache.page_image_path(row["id"])
            row_cur = row.get("filename", None)
            if row_cur is None:
                row["filename"] = cur
                # TODO: do we care if this matches row["server_path"]?
                return row
            assert (
                row_cur == cur
            ), f"row has a filename which does not match cache: {row_cur} vs {cur}"
            log.info("asked to download id=%d; already in cache", row["id"])
            return row

        f = self.basedir / (Path(row["server_path"]).name)

        if f.exists():
            raise RuntimeError(
                f"asked to download {f}; unexpectedly we already have it"
            )
        log.info("downloading %s", f)
        # the server_path might have a few subdirs
        f.parent.mkdir(exist_ok=True, parents=True)
        # we're not entirely consistent...
        md5 = row.get("md5") or row["md5sum"]
        if simulate:
            sleep(wait1)
        # if self.simulate_failures and fail:
        #     raise NotImplementedError("TODO: how to simulate failure?")
        if not self.msgr:
            # same error as the background path; an assert would vanish under -O
            raise PlomConnectionError(
                "Cannot download as we don't have an active Messenger"
            )
        im_bytes = self.msgr.get_image(row["id"], md5)
        if simulate:
            sleep(wait2)
        with open(f, "wb") as fh:
            fh.write(im_bytes)
        row["filename"] = str(f)
        self.pagecache.set_page_image_path(row["id"], row["filename"])
        return row
class WorkerSignals(QObject):
    """Signals that a running download worker can emit.

    A worker owns one of these objects and the receiving side connects
    slots to its signals.

    Supported signals are:

    finished:
        No data.

    download_succeed:
        `(img_id (int), md5 (str), tempfile (str), targetfile (str))`

    download_fail:
        `(img_id (int), md5 (str), targetfile (str), err_stuff_tuple (tuple))`
        where the tuple carries information about what went wrong.
    """

    # no payload
    finished = pyqtSignal()
    # error = pyqtSignal(tuple)
    # result = pyqtSignal(object)
    # payload: img_id, md5, temp file, target file
    download_succeed = pyqtSignal(int, str, str, str)
    # payload: img_id, md5, target file, error information
    download_fail = pyqtSignal(int, str, str, tuple)
class DownloadWorker(QRunnable):
    """A runnable that fetches one image and reports back through signals.

    The outcome is announced on ``self.signals``: ``download_succeed``
    or ``download_fail``, each followed by ``finished``.
    """

    def __init__(
        self,
        msgr: BaseMessenger,
        img_id: int,
        md5: str,
        target_name: Path,
        *,
        basedir: Path,
        simulate_failures: bool | tuple[float, tuple[float, float]] = False,
    ):
        """Prepare a worker for a single image.

        Args:
            msgr: a messenger; the worker makes and uses its own clone.
            img_id: which image to fetch.
            md5: the md5sum passed along with the request.
            target_name: the intended final filename; here it is only
                used for its suffix and echoed back in the signals.

        Keyword Args:
            basedir: directory in which the temporary file is created.
            simulate_failures: False for normal operation, otherwise a
                tuple ``(failure_rate_percent, (min_delay, max_delay))``.
        """
        super().__init__()
        self._msgr = Messenger.clone(msgr)
        self.img_id = img_id
        self.md5 = md5
        self.target_name = Path(target_name)
        self.basedir = Path(basedir)
        self.signals = WorkerSignals()
        self.simulate_failures = False
        if simulate_failures:
            assert isinstance(simulate_failures, tuple)
            self._simulate_failure_rate, self._simulate_slow_net = (
                simulate_failures[0],
                simulate_failures[1],
            )
            self.simulate_failures = True

    # https://www.pythonguis.com/tutorials/multithreading-pyqt-applications-qthreadpool/
    # consider try except with error signal

    def _report_failure(self, err: Exception) -> None:
        """Emit ``download_fail`` describing `err`, then ``finished``."""
        self.signals.download_fail.emit(
            self.img_id, self.md5, str(self.target_name), (str(err), "whut else?")
        )
        self.signals.finished.emit()

    @pyqtSlot()
    def run(self):
        """Fetch the image, write it to a temp file in basedir and signal the result."""
        simfail = False  # pylint worries it could be undefined
        if self.simulate_failures:
            simfail = random.random() <= self._simulate_failure_rate / 100
            lo, hi = self._simulate_slow_net
            # split one overall delay drawn from (lo, hi) into a part
            # before the download and a part after it
            total_delay = random.random() * (hi - lo) + lo
            delay_before = random.random() * total_delay
            delay_after = total_delay - delay_before
            sleep(delay_before)
        try:
            started = time()
            try:
                im_bytes = self._msgr.get_image(self.img_id, self.md5)
            except PlomException as e:
                log.warning(f"vaguely expected failure! {str(e)}")
                self._report_failure(e)
                return
            if self.simulate_failures and simfail:
                # TODO: can get PlomNotAuthorized if the pre-clone msgr is logged out
                # Not a PlomException, so this lands in the catch-all below.
                raise NotImplementedError(
                    "TODO: what sort of exceptions are possible?"
                )
            downloaded = time()
            # delete=False: the file must outlive this block because the
            # receiver of download_succeed gets its name
            with tempfile.NamedTemporaryFile(
                "wb",
                dir=self.basedir,
                prefix="downloading_",
                suffix=self.target_name.suffix,
                delete=False,
            ) as f:
                f.write(im_bytes)
            written = time()
        except Exception as e:
            # TODO: generic catch-all bad, beer good
            log.error(f"unexpected failure, wtf we do here?! {str(e)}")
            self._report_failure(e)
            return
        if not self.simulate_failures:
            log.debug(
                "worker time: %.3gs download, %.3gs write",
                downloaded - started,
                written - downloaded,
            )
        else:
            sleep(delay_after)
            log.debug(
                "worker time: %.3gs download, %.3gs write, %.3gs debuggery",
                downloaded - started,
                written - downloaded,
                delay_before + delay_after,
            )
        self.signals.download_succeed.emit(
            self.img_id, self.md5, f.name, str(self.target_name)
        )
        self.signals.finished.emit()