Source code for grab.spider.service.network

from __future__ import annotations

import time
from abc import abstractmethod
from collections.abc import Callable
from queue import Empty, Queue
from typing import Any, Dict, Literal

from grab.spider.interface import FatalErrorQueueItem
from grab.spider.task import Task

from .base import BaseService, ServiceWorker

[docs]NetworkResult = Dict[str, Any] # pylint: disable=deprecated-typing-alias
[docs]class BaseNetworkService(BaseService): @abstractmethod
[docs] def get_active_threads_number(self) -> int: # pragma: no cover raise NotImplementedError
[docs]class NetworkServiceThreaded(BaseNetworkService): def __init__( self, fatal_error_queue: Queue[FatalErrorQueueItem], thread_number: int, process_task: Callable[[Task], None], get_task_from_queue: Callable[[], None | Literal[True] | Task], ) -> None: super().__init__(fatal_error_queue) self.thread_number = thread_number self.process_task = process_task self.get_task_from_queue = get_task_from_queue self.worker_pool = [] for _ in range(self.thread_number): self.worker_pool.append(self.create_worker(self.worker_callback)) self.register_workers(self.worker_pool)
[docs] def get_active_threads_number(self) -> int: return sum( 1 for x in self.iterate_workers(self.worker_registry) if x.is_busy_event.is_set() )
# TODO: supervisor worker to restore failed worker threads
[docs] def worker_callback(self, worker: ServiceWorker) -> None: while not worker.stop_event.is_set(): worker.process_pause_signal() try: task = self.get_task_from_queue() except Empty: time.sleep(0.1) else: if task is None or task is True: time.sleep(0.1) else: worker.is_busy_event.set() try: self.process_task(task) finally: worker.is_busy_event.clear()