from __future__ import annotations
from abc import abstractmethod
from datetime import datetime
from typing import Any
from uuid import uuid4
from grab.spider.task import Task
[docs]class BaseTaskQueue:
def __init__(self, **kwargs: Any) -> None:
pass
[docs] def random_queue_name(self) -> str:
return "task_queue_{}".format(str(uuid4()).replace("-", "_"))
[docs] def put(
self,
task: Task,
priority: int,
schedule_time: None | datetime = None,
) -> None: # pragma: no cover
raise NotImplementedError
[docs] def get(self) -> Task: # pragma: no cover
"""Return `Task` object or raise `Queue.Empty` exception.
@returns: `grab.spider.task.Task` object
@raises: `Queue.Empty` exception
"""
raise NotImplementedError
@abstractmethod
[docs] def size(self) -> int: # pragma: no cover
raise NotImplementedError
[docs] def clear(self) -> None: # pragma: no cover
"""Remove all tasks from the queue."""
raise NotImplementedError
[docs] def close(self) -> None: # pragma: no cover
raise NotImplementedError