Class: RedisIPC::Stream::Consumer
- Inherits:
- Object
  - Object
  - RedisIPC::Stream::Consumer
- Defined in:
- lib/redis_ipc/stream/consumer.rb
Direct Known Subclasses
Constant Summary
- DEFAULTS =
{
  # The number of Consumers to create to process entries for their group.
  # Any consumers created for a group will only process entries for their group
  pool_size: 20,

  # How often does the consumer check for new entries
  execution_interval: 0.001 # 1ms
}.freeze
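As a rough illustration of how these defaults could combine with caller-supplied settings, the sketch below merges a hypothetical `custom` hash over a stand-in copy of `DEFAULTS`. It assumes the constructor's `options:` argument is meant to be merged this way; the `custom` hash and its value are made up for the example.

```ruby
# Stand-in copy of the DEFAULTS constant documented above.
DEFAULTS = {
  pool_size: 20,
  execution_interval: 0.001 # 1ms
}.freeze

# Hypothetical caller-supplied overrides (not part of the library).
custom = { execution_interval: 0.05 }

# Keys present in `custom` replace the defaults; all other keys are kept.
options = DEFAULTS.merge(custom).freeze

puts options[:pool_size]
puts options[:execution_interval]
puts options.frozen?
```

Because `Hash#merge` returns a new hash, the frozen `DEFAULTS` constant itself is never modified.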
Instance Attribute Summary
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
Instance Method Summary
-
#add_callback(callback_type, observer = nil, function = :update, &block) ⇒ Object
A wrapper for #add_observer that simplifies processing entries by removing the need for every observer to handle nil entries or exceptions. If manual exception handling is needed, use #add_observer.
-
#check_for_entries ⇒ Object
The method that is called by the consumer's task.
-
#initialize(name, redis:, options: {}) ⇒ Consumer
constructor
Creates a new Consumer for the given stream.
-
#inspect ⇒ String
Returns the inspected object.
-
#listen ⇒ Object
Starts checking the stream for new entries.
-
#listening? ⇒ Boolean
Returns true if the consumer is listening for entries.
-
#stop_listening ⇒ Object
Stops checking the stream for new entries.
Constructor Details
#initialize(name, redis:, options: {}) ⇒ Consumer
Creates a new Consumer for the given stream. By default, this class reads from its own Pending Entries List, which means it can only read entries that have been claimed by this consumer. See Dispatcher.
# File 'lib/redis_ipc/stream/consumer.rb', line 29
def initialize(name, redis:, options: {})
  @name = name.freeze
  raise ArgumentError, "Consumer was created without a name" if @name.blank?

  @redis = redis
  @instance_id = redis.instance_id
  @redis.create_consumer(self)

  # Caller-supplied options override the defaults
  @options = DEFAULTS.merge(options).freeze
  @logger = @redis.logger

  # This is the workhorse for the consumer
  @task = Concurrent::TimerTask.new(
    execution_interval: @options[:execution_interval],
    freeze_on_deref: true
  ) { check_for_entries }
end
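The constructor wires `check_for_entries` to a timer task that fires every `execution_interval` seconds. The sketch below shows the general idea of such a polling task using only the standard library. `PeriodicTask` is a hypothetical stand-in written for this example; it is not how `Concurrent::TimerTask` is implemented, and it only mirrors the `execute`, `running?`, and `shutdown` calls used on this page.

```ruby
# Hypothetical stand-in for a timer task, built on a plain Thread.
class PeriodicTask
  def initialize(execution_interval:, &block)
    @execution_interval = execution_interval
    @block = block
    @stop = false
    @thread = nil
  end

  def running?
    !@thread.nil? && @thread.alive?
  end

  # Starts the background loop; calling it again while running is a no-op.
  def execute
    return self if running?

    @stop = false
    @thread = Thread.new do
      until @stop
        @block.call
        sleep(@execution_interval)
      end
    end
    self
  end

  # Signals the loop to stop and waits for the thread to finish.
  def shutdown
    @stop = true
    @thread&.join
    @thread = nil
    true
  end
end

ticks = 0
poller = PeriodicTask.new(execution_interval: 0.01) { ticks += 1 }
poller.execute
sleep(0.1)
poller.shutdown
puts "ran #{ticks} times, running=#{poller.running?}"
```

The exact number of runs depends on thread scheduling, so the example prints the count rather than assuming a fixed value.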
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/redis_ipc/stream/consumer.rb', line 15
def name
  @name
end
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
# File 'lib/redis_ipc/stream/consumer.rb', line 15
def redis
  @redis
end
Instance Method Details
#add_callback(callback_type, observer = nil, function = :update, &block) ⇒ Object
A wrapper for #add_observer that simplifies processing entries by removing the need for every observer to handle nil entries or exceptions. If manual exception handling is needed, use #add_observer, but keep in mind that the result passed to the observer can be nil.
# File 'lib/redis_ipc/stream/consumer.rb', line 59
def add_callback(callback_type, observer = nil, function = :update, &block)
  handler = lambda do |data|
    if block
      yield(data)
    else
      observer.send(function, data)
    end
  end

  case callback_type
  # Ignores exceptions and only calls when it's a success
  when :on_message
    add_observer do |_, entry, exception|
      next if entry.nil? || exception

      handler.call(entry)
    end
  # Ignores successful entries and only calls on exceptions
  when :on_error
    add_observer do |_, _e, exception|
      next unless exception

      handler.call(exception)
    end
  else
    raise ArgumentError, "Invalid callback type #{callback_type} provided. Expected :on_message, or :on_error"
  end
end
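To make the filtering behavior concrete, here is a minimal self-contained sketch. `CallbackSketch` is a hypothetical stand-in, not the library class: its `add_observer` and `notify` methods only imitate the three-argument observer signature (time, result, exception) that the code above relies on, and it supports the block form only.

```ruby
# Hypothetical stand-in that mimics the (time, result, exception) observer signature.
class CallbackSketch
  def initialize
    @observers = []
  end

  def add_observer(&block)
    @observers << block
  end

  # Simulates one run of the consumer's task finishing.
  def notify(entry, exception = nil)
    @observers.each { |observer| observer.call(Time.now, entry, exception) }
  end

  # Same filtering rules as the documented method, block form only.
  def add_callback(callback_type, &block)
    case callback_type
    when :on_message
      add_observer { |_, entry, exception| block.call(entry) unless entry.nil? || exception }
    when :on_error
      add_observer { |_, _entry, exception| block.call(exception) if exception }
    else
      raise ArgumentError, "Invalid callback type #{callback_type}"
    end
  end
end

messages = []
errors = []
sketch = CallbackSketch.new
sketch.add_callback(:on_message) { |entry| messages << entry }
sketch.add_callback(:on_error) { |error| errors << error.message }

sketch.notify("hello")                       # reaches only the :on_message block
sketch.notify(nil)                           # nil entry, no exception: ignored by both
sketch.notify(nil, RuntimeError.new("boom")) # reaches only the :on_error block

p messages
p errors
```

With plain #add_observer, each observer would need to repeat these nil and exception checks itself.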
#check_for_entries ⇒ Object
The method that is called by the consumer's task.
This is the default behavior; subclasses are expected to override it.
# File 'lib/redis_ipc/stream/consumer.rb', line 136
def check_for_entries
  entry = read_from_stream
  return if entry.nil? || invalid_entry?(entry)

  entry
ensure
  acknowledge_and_remove(entry) unless entry.nil?
end
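One detail worth noting in the method above is that the `ensure` clause runs even on the early `return`, so an invalid entry is still acknowledged and removed even though the method returns nil for it. The sketch below demonstrates this with a hypothetical `EntrySketch` class, in which an in-memory array stands in for the stream and `:invalid` stands in for whatever `invalid_entry?` rejects.

```ruby
# Hypothetical stand-in: an array plays the role of the stream.
class EntrySketch
  attr_reader :acknowledged

  def initialize(queue)
    @queue = queue
    @acknowledged = []
  end

  def check_for_entries
    entry = @queue.shift # stand-in for read_from_stream
    return if entry.nil? || invalid_entry?(entry)

    entry
  ensure
    # Runs on both the early return and the normal path.
    @acknowledged << entry unless entry.nil?
  end

  private

  # Assumption for the example: the symbol :invalid marks a bad entry.
  def invalid_entry?(entry)
    entry == :invalid
  end
end

entry_sketch = EntrySketch.new([:ok, :invalid])
results = 3.times.map { entry_sketch.check_for_entries }

p results
p entry_sketch.acknowledged
```

The third call finds the queue empty, so it returns nil and acknowledges nothing.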
#inspect ⇒ String
Returns the inspected object
# File 'lib/redis_ipc/stream/consumer.rb', line 126
def inspect
  "#<#{self.class}:0x#{object_id} name=\"#{name}\" stream_name=\"#{stream_name}\" group_name=\"#{group_name}\" listening=#{listening?}>"
end
#listen ⇒ Object
Starts checking the stream for new entries
# File 'lib/redis_ipc/stream/consumer.rb', line 100
def listen
  return if listening?

  @task.execute
  change_availability
  log("Ready", severity: :debug)
  @task
end
#listening? ⇒ Boolean
Returns true if the consumer is listening for entries
# File 'lib/redis_ipc/stream/consumer.rb', line 93
def listening?
  @task.running?
end
#stop_listening ⇒ Object
Stops checking the stream for new entries
# File 'lib/redis_ipc/stream/consumer.rb', line 113
def stop_listening
  @task.shutdown
  change_availability
  log("Stopped", severity: :debug)
  true
end
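Taken together, #listen, #listening?, and #stop_listening form a small lifecycle: #listen returns the task on the first call and nil when the consumer is already listening, while #stop_listening always returns true. The sketch below reproduces just that guard logic. `FakeTask` and `LifecycleSketch` are hypothetical names for this example, and the `change_availability` and `log` calls are left out because their behavior is not shown on this page.

```ruby
# Hypothetical fake task that only tracks a running flag (no threads).
class FakeTask
  def initialize
    @running = false
  end

  def running?
    @running
  end

  def execute
    @running = true
    self
  end

  def shutdown
    @running = false
    true
  end
end

# Hypothetical consumer-like class mirroring the guard logic shown above.
class LifecycleSketch
  def initialize(task)
    @task = task
  end

  def listening?
    @task.running?
  end

  def listen
    return if listening? # already listening: no-op, returns nil

    @task.execute
    @task
  end

  def stop_listening
    @task.shutdown
    true
  end
end

fake_task = FakeTask.new
lifecycle = LifecycleSketch.new(fake_task)

first_call = lifecycle.listen
second_call = lifecycle.listen
was_listening = lifecycle.listening?
stopped = lifecycle.stop_listening

p first_call.equal?(fake_task)
p second_call
p was_listening
p stopped
p lifecycle.listening?
```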