Class: RedisIPC::Stream::Commands
- Inherits:
-
Object
- Object
- RedisIPC::Stream::Commands
- Defined in:
- lib/redis_ipc/stream/commands.rb
Defined Under Namespace
Classes: ConsumerProxy
Constant Summary collapse
- REDIS_DEFAULTS =
{ host: ENV.fetch("REDIS_HOST", "localhost"), port: ENV.fetch("REDIS_PORT", 6379) }.freeze
- READ_FROM_PEL =
"0"
- READ_FROM_STREAM =
">"
Instance Attribute Summary collapse
-
#group_name ⇒ Object
readonly
Returns the value of attribute group_name.
-
#instance_id ⇒ Object
readonly
Returns the value of attribute instance_id.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#redis_pool ⇒ Object
readonly
Returns the value of attribute redis_pool.
-
#stream_name ⇒ Object
readonly
Returns the value of attribute stream_name.
Instance Method Summary collapse
-
#acknowledge_entry(entry) ⇒ Object
Acknowledges the entry in the PEL.
-
#add_to_stream(entry) ⇒ RedisIPC::Stream::Entry
Adds an entry to the Stream.
-
#available_consumer_names(instance_id = @instance_id) ⇒ Array<String>
Returns all available consumer names for a group.
-
#claim_entry(consumer, entry) ⇒ Object
Claims an entry for a given consumer, adding it to their PEL.
-
#clear_available_consumers ⇒ Object
Clears the array of available consumers for this group.
- #consumer_available?(consumer) ⇒ Boolean
-
#consumer_info(filter_for: nil) ⇒ Object
Gets information about the stream's consumers for a given group.
-
#create_consumer(consumer) ⇒ Object
Creates a consumer in the stream.
-
#create_group ⇒ Object
Checks if the Stream group has been created and creates it if it hasn't.
-
#delete_consumer(consumer) ⇒ Object
Deletes a consumer from the stream.
-
#delete_entry(entry) ⇒ Object
Removes the entry from the stream.
-
#delete_stream ⇒ Object
Deletes the stream from Redis.
-
#destroy_group ⇒ Object
Removes the group from the stream.
-
#entries_size ⇒ Integer
Returns the number of entries in a stream.
-
#initialize(stream_name, group_name, max_pool_size: 10, logger: nil, redis_options: {}) ⇒ Commands
constructor
A centralized location that holds all of the various Redis commands needed to interact with a stream.
-
#make_consumer_available(consumer) ⇒ Object
Makes the consumer available for receiving entries by adding it to the available consumers list.
-
#make_consumer_unavailable(consumer) ⇒ Object
Makes the consumer unavailable for receiving entries by removing it from the available consumers list.
-
#next_pending_entry(consumer) ⇒ RedisIPC::Stream::Entry
Wrapper for #read_from_stream that returns an entry from the consumer's PEL.
-
#next_reclaimed_entry(consumer, min_idle_time: 10.seconds) ⇒ Object
Reclaims any entries that have been idle more than min_idle_time.
-
#next_unread_entry(consumer) ⇒ RedisIPC::Stream::Entry
Wrapper for #read_from_stream that returns an unread entry.
-
#prune_consumers ⇒ Object
Removes all consumers for this group that have been idle for longer than 30 seconds.
-
#read_from_stream(consumer, read_id, block: 500) ⇒ RedisIPC::Stream::Entry
Reads an entry into the group using XREADGROUP.
-
#shutdown ⇒ Object
Gracefully shutdown the redis pool.
Constructor Details
#initialize(stream_name, group_name, max_pool_size: 10, logger: nil, redis_options: {}) ⇒ Commands
A centralized location that holds all of the various Redis commands needed to interact with a stream
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/redis_ipc/stream/commands.rb', line 31
# A centralized location that holds all of the various Redis commands needed to
# interact with a stream.
#
# @param stream_name [String] name of the Redis stream; must not be blank
# @param group_name [String] name of the consumer group; must not be blank
# @param max_pool_size [Integer] size handed to the ConnectionPool
# @param logger [Object, nil] stored as-is in @logger
# @param redis_options [Hash] options merged over REDIS_DEFAULTS and passed to Redis.new
# @raise [ArgumentError] when stream_name or group_name is blank
def initialize(stream_name, group_name, max_pool_size: 10, logger: nil, redis_options: {})
  # A unique ID for the stream instance. This allows the same stream group to be created across
  # multiple instances, or processes, without having inbound responses being dispatched to an
  # instance that isn't currently tracking that request/response
  @instance_id = SecureRandom.uuid.delete("-")[0..5]

  @stream_name = stream_name
  @group_name = group_name

  raise ArgumentError, "Stream name cannot be blank" if stream_name.blank?
  raise ArgumentError, "Group name cannot be blank" if group_name.blank?

  @logger = logger

  # Caller-supplied options take precedence over the defaults.
  redis_options = REDIS_DEFAULTS.merge(redis_options)
  @redis_pool = ConnectionPool.new(size: max_pool_size) { Redis.new(**redis_options) }

  log("Created pool with a size of #{max_pool_size}")
end
Instance Attribute Details
#group_name ⇒ Object (readonly)
Returns the value of attribute group_name.
20 21 22 |
# File 'lib/redis_ipc/stream/commands.rb', line 20
# Reader for the group_name attribute.
#
# @return [Object] the current value of @group_name
def group_name = @group_name
#instance_id ⇒ Object (readonly)
Returns the value of attribute instance_id.
20 21 22 |
# File 'lib/redis_ipc/stream/commands.rb', line 20
# Reader for the instance_id attribute.
#
# @return [Object] the current value of @instance_id
def instance_id = @instance_id
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
20 21 22 |
# File 'lib/redis_ipc/stream/commands.rb', line 20
# Reader for the logger attribute.
#
# @return [Object] the current value of @logger
def logger = @logger
#redis_pool ⇒ Object (readonly)
Returns the value of attribute redis_pool.
20 21 22 |
# File 'lib/redis_ipc/stream/commands.rb', line 20
# Reader for the redis_pool attribute.
#
# @return [Object] the current value of @redis_pool
def redis_pool = @redis_pool
#stream_name ⇒ Object (readonly)
Returns the value of attribute stream_name.
20 21 22 |
# File 'lib/redis_ipc/stream/commands.rb', line 20
# Reader for the stream_name attribute.
#
# @return [Object] the current value of @stream_name
def stream_name = @stream_name
Instance Method Details
#acknowledge_entry(entry) ⇒ Object
Acknowledges the entry in the PEL
83 84 85 86 87 88 89 |
# File 'lib/redis_ipc/stream/commands.rb', line 83
# Acknowledges the entry in the PEL by issuing XACK for its redis_id
# against this stream and group.
#
# A Redis::CommandError raised by the command is swallowed by `suppress`.
#
# @param entry [#redis_id] the entry to acknowledge
# @return [Object] whatever the pool block evaluates to
def acknowledge_entry(entry)
  redis_pool.with do |conn|
    suppress(Redis::CommandError) { conn.xack(stream_name, group_name, entry.redis_id) }
  end
end
#add_to_stream(entry) ⇒ RedisIPC::Stream::Entry
Adds an entry to the Stream
73 74 75 76 |
# File 'lib/redis_ipc/stream/commands.rb', line 73
# Adds an entry to the Stream with XADD, using entry.to_h as the fields.
#
# @param entry [RedisIPC::Stream::Entry] the entry to append
# @return [RedisIPC::Stream::Entry] the result of entry.with(redis_id: ...) carrying
#   the ID returned by XADD
def add_to_stream(entry)
  assigned_id = redis_pool.with do |conn|
    conn.xadd(stream_name, entry.to_h)
  end

  entry.with(redis_id: assigned_id)
end
#available_consumer_names(instance_id = @instance_id) ⇒ Array<String>
Returns all available consumer names for a group. Each consumer in the list is added when they start listening and is removed when they stop listening
291 292 293 294 295 296 297 |
# File 'lib/redis_ipc/stream/commands.rb', line 291
# Returns all available consumer names for a group by reading the whole
# available-consumers list.
#
# @param instance_id [Object] passed to available_consumers_key; defaults to @instance_id
# @return [Array<String>] the LRANGE reply
def available_consumer_names(instance_id = @instance_id)
  redis_pool.with do |conn|
    list_key = available_consumers_key(instance_id)

    # Indices 0..-1 cover the entire list (-1 is the last element, like an array)
    conn.lrange(list_key, 0, -1)
  end
end
#claim_entry(consumer, entry) ⇒ Object
Claims an entry for a given consumer, adding it to their PEL
245 246 247 248 249 250 251 252 253 254 |
# File 'lib/redis_ipc/stream/commands.rb', line 245
# Claims an entry for a given consumer (XCLAIM), adding it to their PEL.
#
# @param consumer [#name] the consumer that should own the entry
# @param entry [#redis_id] the entry to claim
# @return [Object, nil] Entry.from_redis built from the first claimed item,
#   or nil when nothing was claimed
def claim_entry(consumer, entry)
  claimed = redis_pool.with do |conn|
    # The 0 is the minimum idle time
    entries = conn.xclaim(stream_name, group_name, consumer.name, 0, entry.redis_id)
    entries&.first
  end

  Entry.from_redis(*claimed) unless claimed.blank?
end
#clear_available_consumers ⇒ Object
Clears the array of available consumers for this group
278 279 280 |
# File 'lib/redis_ipc/stream/commands.rb', line 278
# Clears the list of available consumers for this group by deleting its key.
#
# @return [Object] the DEL reply
def clear_available_consumers
  redis_pool.with do |conn|
    conn.del(available_consumers_key)
  end
end
#consumer_available?(consumer) ⇒ Boolean
299 300 301 302 303 304 305 306 |
# File 'lib/redis_ipc/stream/commands.rb', line 299
# Tells whether the consumer's name is present in the available-consumers list.
#
# @param consumer [#name] the consumer to look up
# @return [Boolean] true when LPOS returned a non-nil position
def consumer_available?(consumer)
  position = redis_pool.with do |conn|
    # redis-rb has no built-in lpos, but it delegates missing methods, so the call still works
    conn.lpos(available_consumers_key, consumer.name, "RANK", 1)
  end

  missing = position.nil?
  !missing
end
#consumer_info(filter_for: nil) ⇒ Object
Gets information about the stream's consumers for a given group
261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/redis_ipc/stream/commands.rb', line 261
# Gets information about the stream's consumers for a given group (XINFO CONSUMERS).
#
# @param filter_for [Array, nil] when an Array, only consumers whose name is included
#   are kept; any other value disables filtering
# @return [Object] ConsumerProxy objects indexed by their name (via index_by)
def consumer_info(filter_for: nil)
  raw_consumers = redis_pool.with { |conn| conn.xinfo(:consumers, stream_name, group_name) }

  consumers = raw_consumers.map { |attributes| ConsumerProxy.new(**attributes.symbolize_keys) }
  consumers = consumers.select { |proxy| filter_for.include?(proxy.name) } if filter_for.is_a?(Array)

  consumers.index_by(&:name)
end
#create_consumer(consumer) ⇒ Object
Creates a consumer in the stream
142 143 144 145 146 |
# File 'lib/redis_ipc/stream/commands.rb', line 142
# Creates a consumer in the stream's group (XGROUP CREATECONSUMER).
#
# @param consumer [#name] the consumer to create
# @return [Object] the XGROUP reply
def create_consumer(consumer)
  redis_pool.with { |conn| conn.xgroup(:createconsumer, stream_name, group_name, consumer.name) }
end
#create_group ⇒ Object
Checks if the Stream group has been created and creates it if it hasn't
107 108 109 110 111 112 113 |
# File 'lib/redis_ipc/stream/commands.rb', line 107
# Creates the Stream group (and the stream itself, via MKSTREAM) unless it already exists.
#
# Only the "group already exists" reply (BUSYGROUP) is ignored. Any other
# Redis::CommandError is re-raised so real failures are not hidden.
#
# @return [Object, nil] the XGROUP reply, or nil when the group already existed
# @raise [Redis::CommandError] for any command error other than BUSYGROUP
def create_group
  redis_pool.with do |redis|
    # "$" starts the group at the end of the stream
    redis.xgroup(:create, stream_name, group_name, "$", mkstream: true)
  rescue Redis::CommandError => e
    raise unless e.message.start_with?("BUSYGROUP")

    nil
  end
end
#delete_consumer(consumer) ⇒ Object
Deletes a consumer from the stream
153 154 155 156 157 |
# File 'lib/redis_ipc/stream/commands.rb', line 153
# Deletes a consumer from the stream's group (XGROUP DELCONSUMER).
#
# @param consumer [#name] the consumer to delete
# @return [Object] the XGROUP reply
def delete_consumer(consumer)
  redis_pool.with { |conn| conn.xgroup(:delconsumer, stream_name, group_name, consumer.name) }
end
#delete_entry(entry) ⇒ Object
Removes the entry from the stream
96 97 98 99 100 101 102 |
# File 'lib/redis_ipc/stream/commands.rb', line 96
# Removes the entry from the stream (XDEL by its redis_id).
#
# A Redis::CommandError raised by the command is swallowed by `suppress`.
#
# @param entry [#redis_id] the entry to remove
# @return [Object] whatever the pool block evaluates to
def delete_entry(entry)
  redis_pool.with do |conn|
    suppress(Redis::CommandError) { conn.xdel(stream_name, entry.redis_id) }
  end
end
#delete_stream ⇒ Object
Deletes the stream from Redis
131 132 133 134 135 |
# File 'lib/redis_ipc/stream/commands.rb', line 131
# Deletes the stream from Redis by deleting its key.
#
# @return [Object] the DEL reply
def delete_stream
  redis_pool.with { |conn| conn.del(stream_name) }
end
#destroy_group ⇒ Object
Removes the group from the stream
118 119 120 121 122 123 124 125 126 |
# File 'lib/redis_ipc/stream/commands.rb', line 118
# Removes the group from the stream, then deletes the available-consumers key.
#
# A Redis::CommandError from XGROUP DESTROY is swallowed by `suppress`;
# the key deletion runs either way.
#
# @return [Object] the DEL reply for the available-consumers key
def destroy_group
  redis_pool.with do |conn|
    suppress(Redis::CommandError) { conn.xgroup(:destroy, stream_name, group_name) }

    conn.del(available_consumers_key)
  end
end
#entries_size ⇒ Integer
Returns the number of entries in a stream
62 63 64 |
# File 'lib/redis_ipc/stream/commands.rb', line 62
# Returns the number of entries in the stream (XLEN).
#
# @return [Integer] the XLEN reply
def entries_size
  redis_pool.with do |conn|
    conn.xlen(stream_name)
  end
end
#make_consumer_available(consumer) ⇒ Object
Makes the consumer available for receiving entries by adding it to the available consumers list
313 314 315 316 317 318 319 320 321 |
# File 'lib/redis_ipc/stream/commands.rb', line 313
# Makes the consumer available for receiving entries by pushing its name onto
# the available-consumers list. Does nothing when the consumer is already listed.
#
# @param consumer [#name] the consumer to mark as available
# @return [true, nil] true when the name was pushed, nil when it was already present
def make_consumer_available(consumer)
  unless consumer_available?(consumer)
    redis_pool.with { |conn| conn.lpush(available_consumers_key, consumer.name) }
    true
  end
end
#make_consumer_unavailable(consumer) ⇒ Object
Makes the consumer unavailable for receiving entries by removing it from the available consumers list
328 329 330 331 332 333 334 335 336 337 |
# File 'lib/redis_ipc/stream/commands.rb', line 328
# Makes the consumer unavailable for receiving entries by removing its name from
# the available-consumers list. Does nothing when the consumer is not listed.
#
# @param consumer [#name] the consumer to mark as unavailable
# @return [true, nil] true when a removal was issued, nil when the name was not present
def make_consumer_unavailable(consumer)
  if consumer_available?(consumer)
    # A count of 0 removes every occurrence of the name
    redis_pool.with { |conn| conn.lrem(available_consumers_key, 0, consumer.name) }
    true
  end
end
#next_pending_entry(consumer) ⇒ RedisIPC::Stream::Entry
Wrapper for #read_from_stream that returns an entry from the consumer's PEL
211 212 213 |
# File 'lib/redis_ipc/stream/commands.rb', line 211
# Wrapper for #read_from_stream that reads with the READ_FROM_PEL id, i.e.
# returns an entry from the consumer's PEL.
#
# @param consumer [#name] the consumer to read for
# @param read_options [Hash] keyword arguments forwarded untouched to #read_from_stream
# @return [RedisIPC::Stream::Entry] whatever #read_from_stream returns
def next_pending_entry(consumer, **read_options)
  read_from_stream(consumer, READ_FROM_PEL, **read_options)
end
#next_reclaimed_entry(consumer, min_idle_time: 10.seconds) ⇒ Object
Reclaims any entries that have been idle more than min_idle_time
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/redis_ipc/stream/commands.rb', line 222
# Reclaims (XAUTOCLAIM) at most one entry that has been idle more than min_idle_time.
#
# @param consumer [#name] the consumer that takes ownership of the entry
# @param min_idle_time [#in_milliseconds] minimum idle time before an entry is reclaimed
# @return [Object, nil] Entry.from_redis built from the reclaimed item,
#   or nil when nothing was reclaimed
def next_reclaimed_entry(consumer, min_idle_time: 10.seconds)
  reclaimed = redis_pool.with do |conn|
    # "0-0" is a special ID meaning "from the start"
    reply = conn.xautoclaim(
      stream_name, group_name, consumer.name,
      min_idle_time.in_milliseconds.to_i, "0-0",
      count: 1
    )

    reply["entries"].first
  end

  Entry.from_redis(*reclaimed) unless reclaimed.blank?
end
#next_unread_entry(consumer) ⇒ RedisIPC::Stream::Entry
Wrapper for #read_from_stream that returns an unread entry
200 201 202 |
# File 'lib/redis_ipc/stream/commands.rb', line 200
# Wrapper for #read_from_stream that reads with the READ_FROM_STREAM id, i.e.
# returns an unread entry.
#
# @param consumer [#name] the consumer to read for
# @param read_options [Hash] keyword arguments forwarded untouched to #read_from_stream
# @return [RedisIPC::Stream::Entry] whatever #read_from_stream returns
def next_unread_entry(consumer, **read_options)
  read_from_stream(consumer, READ_FROM_STREAM, **read_options)
end
#prune_consumers ⇒ Object
Removes all consumers for this group that have been idle for longer than 30 seconds
162 163 164 165 |
# File 'lib/redis_ipc/stream/commands.rb', line 162
# Removes all consumers for this group that have been idle for longer than
# max_idle_time (30 seconds by default).
#
# @param max_idle_time [#in_milliseconds] idle threshold; consumers whose `idle`
#   value (compared in milliseconds) exceeds it are deleted
# @return [Array] the consumers that were deleted
def prune_consumers(max_idle_time: 30.seconds)
  # Convert once instead of once per consumer
  threshold_ms = max_idle_time.in_milliseconds.to_i

  consumer_info.values
    .select { |consumer| consumer.idle > threshold_ms }
    .each { |consumer| delete_consumer(consumer) }
end
#read_from_stream(consumer, read_id, block: 500) ⇒ RedisIPC::Stream::Entry
Reads an entry into the group using XREADGROUP
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/redis_ipc/stream/commands.rb', line 176
# Reads an entry into the group using XREADGROUP (at most one entry per call).
#
# Before reading, the expiry of the available-consumers key and of the stream
# key is refreshed through set_expiry.
# NOTE(review): the ttl of 0.1 for the available-consumers key carries no unit
# here; confirm what set_expiry expects.
#
# @param consumer [#name] the consumer to read as
# @param read_id [String] the ID passed to XREADGROUP (e.g. READ_FROM_PEL or READ_FROM_STREAM)
# @param block [Integer] value passed as XREADGROUP's block option
# @return [RedisIPC::Stream::Entry, nil] Entry.from_redis built from the reply,
#   or nil when the reply was blank
def read_from_stream(consumer, read_id, block: 500)
  set_expiry(available_consumers_key, ttl: 0.1)
  set_expiry(stream_name, ttl: 7.days)

  payload = redis_pool.with do |conn|
    reply = conn.xreadgroup(group_name, consumer.name, stream_name, read_id, count: 1, block:)
    reply&.values&.flatten
  end

  Entry.from_redis(*payload) unless payload.blank?
end
#shutdown ⇒ Object
Gracefully shutdown the redis pool
53 54 55 |
# File 'lib/redis_ipc/stream/commands.rb', line 53
# Gracefully shuts down the redis pool, calling close on each connection
# the pool yields during shutdown.
#
# @return [Object] whatever the pool's shutdown returns
def shutdown
  redis_pool.shutdown { |conn| conn.close }
end