Class: RedisIPC::Stream::Ledger::Consumer

Inherits:
Consumer
  • Object
Defined in:
lib/redis_ipc/stream/ledger/consumer.rb

Overview

Note:

It felt better to me to keep Consumer generic and put the ledger-specific behavior in this subclass.

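A consumer that checks each stream entry against the ledger. A pending entry with no ledger entry is processed as a request, and a fulfilled or rejected entry that has a ledger entry is processed as a response. Every entry that is read is acknowledged and removed from the stream afterwards.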
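
The request/response split performed by #check_for_entries can be sketched in isolation. This is only an illustration: `Entry`, `classify`, and the hash used as a ledger are hypothetical stand-ins and are not part of RedisIPC.

```ruby
# Hypothetical stand-in for a stream entry; the real entry class is not shown on this page.
Entry = Struct.new(:id, :status) do
  def pending?
    status == "pending"
  end

  def fulfilled?
    status == "fulfilled"
  end

  def rejected?
    status == "rejected"
  end
end

# Mirrors the branching in #check_for_entries, with a plain Hash standing in for the ledger.
def classify(entry, ledger)
  ledger_entry = ledger[entry.id]

  if ledger_entry.nil? && entry.pending?
    :request   # nobody here is waiting on it, so it is an incoming request
  elsif !ledger_entry.nil? && (entry.fulfilled? || entry.rejected?)
    :response  # the ledger was waiting for this reply
  else
    :ignored   # neither branch applies; the entry is still acknowledged and removed
  end
end

ledger = { "a1" => :awaiting_reply }

classify(Entry.new("b2", "pending"), ledger)
classify(Entry.new("a1", "fulfilled"), ledger)
```

Note that a pending entry whose id is already in the ledger falls through both branches, as does a fulfilled or rejected entry the ledger does not know about.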

Constant Summary

Constants inherited from Consumer

Consumer::DEFAULTS

Instance Attribute Summary

Attributes inherited from Consumer

#name, #redis

Instance Method Summary

Methods inherited from Consumer

#add_callback, #inspect, #listen, #listening?, #stop_listening

Constructor Details

#initialize(ledger:) ⇒ Consumer

Returns a new instance of Consumer.



# File 'lib/redis_ipc/stream/ledger/consumer.rb', line 12

def initialize(*, ledger:, **)
  super(*, **)
  @ledger = ledger
end
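
The constructor takes `ledger:` for itself and forwards every other argument to the parent unchanged. A minimal sketch of that pattern follows, written with named splats (the long-hand equivalent of the anonymous `*` and `**` used above). `BaseConsumer`, `LedgerAwareConsumer`, and the `name`/`options:` parameters are hypothetical; the real Consumer signature is not shown on this page.

```ruby
# Hypothetical parent class; its parameters are assumptions made for illustration.
class BaseConsumer
  attr_reader :name, :options

  def initialize(name, options: {})
    @name = name
    @options = options
  end
end

# Takes `ledger:` for itself and passes everything else through to the parent.
class LedgerAwareConsumer < BaseConsumer
  attr_reader :ledger

  def initialize(*args, ledger:, **kwargs)
    super(*args, **kwargs)
    @ledger = ledger
  end
end

consumer = LedgerAwareConsumer.new("worker", ledger: {}, options: { pool_size: 2 })
```

Because `ledger:` is a required keyword, omitting it raises an ArgumentError before the parent constructor runs.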

Instance Method Details

#check_for_entries ⇒ Object



# File 'lib/redis_ipc/stream/ledger/consumer.rb', line 17

def check_for_entries
  entry = read_from_stream
  return if entry.nil? || invalid_entry?(entry)

  ledger_entry = @ledger.fetch_entry(entry)
  is_a_request = ledger_entry.nil? && entry.pending?
  is_a_response = !ledger_entry.nil? && (entry.fulfilled? || entry.rejected?)

  if is_a_request
    process_request(entry)
  elsif is_a_response
    process_response(entry, ledger_entry)
  end

  # In the normal Consumer workflow, `#check_for_entries` passes the entry to any observers that are listening.
  # That is fine, except that Concurrent::TimerTask triggers the next execution before notifying the observers,
  # so `#check_for_entries` gets called again before the entry is acknowledged and removed from
  # the stream.
  # The solution I decided to go with is to update the observers before TimerTask has a chance to, and then
  # let TimerTask update the observers again, but with nil.
  # Code that uses `Consumer#add_callback` will never notice this. Code that uses
  # `Consumer#add_observer`, however, must handle the `nil` "entry" that can be passed through.
  nil
ensure
  acknowledge_and_remove(entry) unless entry.nil?
end
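
As the comment in the method explains, code that registers through `Consumer#add_observer` can receive a `nil` entry. A minimal sketch of an observer that tolerates this is shown below. It assumes observers are notified in the Concurrent::TimerTask style, `update(time, result, exception)`; the `EntryObserver` class is hypothetical, and the calls at the bottom only simulate notifications by hand rather than using a real consumer.

```ruby
# Hypothetical observer. Assumes the TimerTask-style signature update(time, result, exception).
class EntryObserver
  attr_reader :seen

  def initialize
    @seen = []
  end

  def update(_time, entry, exception)
    # Skip the nil "entry" from the second notification, and skip failed executions.
    return if entry.nil? || exception

    @seen << entry
  end
end

observer = EntryObserver.new

# Simulated notifications: the first carries the entry, the second carries nil.
observer.update(Time.now, "entry-1", nil)
observer.update(Time.now, nil, nil)
```

The guard clause keeps the observer's own logic free of nil checks further down.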