# Source code for grab.spider.queue_backend.mongodb

from __future__ import annotations

import logging
import pickle
import queue
from datetime import datetime, timezone
from typing import Any, cast

import pymongo
from bson import Binary
from pymongo import MongoClient
from pymongo.collection import Collection

from grab.spider.queue_backend.base import BaseTaskQueue
from grab.spider.task import Task

# Module-level logger; the name is spelled out explicitly rather than derived
# from ``__name__``.
LOG = logging.getLogger("grab.spider.queue_backend.mongodb")
class MongodbTaskQueue(BaseTaskQueue):
    """Task queue backend that keeps pickled tasks in a MongoDB collection.

    Every queued task is stored as one document with three fields:
    ``task`` (the pickled task wrapped in ``bson.Binary``), ``priority``
    and ``schedule_time``.
    """

    def __init__(
        self,
        connection_args: None | dict[str, Any] = None,
        collection_name: None | str = None,
        database_name: str = "grab_spider",
    ) -> None:
        """Open a MongoDB connection and prepare the queue collection.

        Args:
            connection_args: Keyword arguments forwarded to ``MongoClient``.
                ``None`` (or an empty dict) calls ``MongoClient()`` with no
                arguments.
            collection_name: Name of the collection holding the queue. When
                ``None`` or empty, the value of ``self.random_queue_name()``
                is used instead.
            database_name: Name of the database containing the collection.
        """
        super().__init__()
        self.database_name: str = database_name
        self.collection_name: str = collection_name or self.random_queue_name()
        self.connection: MongoClient[Any] = MongoClient(**(connection_args or {}))
        self.collection: Collection[Any] = self.connection[self.database_name][
            self.collection_name
        ]
        LOG.debug(
            "Using collection %s in database %s",
            self.collection_name,
            self.database_name,
        )
        # get() sorts by priority, so keep an ascending index on that field.
        self.collection.create_index([("priority", 1)])

    def size(self) -> int:
        """Return the number of documents in the queue collection.

        The count uses an empty filter, so tasks whose ``schedule_time`` is
        still in the future are included.
        """
        return self.collection.count_documents({})

    def put(
        self,
        task: Task,
        priority: int,
        schedule_time: None | datetime = None,
    ) -> None:
        """Pickle ``task`` and insert it into the queue collection.

        Args:
            task: Task object to store; it is serialized with ``pickle.dumps``.
            priority: Value stored in the ``priority`` field; ``get()``
                returns the smallest value first.
            schedule_time: Value stored in the ``schedule_time`` field.
                Defaults to the current UTC time when ``None``.
        """
        if schedule_time is None:
            schedule_time = datetime.now(timezone.utc)
        item = {
            "task": Binary(pickle.dumps(task)),
            "priority": priority,
            "schedule_time": schedule_time,
        }
        self.collection.insert_one(item)

    def get(self) -> Task:
        """Remove and return the next task that is due.

        A document qualifies when its ``schedule_time`` is strictly earlier
        than the current UTC time; among qualifying documents the one with
        the smallest ``priority`` is taken. Lookup and removal happen in a
        single ``find_one_and_delete`` call.

        Returns:
            The unpickled task.

        Raises:
            queue.Empty: If no document qualifies.
        """
        item = self.collection.find_one_and_delete(
            {"schedule_time": {"$lt": datetime.now(timezone.utc)}},
            sort=[("priority", pymongo.ASCENDING)],
        )
        if item is None:
            raise queue.Empty
        # SECURITY: unpickling can execute arbitrary code. This is only safe
        # while everything able to write to the collection is trusted.
        return cast(Task, pickle.loads(item["task"]))  # noqa: S301

    def clear(self) -> None:
        """Delete every document from the queue collection."""
        self.collection.delete_many({})

    def close(self) -> None:
        """Close the underlying ``MongoClient`` connection."""
        self.connection.close()