Source code for multiconsumers_queue.wrapper

"""Wrapper for queue based producer/consumers."""
from __future__ import annotations

from collections import Counter
import concurrent.futures
from itertools import chain
import queue
import threading
import time
import typing

import attr
from loguru import logger as log

from multiconsumers_queue.helpers import ScheduledAction


@attr.s(auto_attribs=True)
class Producer:
    """Wrapper for data source function.

    Pulls items from ``fn`` and puts them on ``q``.  When the source is
    exhausted, fails, or a stop is requested, one ``None`` sentinel per
    consumer is put on the queue so that every consumer can shut down.

    Args:
        q: Queue for interconnection
        fn: Real producer
        stats: Shared counter with statistics
        consumers_cnt: How many consumers to start
        name: Process name
        lock: Lock for shared counter
        stop_now: Stop flag. If it is set to True the producer stops and
            the consumers are shut down
        wait_for_queue: sleep time for preventing CPU throttling
    """

    q: queue.Queue
    fn: typing.Callable[[], typing.Iterator[typing.Any]]
    stats: typing.Counter[str]
    consumers_cnt: int  # we need it for properly shutdown consumers
    name: str = "producer"
    lock: threading.Lock = attr.ib(factory=threading.Lock, repr=False)
    stop_now: bool = False  # External signal can stop producer
    wait_for_queue: float = 0.01  # Minimize CPU load for waiting loop

    def stop_consumers(self) -> None:
        """Send None to the consumers.

        ``None`` is the shutdown sentinel: one is queued per consumer.
        """
        for _ in range(self.consumers_cnt):
            self.q.put(None)

    def _drop_pending(self) -> None:
        """Discard everything currently waiting in the queue.

        Uses ``get_nowait`` instead of an ``empty()`` check followed by a
        blocking ``get()``: consumers read the same queue concurrently, so
        the queue may become empty between the check and the read, which
        would block the producer forever and keep the sentinels from
        ever being sent.
        """
        while True:
            try:
                dropped = self.q.get_nowait()
            except queue.Empty:
                return
            self.q.task_done()
            log.trace(f"{dropped} dropped")
            with self.lock:
                self.stats["items dropped"] += 1

    def run(self) -> None:
        """Start Producer.

        Any exception raised by ``fn`` is logged and counted under
        "producer errors"; it is not re-raised.  Consumers are always
        told to stop, whatever the outcome.
        """
        log.debug(f"{self.name} started")
        try:
            for item in self.fn():
                if self.stop_now:
                    log.info(f"{self.name} Stop signal received. Gracefully shutdown")
                    # The item just pulled from fn is discarded as well:
                    # it is neither queued nor counted.
                    self._drop_pending()
                    break
                # Poll instead of blocking in put() right away.
                while self.q.full():
                    time.sleep(self.wait_for_queue)
                self.q.put(item)
                log.trace(f"{self.name} put {item}")
                with self.lock:
                    self.stats["items produced"] += 1
        except Exception:  # noqa
            log.exception("Unexpected producer error")
            with self.lock:
                self.stats["producer errors"] += 1
        finally:
            self.stop_consumers()
        log.debug(f"{self.name} finished")

    def stop(self) -> None:
        """Interrupt Producer.

        Only raises the flag; ``run`` checks it before queuing the next item.
        """
        self.stop_now = True
@attr.s(auto_attribs=True)
class Consumer:
    """Wrapper for data processing function.

    Takes items from ``q`` and hands each one to ``fn`` until a ``None``
    sentinel is received.

    Args:
        q: Queue for interconnection
        fn: Real consumer
        stats: Shared counter with statistics
        name: Process name
        lock: Lock for shared counter
    """

    q: queue.Queue
    fn: typing.Callable
    stats: typing.Counter[str]
    name: str = "consumer"
    lock: threading.Lock = attr.ib(factory=threading.Lock, repr=False)

    def run(self) -> None:
        """Start Consumer.

        Exceptions raised by ``fn`` are logged and counted under
        "consumer errors"; processing then continues with the next item.
        """
        log.debug(f"{self.name} started")
        while True:
            item = self.q.get()
            if item is None:
                # Shutdown sentinel: acknowledge it and leave the loop.
                self.q.task_done()
                break
            try:
                log.trace(f"{self.name} start processing {item}")
                self.fn(item)
                log.trace(f"{self.name} done")
            except Exception:  # noqa
                log.exception("Unexpected consumer error")
                with self.lock:
                    self.stats["consumer errors"] += 1
            else:
                with self.lock:
                    self.stats["items consumed"] += 1
            finally:
                # Acknowledge the item whether or not fn succeeded.
                self.q.task_done()
        log.debug(f"{self.name} finished")
@attr.s(auto_attribs=True)
class ThreadPool:
    """Producer/Consumers thread pool.

    Args:
        src: Producer fn
        dst: Consumer fn
        notifier: ScheduledAction fn
        notification_interval: time between notifications in seconds
        consumers_cnt: How many consumers to start
        q: Queue for interconnection
        stats: Shared counter with statistics
        producer: Producer
        consumers: list of Consumer objects

    Attributes:
        lock: Single lock handed to the producer and to every consumer so
            that all updates of ``stats`` are serialized.  It is created
            internally and is not a constructor argument.
    """

    src: typing.Callable[[], typing.Iterator[typing.Any]]  # real producer
    dst: typing.Callable  # real consumer
    notifier: typing.Callable
    notification_interval: typing.Union[int, float] = 60
    consumers_cnt: int = 5
    q: queue.Queue = attr.ib()
    stats: typing.Counter[str] = attr.ib(factory=Counter)
    # Must be declared before `producer` and `consumers`: their default
    # factories read self.lock, and attributes are initialized in
    # definition order.  init=False keeps the constructor signature as is.
    lock: threading.Lock = attr.ib(factory=threading.Lock, init=False, repr=False)
    producer: Producer = attr.ib()
    consumers: typing.List[Consumer] = attr.ib()

    @consumers.default  # noqa
    def init_consumers(self) -> typing.List[Consumer]:
        """Init Consumer.

        Every consumer receives the pool-wide lock; without it each one
        would create a private lock that protects nothing.

        Returns:
            List[Consumer]
        """
        return [
            Consumer(self.q, self.dst, self.stats, f"consumer-{idx}", lock=self.lock)
            for idx in range(self.consumers_cnt)
        ]

    @producer.default  # noqa
    def init_producer(self) -> Producer:
        """Init Producer.

        The producer shares the pool-wide lock with the consumers.

        Returns:
            Producer
        """
        return Producer(self.q, self.src, self.stats, self.consumers_cnt, lock=self.lock)

    @q.default  # noqa
    def init_queue(self) -> queue.Queue:
        """Init queue.

        The queue is bounded to ``consumers_cnt`` items.

        Returns:
            queue.Queue
        """
        return queue.Queue(self.consumers_cnt)

    def run(self) -> None:
        """Start processing.

        Runs the producer and all consumers in a thread pool and blocks
        until every worker has returned and the queue has been joined.
        """
        workers: chain[typing.Union[Producer, Consumer]] = chain([self.producer], self.consumers)
        # NOTE(review): ScheduledAction lives in helpers; presumably it starts
        # calling self.notifier periodically on construction - confirm.
        notifier = ScheduledAction(self.notifier, notification_interval=self.notification_interval)
        try:
            # One thread for the producer plus one per consumer.
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=1 + self.consumers_cnt
            ) as executor:
                futures = {executor.submit(each.run) for each in workers}
                concurrent.futures.wait(futures)
            self.q.join()
        finally:
            notifier.stop()