API Reference
Wrapper
Wrapper for queue-based producer/consumers.
class multiconsumers_queue.wrapper.Producer(q, fn, stats, consumers_cnt, name='producer', lock=NOTHING, stop_now=False, wait_for_queue=0.01)
Wrapper for data source function.
- Parameters
q (queue.Queue) – Queue for interconnection
fn (typing.Callable[[], typing.Iterator[typing.Any]]) – Real producer
stats (typing.Counter[str]) – Shared counter with statistics
consumers_cnt (int) – How many consumers to start
name (str) – Process name
lock (threading.Lock) – Lock for shared counter
stop_now (bool) – Stop flag. If set to True, consumers will be stopped
wait_for_queue (float) – Sleep time to prevent CPU throttling
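The role these parameters describe can be illustrated with the standard library alone. The following is a minimal sketch, not the library's implementation: `run_producer`, `SENTINEL`, and the one-sentinel-per-consumer shutdown scheme are assumptions made for illustration, and the real `Producer` may signal completion differently.

```python
import queue
from typing import Any, Callable, Iterator

SENTINEL = None  # hypothetical end-of-stream marker, not taken from the library


def run_producer(
    q: queue.Queue,
    fn: Callable[[], Iterator[Any]],
    consumers_cnt: int,
) -> int:
    """Put every item yielded by fn() on q, then one sentinel per consumer."""
    produced = 0
    for item in fn():
        q.put(item)
        produced += 1
    # Each consumer needs its own sentinel so that all of them shut down.
    for _ in range(consumers_cnt):
        q.put(SENTINEL)
    return produced


def numbers() -> Iterator[int]:
    """Example data source matching Callable[[], Iterator[Any]]."""
    yield from range(3)


q: queue.Queue = queue.Queue()
produced = run_producer(q, numbers, consumers_cnt=2)
drained = [q.get() for _ in range(q.qsize())]
```

Note that `fn` takes no arguments and returns an iterator, which matches the `typing.Callable[[], typing.Iterator[typing.Any]]` annotation above; a generator function is the simplest way to satisfy it.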
class multiconsumers_queue.wrapper.Consumer(q, fn, stats, name='consumer', lock=NOTHING)
Wrapper for data processing function.
- Parameters
q (queue.Queue) – Queue for interconnection
fn (typing.Callable) – Real consumer
stats (typing.Counter[str]) – Shared counter with statistics
name (str) – Process name
lock (threading.Lock) – Lock for shared counter
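To show why a consumer takes both a shared counter and a lock, here is a minimal standard-library sketch. It is not the library's code: `run_consumer`, `SENTINEL`, and the counter keys `"processed"` and `"failed"` are hypothetical names chosen for illustration.

```python
import queue
import threading
from collections import Counter
from typing import Any, Callable

SENTINEL = None  # hypothetical end-of-stream marker


def run_consumer(
    q: queue.Queue,
    fn: Callable[[Any], None],
    stats: Counter,
    lock: threading.Lock,
) -> None:
    """Process items from q with fn until the sentinel arrives."""
    while True:
        item = q.get()
        if item is SENTINEL:
            break
        try:
            fn(item)
            key = "processed"
        except Exception:
            key = "failed"
        # Counter updates are read-modify-write, so guard them with the lock.
        with lock:
            stats[key] += 1


def handle(item: int) -> None:
    """Example processing function that rejects negative input."""
    if item < 0:
        raise ValueError("negative input")


q: queue.Queue = queue.Queue()
for value in (1, -1, 2, SENTINEL):
    q.put(value)

stats: Counter = Counter()
run_consumer(q, handle, stats, threading.Lock())
```

With several consumers running in separate threads, each would share the same `stats` and `lock` objects, which is presumably why both are passed in rather than created per consumer.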
class multiconsumers_queue.wrapper.ThreadPool(src, dst, notifier, notification_interval=60, consumers_cnt=5, q=NOTHING, stats=NOTHING, producer=NOTHING, consumers=NOTHING)
Producer/Consumers thread pool.
- Parameters
src (typing.Callable[[], typing.Iterator[typing.Any]]) – Producer fn
dst (typing.Callable) – Consumer fn
notifier (typing.Callable) – ScheduledAction fn
notification_interval (typing.Union[int, float]) – Time between notifications, in seconds
consumers_cnt (int) – How many consumers to start
q (queue.Queue) – Queue for interconnection
stats (typing.Counter[str]) – Shared counter with statistics
producer (Producer) – Producer
consumers (typing.List[Consumer]) – list of Consumer objects
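The overall pattern (one producer thread feeding several consumer threads through a shared queue) can be sketched with the standard library as follows. This is an illustration under stated assumptions, not the `ThreadPool` implementation: `run_pool` and `SENTINEL` are hypothetical, the periodic `notifier` is left out, and the methods the real class exposes are not documented in this reference.

```python
import queue
import threading
from collections import Counter
from typing import Any, Callable, Iterator, List

SENTINEL = object()  # hypothetical end-of-stream marker


def run_pool(
    src: Callable[[], Iterator[Any]],
    dst: Callable[[Any], None],
    consumers_cnt: int = 5,
) -> Counter:
    """Run one producer thread and consumers_cnt consumer threads to completion."""
    q: queue.Queue = queue.Queue()
    stats: Counter = Counter()
    lock = threading.Lock()

    def produce() -> None:
        for item in src():
            q.put(item)
        # One sentinel per consumer so every consumer thread exits.
        for _ in range(consumers_cnt):
            q.put(SENTINEL)

    def consume() -> None:
        while True:
            item = q.get()
            if item is SENTINEL:
                return
            dst(item)
            with lock:
                stats["consumed"] += 1

    threads: List[threading.Thread] = [
        threading.Thread(target=produce, name="producer")
    ]
    threads += [
        threading.Thread(target=consume, name=f"consumer-{i}")
        for i in range(consumers_cnt)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return stats


seen: List[int] = []
seen_lock = threading.Lock()


def collect(item: int) -> None:
    """Example consumer function; it runs concurrently, so it locks."""
    with seen_lock:
        seen.append(item)


stats = run_pool(lambda: iter(range(20)), collect, consumers_cnt=3)
```

Because `dst` is called from several threads at once, any state it touches (here the `seen` list) needs its own synchronization.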
Helpers
Miscellaneous helpers.
multiconsumers_queue.helpers.reset_logger(level)
Customize logging output.
- Parameters
level (str) – Logging level
- Return type
None
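What `reset_logger` actually changes is not specified here. As a rough illustration of a helper that takes a level name as a string and returns `None`, the sketch below reconfigures the root logger with the standard `logging` module. The function name `configure_logging` and the log format are assumptions, not the library's behavior.

```python
import logging


def configure_logging(level: str) -> None:
    """Reset the root logger to a single stream handler at the given level."""
    root = logging.getLogger()
    # Drop existing handlers so repeated calls do not duplicate output.
    for handler in list(root.handlers):
        root.removeHandler(handler)
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(threadName)s %(message)s")
    )
    root.addHandler(handler)
    # Logger.setLevel accepts a level name such as "DEBUG" as well as an int.
    root.setLevel(level.upper())


configure_logging("debug")
```

Including `%(threadName)s` in the format is a convenient choice for a multi-threaded producer/consumer setup, since it shows which thread emitted each message.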
class multiconsumers_queue.helpers.ScheduledAction(fn, args=[], kwargs={}, notification_interval=60, is_running=False, timer=NOTHING, start_time=NOTHING, stop_time=None)
Helper for action that should be run only after a certain amount of time has passed.
- Parameters
fn (Callable) – Callable to execute
args (List) – Callable args
kwargs (Dict[str, Any]) – Callable kwargs
notification_interval (Union[int, float]) – time between fn calls in seconds
is_running (bool) – Running state flag
timer (threading.Timer) – Timer
start_time (arrow.arrow.Arrow) – Starting time
stop_time (Optional[arrow.arrow.Arrow]) – Stopping time
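The `timer` parameter suggests an action driven by `threading.Timer`. A minimal sketch of that idea follows, assuming a re-arming timer. `RepeatingAction` and its `start`/`stop` methods are hypothetical stand-ins, since this reference does not list the methods of `ScheduledAction`, and the start/stop timestamps are omitted.

```python
import threading
from typing import Any, Callable, Dict, List, Optional, Union


class RepeatingAction:
    """Hypothetical stand-in: call fn every `interval` seconds until stopped."""

    def __init__(
        self,
        fn: Callable[..., Any],
        args: Optional[List[Any]] = None,
        kwargs: Optional[Dict[str, Any]] = None,
        interval: Union[int, float] = 60,
    ) -> None:
        self.fn = fn
        self.args = args or []
        self.kwargs = kwargs or {}
        self.interval = interval
        self.is_running = False
        self._timer: Optional[threading.Timer] = None

    def _tick(self) -> None:
        self.fn(*self.args, **self.kwargs)
        if self.is_running:
            self._schedule()  # threading.Timer fires once, so re-arm it

    def _schedule(self) -> None:
        self._timer = threading.Timer(self.interval, self._tick)
        self._timer.daemon = True  # do not keep the process alive
        self._timer.start()

    def start(self) -> None:
        if not self.is_running:
            self.is_running = True
            self._schedule()

    def stop(self) -> None:
        self.is_running = False
        if self._timer is not None:
            self._timer.cancel()


calls: List[str] = []
done = threading.Event()


def report(message: str) -> None:
    """Example action; signals once it has been called three times."""
    calls.append(message)
    if len(calls) >= 3:
        done.set()


action = RepeatingAction(report, args=["tick"], interval=0.01)
action.start()
finished = done.wait(timeout=5)
action.stop()
```

The sketch defaults `args` and `kwargs` to `None` rather than to `[]` and `{}`, which avoids sharing one mutable default between instances in a plain Python class.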