Class: RedisIPC::Stream::Ledger::Consumer
- Defined in:
- lib/redis_ipc/stream/ledger/consumer.rb
Overview
Note:
It felt better to me to let Consumer be generic
A consumer that only consumes entries that are registered with the ledger
Constant Summary
Constants inherited from Consumer
Instance Attribute Summary
Attributes inherited from Consumer
Instance Method Summary
- #check_for_entries ⇒ Object
- #initialize(ledger:) ⇒ Consumer (constructor): A new instance of Consumer.
Methods inherited from Consumer
#add_callback, #inspect, #listen, #listening?, #stop_listening
Constructor Details
#initialize(ledger:) ⇒ Consumer
Returns a new instance of Consumer.
# File 'lib/redis_ipc/stream/ledger/consumer.rb', line 12
def initialize(*, ledger:, **)
  super(*, **)
  @ledger = ledger
end
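The constructor above takes only `ledger:` for itself and forwards everything else to the parent. The following is a minimal sketch of that forwarding pattern, not the library's code: `SketchBaseConsumer`, `SketchLedgerConsumer`, and the `name` / `stream:` parameters are hypothetical stand-ins, since the parent Consumer's real parameters are not shown on this page. It uses named splats, the long-hand equivalent of the anonymous `*` / `**` forwarding, so that it also runs on Ruby versions before 3.2.

```ruby
# Hypothetical stand-in for the parent Consumer; its real parameters are not documented here.
class SketchBaseConsumer
  attr_reader :name, :options

  def initialize(name, **options)
    @name = name
    @options = options
  end
end

# Hypothetical subclass that mirrors the shape of Ledger::Consumer#initialize.
class SketchLedgerConsumer < SketchBaseConsumer
  attr_reader :ledger

  # `ledger:` is picked out by name; all other arguments pass through untouched.
  def initialize(*args, ledger:, **options)
    super(*args, **options) # the parent never sees `ledger:`
    @ledger = ledger
  end
end

consumer = SketchLedgerConsumer.new("worker_1", stream: "example", ledger: :fake_ledger)
```

Because `ledger:` is a required keyword, omitting it raises `ArgumentError` before the parent constructor runs.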
Instance Method Details
#check_for_entries ⇒ Object
# File 'lib/redis_ipc/stream/ledger/consumer.rb', line 17
def check_for_entries
  entry = read_from_stream
  return if entry.nil? || invalid_entry?(entry)

  ledger_entry = @ledger.fetch_entry(entry)
  is_a_request = ledger_entry.nil? && entry.pending?
  is_a_response = !ledger_entry.nil? && (entry.fulfilled? || entry.rejected?)

  if is_a_request
    process_request(entry)
  elsif is_a_response
    process_response(entry, ledger_entry)
  end

  # In the normal Consumer workflow, `#check_for_entries` will pass the entry to any observers listening.
  # This is fine except Concurrent::TimerTask triggers the next execution before notifying the observers,
  # which causes `#check_for_entries` to be called before the entry is acknowledged and removed from
  # the stream.
  # The solution I decided to go with is to update the observers before TimerTask has a chance to and then
  # let TimerTask update the observers again, but with nil.
  # Code that utilizes `Consumer#add_callback` will never notice this, however, code utilizing
  # `Consumer#add_observer` will need to handle the `nil` "entry" that can be passed through
  nil
ensure
  acknowledge_and_remove(entry) unless entry.nil?
end
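The request/response decision in `#check_for_entries` depends on two things: whether the ledger already knows the entry, and the entry's status. The sketch below isolates that decision so the four combinations are easy to see. It is an illustration under assumptions: `SketchEntry`, `classify`, the string statuses, and the hash-based ledger are all hypothetical and are not part of RedisIPC.

```ruby
# Hypothetical stand-in for a stream entry; only the status predicates matter here.
SketchEntry = Struct.new(:id, :status) do
  def pending?
    status == "pending"
  end

  def fulfilled?
    status == "fulfilled"
  end

  def rejected?
    status == "rejected"
  end
end

# Mirrors the two booleans computed in #check_for_entries.
def classify(entry, ledger_entry)
  is_a_request = ledger_entry.nil? && entry.pending?
  is_a_response = !ledger_entry.nil? && (entry.fulfilled? || entry.rejected?)

  if is_a_request
    :request
  elsif is_a_response
    :response
  else
    :ignored
  end
end

# Hypothetical ledger keyed by entry id; only entry "1-0" is registered.
ledger = { "1-0" => :ledger_entry }

results = [
  SketchEntry.new("2-0", "pending"),   # not in the ledger, pending
  SketchEntry.new("1-0", "fulfilled"), # in the ledger, fulfilled
  SketchEntry.new("1-0", "pending"),   # in the ledger, still pending
  SketchEntry.new("3-0", "rejected")   # not in the ledger, rejected
].map { |entry| classify(entry, ledger[entry.id]) }
```

In the sketch, the last two entries match neither branch. In the documented method such entries are still acknowledged and removed, because the `ensure` clause runs whether or not a branch was taken.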