Class: RedisIPC::Stream
- Inherits:
-
Object
- Object
- RedisIPC::Stream
- Defined in:
- lib/redis_ipc/stream.rb,
lib/redis_ipc/stream/entry.rb,
lib/redis_ipc/stream/ledger.rb,
lib/redis_ipc/stream/commands.rb,
lib/redis_ipc/stream/consumer.rb,
lib/redis_ipc/stream/dispatcher.rb,
lib/redis_ipc/stream/ledger/consumer.rb
Defined Under Namespace
Classes: Commands, Consumer, Dispatcher, Entry, Ledger
Instance Attribute Summary collapse
-
#redis ⇒ Object
readonly
Returns the value of attribute redis.
Instance Method Summary collapse
-
#connect(redis_options: {}, **options) ⇒ RedisIPC::Stream
Creates the consumers/dispatchers and connects to the stream.
-
#connected? ⇒ Boolean
Is the Stream connected or not?
-
#disconnect ⇒ Object
Disconnects the Stream from Redis and stops processing entries.
-
#fulfill_request(entry, content:) ⇒ Object
Used when responding to a request, this marks the entry as fulfilled and sends it back to the sending group.
-
#initialize(stream, group) ⇒ Stream
constructor
Joins a Stream with the given name and creates the group with the given name.
-
#on_error(&block) ⇒ Object
Sets the on_error callback that is called if some part of the process raises an exception.
-
#on_request(&block) ⇒ Object
Sets the on_request callback that is called when the group receives an inbound request entry.
-
#reject_request(entry, content:) ⇒ Object
Used when responding to a request, this marks the entry as rejected and sends it back to the sending group.
-
#send_to_group(content:, to:) ⇒ RedisIPC::Response
Sends data (content) to a group on the Stream.
Constructor Details
#initialize(stream, group) ⇒ Stream
Joins a Stream with the given name and creates the group with the given name
16 17 18 19 20 21 22 23 24 |
# File 'lib/redis_ipc/stream.rb', line 16

# Records the stream and group names and installs the default callbacks.
# Nothing is connected here; the Redis client and the group itself are
# created later by #connect.
#
# @param stream [#to_s] name of the stream; stored as a String
# @param group [#to_s] name of this group; stored as a String
def initialize(stream, group)
  self.stream_name = stream.to_s
  self.group_name = group.to_s

  @on_request = nil

  # Default error handler: log the exception class, message and backtrace.
  @on_error = lambda do |exception|
    # Exception#backtrace is an Array (or nil when the exception was never
    # raised). Interpolating it directly prints the Array's inspect form on a
    # single line, so join the frames to get one frame per line instead.
    backtrace = Array(exception.backtrace).join("\n")

    log("#{exception.class} - #{exception}\n#{backtrace}", severity: :error)
  end
end
Instance Attribute Details
#redis ⇒ Object (readonly)
Returns the value of attribute redis.
8 9 10 |
# File 'lib/redis_ipc/stream.rb', line 8

# Read-only accessor for the +redis+ attribute.
#
# @return [Object, nil] whatever is currently stored in @redis; #connect
#   assigns a Commands instance and #disconnect resets it to nil
def redis
  @redis
end
Instance Method Details
#connect(redis_options: {}, **options) ⇒ RedisIPC::Stream
Creates the consumers/dispatchers and connects to the stream
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/redis_ipc/stream.rb', line 95

# Creates the consumers/dispatchers and connects to the stream.
#
# Caller-supplied options are deep-merged over the defaults below, so a
# partial nested hash (e.g. only one key of :consumer) keeps the remaining
# defaults.
#
# @param redis_options [Hash] passed through untouched to Commands.new
# @param options [Hash] overrides for :logger, :pool_size, :max_pool_size,
#   :ledger, :consumer and :dispatcher; :reset is also forwarded to
#   Commands.new when given
# @return [RedisIPC::Stream] self, so calls can be chained
def connect(redis_options: {}, **options)
  check_for_valid_configuration!

  options = {
    logger: nil,
    pool_size: 10,
    max_pool_size: nil,
    ledger: Ledger::DEFAULTS,
    consumer: Ledger::Consumer::DEFAULTS,
    dispatcher: Dispatcher::DEFAULTS
  }.deep_merge(options)

  #
  # The redis pool is shared between the stream, all consumers, and all dispatchers.
  #
  # A consumer (Consumer, Dispatcher, Ledger::Consumer) should only ever use one Redis client at a time
  # since there are no blocking commands and tasks cannot stack. This means the bulk of the pool size
  # is dependent on how many threads are going to be sending entries at a single time.
  #
  # Since this code has no real life testing, this default is purely based on educated guesses.
  #
  max_pool_size = options[:max_pool_size] || (
    options[:pool_size] +
    (options.dig(:consumer, :pool_size) * 3) +
    (options.dig(:dispatcher, :pool_size) * 3)
  )

  @logger = options[:logger]

  @redis = Commands.new(
    stream_name, group_name,
    max_pool_size: max_pool_size,
    redis_options: redis_options,
    **options.slice(:logger, :reset)
  )

  @instance_id = @redis.instance_id

  # Make sure the group is there
  @redis.create_group
  @redis.prune_consumers

  @ledger = Ledger.new(options[:ledger])
  @consumers = create_consumers(options[:consumer])
  @dispatchers = create_dispatchers(options[:dispatcher])

  log("Connected")
  self
end
#connected? ⇒ Boolean
Is the Stream connected or not?
149 150 151 |
# File 'lib/redis_ipc/stream.rb', line 149

# Reports whether the Stream is connected, judged purely by whether both
# the consumers and the dispatchers are currently set (non-nil).
#
# @return [Boolean] true only when neither @consumers nor @dispatchers is nil
def connected?
  [@consumers, @dispatchers].none?(&:nil?)
end
#disconnect ⇒ Object
Disconnects the Stream from Redis and stops processing entries
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/redis_ipc/stream.rb', line 156

# Disconnects the Stream from Redis and stops processing entries.
#
# Every collaborator is reached through safe navigation, so this can be
# called even if #connect never ran.
#
# @return [RedisIPC::Stream] self
def disconnect
  # Dispatchers are told to stop before consumers, then Redis is shut down.
  [@dispatchers, @consumers].each do |listeners|
    listeners&.each(&:stop_listening)
  end
  @redis&.shutdown

  # Drop every reference created by #connect.
  @dispatchers = @consumers = @ledger = @redis = nil

  log("Disconnected")
  self
end
#fulfill_request(entry, content:) ⇒ Object
Used when responding to a request, this marks the entry as fulfilled and sends it back to the sending group
196 197 198 199 200 201 202 203 204 |
# File 'lib/redis_ipc/stream.rb', line 196

# Responds to a request: builds the fulfilled version of the entry carrying
# +content+ and adds it to the stream.
#
# @param entry [Object] the request being answered; must respond to #id,
#   #destination_group and #fulfilled (presumably a RedisIPC::Stream::Entry)
# @param content [Object] the response payload
# @return [nil]
def fulfill_request(entry, content:)
  check_for_ledger!

  log("Fulfilling entry #{entry.id} from \"#{entry.destination_group}\" with: #{content}")

  fulfilled_entry = entry.fulfilled(content: content)
  @redis.add_to_stream(fulfilled_entry)

  nil
end
#on_error(&block) ⇒ Object
Sets the on_error callback that is called if some part of the process raises an exception
41 42 43 44 |
# File 'lib/redis_ipc/stream.rb', line 41

# Sets the on_error callback, replacing whatever handler was stored before.
# Calling it without a block stores nil.
#
# @yieldparam exception [Exception] the error that was raised
# @return [RedisIPC::Stream] self, so calls can be chained
def on_error(&block)
  tap { @on_error = block }
end
#on_request(&block) ⇒ Object
Sets the on_request callback that is called when the group receives an inbound request entry
31 32 33 34 |
# File 'lib/redis_ipc/stream.rb', line 31

# Sets the on_request callback, replacing whatever handler was stored before.
# Calling it without a block stores nil.
#
# NOTE(review): the arguments the block receives are not visible here —
# presumably the inbound request entry; confirm against the consumer code.
#
# @return [RedisIPC::Stream] self, so calls can be chained
def on_request(&block)
  tap { @on_request = block }
end
#reject_request(entry, content:) ⇒ Object
Used when responding to a request, this marks the entry as rejected and sends it back to the sending group
212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/redis_ipc/stream.rb', line 212

# Responds to a request: builds the rejected version of the entry carrying
# +content+ and adds it to the stream. The rejection is logged at :warn.
#
# @param entry [Object] the request being answered; must respond to #id,
#   #destination_group and #rejected (presumably a RedisIPC::Stream::Entry)
# @param content [Object] the rejection payload / reason
# @return [nil]
def reject_request(entry, content:)
  check_for_ledger!

  message = "Rejecting entry #{entry.id} from \"#{entry.destination_group}\" with: #{content}"
  log(message, severity: :warn)

  rejected_entry = entry.rejected(content: content)
  @redis.add_to_stream(rejected_entry)

  nil
end
#send_to_group(content:, to:) ⇒ RedisIPC::Response
Sends data (content) to a group on the Stream
184 185 186 187 188 |
# File 'lib/redis_ipc/stream.rb', line 184

# Sends data (content) to a group on the Stream.
#
# Runs check_for_ledger! first (presumably raises when the Stream is not
# connected — confirm), then delegates to track_and_send.
#
# @param content [Object] the payload to send
# @param to [Object] the destination group
# @return [RedisIPC::Response] whatever track_and_send returns
def send_to_group(content:, to:)
  check_for_ledger!

  response = track_and_send(content, to)
  response
end