Class: RedisIPC::Stream::Commands

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_ipc/stream/commands.rb

Defined Under Namespace

Classes: ConsumerProxy

Constant Summary collapse

REDIS_DEFAULTS =
{
  host: ENV.fetch("REDIS_HOST", "localhost"),
  port: ENV.fetch("REDIS_PORT", 6379)
}.freeze
READ_FROM_PEL =
"0"
READ_FROM_STREAM =
">"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(stream_name, group_name, max_pool_size: 10, logger: nil, redis_options: {}) ⇒ Commands

A centralized location that holds all of the various Redis commands needed to interact with a stream

Parameters:

  • stream_name (String)

    The name of the Stream

  • group_name (String)

    The group name to use within the Stream

  • max_pool_size (Integer) (defaults to: 10)

    The maximum number of Redis connections

  • logger (nil, Logger) (defaults to: nil)

    A logger instance. If provided, logs will be appended on commands

  • redis_options (Hash) (defaults to: {})

    The connection options passed into the Redis client

Raises:

  • (ArgumentError)


31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/redis_ipc/stream/commands.rb', line 31

def initialize(stream_name, group_name, max_pool_size: 10, logger: nil, redis_options: {})
  # A unique ID to the stream instance. This allows the same stream group be created across multiple
  # instances, or processes, without having inbound responses being dispatched to an instance that
  # isn't currently tracking that request/response
  @instance_id = SecureRandom.uuid.delete("-")[0..5]

  @stream_name = stream_name
  @group_name = group_name

  raise ArgumentError, "Stream name cannot be blank" if stream_name.blank?
  raise ArgumentError, "Group name cannot be blank" if group_name.blank?

  @logger = logger

  redis_options = REDIS_DEFAULTS.merge(redis_options)
  @redis_pool = ConnectionPool.new(size: max_pool_size) { Redis.new(**redis_options) }
  log("Created pool with a size of #{max_pool_size}")
end

Instance Attribute Details

#group_nameObject (readonly)

Returns the value of attribute group_name.



20
21
22
# File 'lib/redis_ipc/stream/commands.rb', line 20

def group_name
  @group_name
end

#instance_idObject (readonly)

Returns the value of attribute instance_id.



20
21
22
# File 'lib/redis_ipc/stream/commands.rb', line 20

def instance_id
  @instance_id
end

#loggerObject (readonly)

Returns the value of attribute logger.



20
21
22
# File 'lib/redis_ipc/stream/commands.rb', line 20

def logger
  @logger
end

#redis_poolObject (readonly)

Returns the value of attribute redis_pool.



20
21
22
# File 'lib/redis_ipc/stream/commands.rb', line 20

def redis_pool
  @redis_pool
end

#stream_nameObject (readonly)

Returns the value of attribute stream_name.



20
21
22
# File 'lib/redis_ipc/stream/commands.rb', line 20

def stream_name
  @stream_name
end

Instance Method Details

#acknowledge_entry(entry) ⇒ Object

Acknowledges the entry in the PEL

Parameters:



83
84
85
86
87
88
89
# File 'lib/redis_ipc/stream/commands.rb', line 83

def acknowledge_entry(entry)
  redis_pool.with do |redis|
    suppress(Redis::CommandError) do
      redis.xack(stream_name, group_name, entry.redis_id)
    end
  end
end

#add_to_stream(entry) ⇒ RedisIPC::Stream::Entry

Adds an entry to the Stream

Parameters:

Returns:



73
74
75
76
# File 'lib/redis_ipc/stream/commands.rb', line 73

def add_to_stream(entry)
  redis_id = redis_pool.with { |redis| redis.xadd(stream_name, entry.to_h) }
  entry.with(redis_id: redis_id)
end

#available_consumer_names(instance_id = @instance_id) ⇒ Array<String>

Returns all available consumer names for a group. Each consumer in the list is added when they start listening and is removed when they stop listening

Parameters:

  • for_group_name (String)

    The name of group that the consumers belong to

Returns:

  • (Array<String>)


291
292
293
294
295
296
297
# File 'lib/redis_ipc/stream/commands.rb', line 291

def available_consumer_names(instance_id = @instance_id)
  redis_pool.with do |redis|
    # 0 is start index
    # -1 is end index (like array)
    redis.lrange(available_consumers_key(instance_id), 0, -1)
  end
end

#claim_entry(consumer, entry) ⇒ Object

Claims an entry for a given consumer, adding it to their PEL

Parameters:



245
246
247
248
249
250
251
252
253
254
# File 'lib/redis_ipc/stream/commands.rb', line 245

def claim_entry(consumer, entry)
  result = redis_pool.with do |redis|
    # 0 is minimum idle time
    redis.xclaim(stream_name, group_name, consumer.name, 0, entry.redis_id)&.first
  end

  return if result.blank?

  Entry.from_redis(*result)
end

#clear_available_consumersObject

Clears the array of available consumers for this group



278
279
280
# File 'lib/redis_ipc/stream/commands.rb', line 278

def clear_available_consumers
  redis_pool.with { |redis| redis.del(available_consumers_key) }
end

#consumer_available?(consumer) ⇒ Boolean

Returns:

  • (Boolean)


299
300
301
302
303
304
305
306
# File 'lib/redis_ipc/stream/commands.rb', line 299

def consumer_available?(consumer)
  result = redis_pool.with do |redis|
    # redis-rb does not have internal support for lpos. However, they do delegate missing methods
    redis.lpos(available_consumers_key, consumer.name, "RANK", 1)
  end

  !result.nil?
end

#consumer_info(filter_for: nil) ⇒ Object

Gets information about the stream's consumers for a given group

Parameters:

  • for_group_name (String)

    The group the consumers belong to



261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/redis_ipc/stream/commands.rb', line 261

def consumer_info(filter_for: nil)
  result = redis_pool.with do |redis|
    redis.xinfo(:consumers, stream_name, group_name)
  end

  result = result.map { |r| ConsumerProxy.new(**r.symbolize_keys) }

  if filter_for.is_a?(Array)
    result.select! { |consumer| filter_for.include?(consumer.name) }
  end

  result.index_by(&:name)
end

#create_consumer(consumer) ⇒ Object

Creates a consumer in the stream

Parameters:



142
143
144
145
146
# File 'lib/redis_ipc/stream/commands.rb', line 142

def create_consumer(consumer)
  redis_pool.with do |redis|
    redis.xgroup(:createconsumer, stream_name, group_name, consumer.name)
  end
end

#create_groupObject

Checks if the Stream group has been created and creates it if it hasn't



107
108
109
110
111
112
113
# File 'lib/redis_ipc/stream/commands.rb', line 107

def create_group
  redis_pool.with do |redis|
    suppress(Redis::CommandError) do
      redis.xgroup(:create, stream_name, group_name, "$", mkstream: true)
    end
  end
end

#delete_consumer(consumer) ⇒ Object

Deletes a consumer from the stream

Parameters:



153
154
155
156
157
# File 'lib/redis_ipc/stream/commands.rb', line 153

def delete_consumer(consumer)
  redis_pool.with do |redis|
    redis.xgroup(:delconsumer, stream_name, group_name, consumer.name)
  end
end

#delete_entry(entry) ⇒ Object

Removes the entry from the stream

Parameters:



96
97
98
99
100
101
102
# File 'lib/redis_ipc/stream/commands.rb', line 96

def delete_entry(entry)
  redis_pool.with do |redis|
    suppress(Redis::CommandError) do
      redis.xdel(stream_name, entry.redis_id)
    end
  end
end

#delete_streamObject

Deletes the stream from Redis



131
132
133
134
135
# File 'lib/redis_ipc/stream/commands.rb', line 131

def delete_stream
  redis_pool.with do |redis|
    redis.del(stream_name)
  end
end

#destroy_groupObject

Removes the group from the stream



118
119
120
121
122
123
124
125
126
# File 'lib/redis_ipc/stream/commands.rb', line 118

def destroy_group
  redis_pool.with do |redis|
    suppress(Redis::CommandError) do
      redis.xgroup(:destroy, stream_name, group_name)
    end

    redis.del(available_consumers_key)
  end
end

#entries_sizeInteger

Returns the number of entries in a stream

Returns:

  • (Integer)


62
63
64
# File 'lib/redis_ipc/stream/commands.rb', line 62

def entries_size
  redis_pool.with { |redis| redis.xlen(stream_name) }
end

#make_consumer_available(consumer) ⇒ Object

Makes the consumer available for receiving entries by adding it to the available consumers list

Parameters:

  • consumer_name (String)

    The name of the consumer



313
314
315
316
317
318
319
320
321
# File 'lib/redis_ipc/stream/commands.rb', line 313

def make_consumer_available(consumer)
  return if consumer_available?(consumer)

  redis_pool.with do |redis|
    redis.lpush(available_consumers_key, consumer.name)
  end

  true
end

#make_consumer_unavailable(consumer) ⇒ Object

Makes the consumer unavailable for receiving entries by removing it from the available consumers list

Parameters:

  • consumer_name (String)

    The name of the consumer



328
329
330
331
332
333
334
335
336
337
# File 'lib/redis_ipc/stream/commands.rb', line 328

def make_consumer_unavailable(consumer)
  return unless consumer_available?(consumer)

  redis_pool.with do |redis|
    # 0 is remove all
    redis.lrem(available_consumers_key, 0, consumer.name)
  end

  true
end

#next_pending_entry(consumer) ⇒ RedisIPC::Stream::Entry

Wrapper for #read_from_stream that returns an entry from the consumers PEL

Parameters:

Returns:



211
212
213
# File 'lib/redis_ipc/stream/commands.rb', line 211

def next_pending_entry(consumer, **)
  read_from_stream(consumer, READ_FROM_PEL, **)
end

#next_reclaimed_entry(consumer, min_idle_time: 10.seconds) ⇒ Object

Reclaims any entries that have been idle more than min_idle_time

Parameters:

  • consumer (RedisIPC::Stream::Consumer)

    The consumer processing this request to claim the entry

  • min_idle_time (Integer) (defaults to: 10.seconds)

    The number of seconds the entry has been idle

  • count (Integer)

    The number of entries to read



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/redis_ipc/stream/commands.rb', line 222

def next_reclaimed_entry(consumer, min_idle_time: 10.seconds)
  result = redis_pool.with do |redis|
    # "0-0" is a special ID, means at the start
    redis.xautoclaim(
      stream_name, group_name,
      consumer.name,
      min_idle_time.in_milliseconds.to_i,
      "0-0",
      count: 1
    )["entries"].first
  end

  return if result.blank?

  Entry.from_redis(*result)
end

#next_unread_entry(consumer) ⇒ RedisIPC::Stream::Entry

Wrapper for #read_from_stream that returns an unread entry

Parameters:

Returns:



200
201
202
# File 'lib/redis_ipc/stream/commands.rb', line 200

def next_unread_entry(consumer, **)
  read_from_stream(consumer, READ_FROM_STREAM, **)
end

#prune_consumersObject

Removes all consumers for this group that have been idle for longer than 30 seconds



162
163
164
165
# File 'lib/redis_ipc/stream/commands.rb', line 162

def prune_consumers
  consumer_info.values.select { |consumer| consumer.idle > 30.seconds.in_milliseconds.to_i }
    .each { |consumer| delete_consumer(consumer) }
end

#read_from_stream(consumer, read_id, block: 500) ⇒ RedisIPC::Stream::Entry

Reads an entry into the group using XREADGROUP

Parameters:

  • consumer_name (String)

    The consumer reading the entry

  • read_id (String)

    The ID or special ID to start reading from

  • count (Integer)

    The number of entries to read back

Returns:



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/redis_ipc/stream/commands.rb', line 176

def read_from_stream(consumer, read_id, block: 500)
  set_expiry(available_consumers_key, ttl: 0.1)
  set_expiry(stream_name, ttl: 7.days)

  result = redis_pool.with do |redis|
    redis.xreadgroup(
      group_name, consumer.name, stream_name, read_id,
      count: 1,
      block: block
    )&.values&.flatten
  end

  return if result.blank?

  Entry.from_redis(*result)
end

#shutdownObject

Gracefully shutdown the redis pool



53
54
55
# File 'lib/redis_ipc/stream/commands.rb', line 53

def shutdown
  redis_pool.shutdown(&:close)
end