Class: RedisIPC::Stream::Dispatcher
- Defined in:
- lib/redis_ipc/stream/dispatcher.rb
Overview
A consumer that reads unread entries and assigns them to consumers to be processed
Constant Summary collapse
- DEFAULTS =
{ # The number of Dispatchers to create pool_size: 10, # How often should the consumer process entries in seconds execution_interval: 0.001 # 1ms }.freeze
Instance Attribute Summary
Attributes inherited from Consumer
Instance Method Summary collapse
-
#check_for_entries ⇒ Object
Reads in any unread entries in the stream for the group and dispatches it to a load balanced consumer.
-
#initialize(name) ⇒ Dispatcher
constructor
A new instance of Dispatcher.
- #listen ⇒ Object
Methods inherited from Consumer
#add_callback, #inspect, #listening?, #stop_listening
Constructor Details
#initialize(name) ⇒ Dispatcher
Returns a new instance of Dispatcher.
17 18 19 |
# File 'lib/redis_ipc/stream/dispatcher.rb', line 17 def initialize(name, **) super(name, options: DEFAULTS, **) end |
Instance Method Details
#check_for_entries ⇒ Object
Reads in any unread entries in the stream for the group and dispatches it to a load balanced consumer
One dispatcher per group will receive a stream entry, regardless of content. Dispatchers will then ignore all entries that are not
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/redis_ipc/stream/dispatcher.rb', line 34 def check_for_entries entry = read_from_stream return if entry.nil? # When the dispatcher reads an entry from the stream, it is added to its PEL. Acknowledge to remove it. # Usually, the entry is also deleted but all dispatchers receive the same entry and deleting it causes it # to not be passed to the next dispatcher if invalid_entry?(entry) acknowledge_entry(entry) return end instance_id = entry.pending? ? @instance_id : entry.instance_id consumer = find_load_balanced_consumer(instance_id) if consumer.nil? log("DISPATCH_FAILURE #{group_name}:#{name} failed to find a consumer", severity: :error) acknowledge_entry(entry) return end log("Dispatching to #{consumer.name}: #{entry.id}") @redis.claim_entry(consumer, entry) end |
#listen ⇒ Object
21 22 23 24 25 |
# File 'lib/redis_ipc/stream/dispatcher.rb', line 21 def listen check_for_consumers! super end |