Class: RedisIPC::Stream::Dispatcher

Inherits:
Consumer
  • Object
Defined in:
lib/redis_ipc/stream/dispatcher.rb

Overview

A consumer that reads unread entries and assigns them to consumers to be processed

Constant Summary

DEFAULTS =
{
  # The number of Dispatchers to create
  pool_size: 10,

  # How often the consumer should process entries, in seconds
  execution_interval: 0.001 # 1ms
}.freeze
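The constructor passes DEFAULTS to Consumer as `options:` and forwards any remaining keyword arguments. How Consumer combines the two is not shown on this page, so the following is only a minimal sketch of one common pattern for a frozen defaults hash. `SKETCH_DEFAULTS` and `resolve_options` are hypothetical names used for illustration, not part of RedisIPC.

```ruby
# Hypothetical copy of the defaults above, renamed so it cannot be mistaken
# for the real constant.
SKETCH_DEFAULTS = {
  pool_size: 10,
  execution_interval: 0.001 # 1ms
}.freeze

# Hypothetical helper: merges caller overrides over the frozen defaults.
# Hash#merge returns a new hash, so the frozen defaults are never mutated.
def resolve_options(**overrides)
  unknown = overrides.keys - SKETCH_DEFAULTS.keys
  raise ArgumentError, "unknown option(s): #{unknown.join(', ')}" unless unknown.empty?

  SKETCH_DEFAULTS.merge(overrides).freeze
end

options = resolve_options(pool_size: 2)
puts options.inspect
```

Rejecting unknown keys is a design choice of this sketch; it surfaces typos such as `pool_sise:` early instead of silently ignoring them.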

Instance Attribute Summary

Attributes inherited from Consumer

#name, #redis

Instance Method Summary collapse

Methods inherited from Consumer

#add_callback, #inspect, #listening?, #stop_listening

Constructor Details

#initialize(name) ⇒ Dispatcher

Returns a new instance of Dispatcher.



# File 'lib/redis_ipc/stream/dispatcher.rb', line 17

def initialize(name, **)
  super(name, options: DEFAULTS, **)
end

Instance Method Details

#check_for_entries ⇒ Object

Reads any unread entries in the stream for the group and dispatches each one to a load-balanced consumer

One dispatcher per group will receive a stream entry, regardless of content. Dispatchers will then ignore all entries that are not



# File 'lib/redis_ipc/stream/dispatcher.rb', line 34

def check_for_entries
  entry = read_from_stream
  return if entry.nil?

  # When the dispatcher reads an entry from the stream, it is added to its PEL. Acknowledge to remove it.
  # Usually, the entry is also deleted but all dispatchers receive the same entry and deleting it causes it
  # to not be passed to the next dispatcher
  if invalid_entry?(entry)
    acknowledge_entry(entry)
    return
  end

  instance_id = entry.pending? ? @instance_id : entry.instance_id

  consumer = find_load_balanced_consumer(instance_id)
  if consumer.nil?
    log("DISPATCH_FAILURE #{group_name}:#{name} failed to find a consumer", severity: :error)

    acknowledge_entry(entry)
    return
  end

  log("Dispatching to #{consumer.name}: #{entry.id}")

  @redis.claim_entry(consumer, entry)
end
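The branching in check_for_entries can be exercised without Redis. The sketch below mirrors its four outcomes (nothing read, invalid entry acknowledged, no consumer found, entry claimed) using in-memory stand-ins. `SketchEntry`, `SketchDispatcher`, the `valid` flag, and the round-robin selection are all assumptions made for illustration; the real `invalid_entry?`, `find_load_balanced_consumer`, and `claim_entry` are not shown on this page and may behave differently.

```ruby
# Hypothetical stand-in for a stream entry; not the real RedisIPC entry class.
SketchEntry = Struct.new(:id, :instance_id, :valid, :pending, keyword_init: true)

# Hypothetical in-memory dispatcher that records what it would have sent to Redis.
class SketchDispatcher
  attr_reader :acknowledged, :claimed

  # consumers maps an instance ID to the list of consumer names for that instance.
  def initialize(instance_id:, consumers:)
    @instance_id = instance_id
    @consumers = consumers
    @cursor = Hash.new(0)
    @acknowledged = []
    @claimed = []
  end

  def dispatch(entry)
    return :empty if entry.nil?

    # Invalid entries are acknowledged (removed from the pending list) and skipped.
    unless entry.valid
      @acknowledged << entry.id
      return :ignored
    end

    # Same selection as the original: pending entries use the dispatcher's own instance ID.
    target = entry.pending ? @instance_id : entry.instance_id

    consumer = next_consumer(target)
    if consumer.nil?
      @acknowledged << entry.id
      return :no_consumer
    end

    # Stands in for handing the entry to the chosen consumer.
    @claimed << [consumer, entry.id]
    :dispatched
  end

  private

  # Round-robin is an assumption; the real load-balancing strategy is not documented here.
  def next_consumer(instance_id)
    pool = @consumers.fetch(instance_id, [])
    return nil if pool.empty?

    consumer = pool[@cursor[instance_id] % pool.size]
    @cursor[instance_id] += 1
    consumer
  end
end

dispatcher = SketchDispatcher.new(instance_id: "a", consumers: { "a" => %w[c1 c2] })
dispatcher.dispatch(SketchEntry.new(id: "1-0", instance_id: "a", valid: true, pending: false))
dispatcher.dispatch(SketchEntry.new(id: "2-0", instance_id: "b", valid: true, pending: false))
puts dispatcher.claimed.inspect
puts dispatcher.acknowledged.inspect
```

Both the invalid-entry and no-consumer branches acknowledge the entry, matching the original method, so a dispatcher never leaves an entry it cannot route sitting in its pending list.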

#listen ⇒ Object



# File 'lib/redis_ipc/stream/dispatcher.rb', line 21

def listen
  check_for_consumers!

  super
end