Source code for grab.spider.queue_backend.base

from __future__ import annotations

from abc import abstractmethod
from datetime import datetime
from typing import Any
from uuid import uuid4

from grab.spider.task import Task


class BaseTaskQueue:
    """Base interface for spider task queue backends.

    Apart from ``__init__`` and ``random_queue_name`` every method here
    simply raises ``NotImplementedError``; concrete backends are expected
    to override them.
    """

    def __init__(self, **kwargs: Any) -> None:
        """Accept arbitrary keyword options and ignore them."""
        pass

    def random_queue_name(self) -> str:
        """Return a fresh name of the form ``task_queue_<uuid4>``.

        The dashes of the UUID's canonical text form are replaced with
        underscores, so the result contains no ``-`` characters.
        """
        suffix = str(uuid4()).replace("-", "_")
        return f"task_queue_{suffix}"

    def put(
        self,
        task: Task,
        priority: int,
        schedule_time: None | datetime = None,
    ) -> None:  # pragma: no cover
        """Interface stub for adding ``task`` to the queue.

        NOTE(review): the exact meaning of ``priority`` and
        ``schedule_time`` is defined by the concrete backends, not here --
        presumably an ordering key and an optional "not before" moment;
        confirm against the implementations.

        @raises: `NotImplementedError` in this base class
        """
        raise NotImplementedError

    def get(self) -> Task:  # pragma: no cover
        """Return `Task` object or raise `Queue.Empty` exception.

        That is the contract for overriding backends; this base
        implementation itself always raises `NotImplementedError`.

        @returns: `grab.spider.task.Task` object
        @raises: `Queue.Empty` exception
        """
        raise NotImplementedError

    @abstractmethod
    def size(self) -> int:  # pragma: no cover
        """Interface stub; the base implementation raises `NotImplementedError`.

        NOTE(review): ``@abstractmethod`` only blocks instantiation when the
        class uses ``ABCMeta``. This class does not, so the decorator acts
        as a marker and ``BaseTaskQueue()`` can still be created.
        """
        raise NotImplementedError

    def clear(self) -> None:  # pragma: no cover
        """Remove all tasks from the queue.

        The base implementation raises `NotImplementedError`.
        """
        raise NotImplementedError

    def close(self) -> None:  # pragma: no cover
        """Interface stub; the base implementation raises `NotImplementedError`.

        NOTE(review): presumably releases backend resources -- confirm
        against the concrete backends.
        """
        raise NotImplementedError