Source code for grab.spider.service.task_generator

from __future__ import annotations

import time
from collections.abc import Callable, Iterator
from queue import Queue

from grab.spider.interface import FatalErrorQueueItem
from grab.spider.queue_backend.base import BaseTaskQueue
from grab.spider.task import Task

from .base import BaseService, ServiceWorker
from .parser import ParserService
from .task_dispatcher import TaskDispatcherService


class TaskGeneratorService(BaseService):
    """Service that feeds tasks from a generator into the task dispatcher.

    A single worker pulls tasks from ``real_generator`` in batches and puts
    them on ``task_dispatcher.input_queue``. A batch is generated only while
    the backlog (the larger of the task queue size and the parser service
    input queue size) is below ``task_queue_threshold``.
    """

    def __init__(
        self,
        fatal_error_queue: Queue[FatalErrorQueueItem],
        real_generator: Iterator[Task],
        thread_number: int,
        get_task_queue: Callable[[], BaseTaskQueue],
        parser_service: ParserService,
        task_dispatcher: TaskDispatcherService,
    ) -> None:
        """Store collaborators and register the single generator worker.

        Args:
            fatal_error_queue: Forwarded to ``BaseService.__init__``.
            real_generator: Iterator yielding the tasks to dispatch.
            thread_number: Used to size the backlog threshold, which is
                ``max(200, thread_number * 2)``.
            get_task_queue: Zero-argument callable returning the task queue.
                It is called when the worker callback starts, not here.
            parser_service: Its ``input_queue`` size is part of the backlog
                estimate.
            task_dispatcher: Generated tasks are put on its ``input_queue``.
        """
        super().__init__(fatal_error_queue)
        self.real_generator = real_generator
        self.task_queue_threshold = max(200, thread_number * 2)
        self.get_task_queue = get_task_queue
        self.parser_service = parser_service
        self.task_dispatcher = task_dispatcher
        self.worker = self.create_worker(self.worker_callback)
        self.register_workers(self.worker)

    def worker_callback(self, worker: ServiceWorker) -> None:
        """Keep the dispatcher supplied with tasks until the generator ends.

        Returns when ``worker.stop_event`` is set or when ``real_generator``
        raises ``StopIteration``. A pause request only interrupts the
        current batch; it no longer terminates the worker.
        """
        # The task queue is fetched lazily: the original author expected it
        # to be set by this point, i.e. after "spider.run()" has been called.
        task_queue = self.get_task_queue()
        while not worker.stop_event.is_set():
            worker.process_pause_signal()
            queue_size = max(
                task_queue.size(),
                self.parser_service.input_queue.qsize(),
            )
            if queue_size >= self.task_queue_threshold:
                # Enough work is already queued; poll again shortly.
                time.sleep(0.1)
                continue
            try:
                for _ in range(self.task_queue_threshold - queue_size):
                    if worker.pause_event.is_set():
                        # Abandon the rest of this batch but keep the worker
                        # alive, so the pause is handled by
                        # process_pause_signal() at the top of the outer loop
                        # and generation can continue afterwards. Returning
                        # here would end task generation permanently.
                        # NOTE(review): assumes process_pause_signal() blocks
                        # while paused - confirm against ServiceWorker.
                        break
                    task = next(self.real_generator)
                    self.task_dispatcher.input_queue.put(
                        (task, None, {"source": "task_generator"})
                    )
            except StopIteration:
                # The generator is exhausted: nothing more to produce.
                return