API Reference

Wrapper

Wrapper for queue based producer/consumers.

class multiconsumers_queue.wrapper.Producer(q, fn, stats, consumers_cnt, name='producer', lock=NOTHING, stop_now=False, wait_for_queue=0.01)[source]

Wrapper for data source function.

Parameters
  • q (queue.Queue) – Queue fot interconnection

  • fn (typing.Callable[[], typing.Iterator[typing.Any]]) – Real producer

  • stats (typing.Counter[str]) – Shared counter with statistics

  • consumers_cnt (int) – How many consumers to start

  • name (str) – Process name

  • lock (threading.Lock) – Lock for shared counter

  • stop_now (bool) – Stop flag.If set it to True then consumers will be stopped

  • wait_for_queue (float) – sleep time for preventing CPU throttling

stop_consumers()[source]

Send None to the consumers.

Return type

None

run()[source]

Start Producer.

Return type

None

stop()[source]

Interrupt Producer.

Return type

None

class multiconsumers_queue.wrapper.Consumer(q, fn, stats, name='consumer', lock=NOTHING)[source]

Wrapper for data processing function.

Parameters
  • q (queue.Queue) – Queue fot interconnection

  • fn (typing.Callable) – Real producer

  • stats (typing.Counter[str]) – Shared counter with statistics

  • name (str) – Process name

  • lock (threading.Lock) – Lock for shared counter

run()[source]

Start Consumer.

Return type

None

class multiconsumers_queue.wrapper.ThreadPool(src, dst, notifier, notification_interval=60, consumers_cnt=5, q=NOTHING, stats=NOTHING, producer=NOTHING, consumers=NOTHING)[source]

Producer/Consumers thread pool.

Parameters
  • src (typing.Callable[[], typing.Iterator[typing.Any]]) – Producer fn

  • dst (typing.Callable) – Consumer fn

  • notifier (typing.Callable) – ScheduledAction fn

  • notification_interval (typing.Union[int, float]) – time between notifications in seconds

  • consumers_cnt (int) – How many consumers to start

  • q (queue.Queue) – Queue fot interconnection

  • stats (typing.Counter[str]) – Shared counter with statistics

  • producer (Producer) – Producer

  • consumers (typing.List[Consumer]) – list of Consumer objects

init_consumers()[source]

Init Consumer.

Return type

List[Consumer]

Returns

List[Consumer]

init_producer()[source]

Init Producer.

Return type

Producer

Returns

Producer

init_queue()[source]

Init queue.

Return type

Queue

Returns

queue.Queue

run()[source]

Start processing.

Return type

None

Helpers

Miscellaneous helpers.

multiconsumers_queue.helpers.reset_logger(level)[source]

Customize logging output.

Parameters

level (str) – logging level

Return type

None

class multiconsumers_queue.helpers.ScheduledAction(fn, args=[], kwargs={}, notification_interval=60, is_running=False, timer=NOTHING, start_time=NOTHING, stop_time=None)[source]

Helper for action that should be run only after a certain amount of time has passed.

Parameters
  • fn (Callable) – Callable for execute

  • args (List) – Callable args

  • kwargs (Dict[str, Any]) – Callable kwargs

  • notification_interval (Union[int, float]) – time between fn calls in seconds

  • is_running (bool) – Running state flag

  • timer (threading.Timer) – Timer

  • start_time (arrow.arrow.Arrow) – Starting time

  • stop_time (Optional[arrow.arrow.Arrow]) – Stopping time

init_timer()[source]

Start new timer.

Return type

Timer

Returns

threading.Timer

start()[source]

Set action starting time.

Return type

Arrow

Returns

arrow.arrow.Arrow

stop()[source]

Stop timer.

Return type

None