Source code for grab.spider.service.base

from __future__ import annotations

import logging
import sys
from collections.abc import Callable, Iterable
from queue import Queue
from threading import Event, Thread
from types import TracebackType
from typing import Any, Tuple, Type, cast

from grab.spider.interface import FatalErrorQueueItem

[docs]logger = logging.getLogger(__name__)
[docs]class ServiceWorker: def __init__( self, fatal_error_queue: Queue[FatalErrorQueueItem], worker_callback: Callable[..., Any], ) -> None: self.fatal_error_queue = fatal_error_queue self.thread = Thread( target=self.worker_callback_wrapper(worker_callback), args=[self] ) self.thread.daemon = True self.thread.name = self.build_thread_name(worker_callback) self.pause_event = Event() self.stop_event = Event() self.resume_event = Event() self.activity_paused = Event() self.is_busy_event = Event()
[docs] def build_thread_name(self, worker_callback: Callable[..., Any]) -> str: cls_name = ( worker_callback.__self__.__class__.__name__ if hasattr(worker_callback, "__self__") else "NA" ) return "worker:{}:{}".format(cls_name, worker_callback.__name__)
[docs] def worker_callback_wrapper( self, callback: Callable[..., Any] ) -> Callable[..., None]: def wrapper(*args: Any, **kwargs: Any) -> None: try: callback(*args, **kwargs) except Exception as ex: logger.exception("Spider Service Fatal Error", exc_info=ex) # pylint: disable=deprecated-typing-alias self.fatal_error_queue.put( cast( Tuple[Type[Exception], Exception, TracebackType], sys.exc_info() ) ) return wrapper
[docs] def start(self) -> None: self.thread.start()
[docs] def stop(self) -> None: self.stop_event.set()
[docs] def process_pause_signal(self) -> None: if self.pause_event.is_set(): self.activity_paused.set() self.resume_event.wait()
[docs] def pause(self) -> None: self.resume_event.clear() self.pause_event.set() while True: if self.activity_paused.wait(0.1): break if not self.is_alive(): break
[docs] def resume(self) -> None: self.pause_event.clear() self.activity_paused.clear() self.resume_event.set()
[docs] def is_alive(self) -> bool: return self.thread.is_alive()
[docs]class BaseService: def __init__(self, fatal_error_queue: Queue[FatalErrorQueueItem]) -> None: self.fatal_error_queue = fatal_error_queue self.worker_registry: list[ServiceWorker] = []
[docs] def create_worker(self, worker_action: Callable[..., None]) -> ServiceWorker: return ServiceWorker(self.fatal_error_queue, worker_action)
[docs] def iterate_workers(self, objects: list[ServiceWorker]) -> Iterable[ServiceWorker]: for obj in objects: assert isinstance(obj, (ServiceWorker, list)) if isinstance(obj, ServiceWorker): yield obj elif isinstance(obj, list): yield from obj
[docs] def start(self) -> None: for worker in self.iterate_workers(self.worker_registry): worker.start()
[docs] def stop(self) -> None: for worker in self.iterate_workers(self.worker_registry): worker.stop()
[docs] def pause(self) -> None: for worker in self.iterate_workers(self.worker_registry): worker.pause()
[docs] def resume(self) -> None: for worker in self.iterate_workers(self.worker_registry): worker.resume()
[docs] def register_workers(self, *args: Any) -> None: self.worker_registry = list(args)
[docs] def is_busy(self) -> bool: return any( x.is_busy_event.is_set() for x in self.iterate_workers(self.worker_registry) )
[docs] def is_alive(self) -> bool: return any(x.is_alive() for x in self.iterate_workers(self.worker_registry))