Class: RedisIPC::Stream

Inherits:
Object
Defined in:
lib/redis_ipc/stream.rb,
lib/redis_ipc/stream/entry.rb,
lib/redis_ipc/stream/ledger.rb,
lib/redis_ipc/stream/commands.rb,
lib/redis_ipc/stream/consumer.rb,
lib/redis_ipc/stream/dispatcher.rb,
lib/redis_ipc/stream/ledger/consumer.rb

Defined Under Namespace

Classes: Commands, Consumer, Dispatcher, Entry, Ledger

Constructor Details

#initialize(stream, group) ⇒ Stream

Stores the stream name and the group name. The group itself is created in Redis when #connect is called

Parameters:

  • stream (String)

    The name of the Stream

  • group (String)

    The name of the group to connect as. This must be unique!



# File 'lib/redis_ipc/stream.rb', line 16

def initialize(stream, group)
  self.stream_name = stream.to_s
  self.group_name = group.to_s

  @on_request = nil
  @on_error = lambda do |exception|
    log("#{exception.class} - #{exception}\n#{exception.backtrace}", severity: :error)
  end
end

Instance Attribute Details

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



# File 'lib/redis_ipc/stream.rb', line 8

def redis
  @redis
end

Instance Method Details

#connect(redis_options: {}, **options) ⇒ RedisIPC::Stream

Creates the consumers/dispatchers and connects to the stream

Parameters:

  • redis_options (Hash) (defaults to: {})

    The connection options passed into Redis. See redis_rb

  • **options (Hash)

    Configuration options for Stream, Consumer, Dispatcher, and Ledger

Options Hash (**options):

  • :logger (Logger, NilClass)

    An optional logger instance to enable logging. Default: nil

  • :pool_size (Integer)

    The size of the pool of Redis clients to make available for sending. The stream automatically configures the additional Redis clients required for reading. Ignored if max_pool_size is set. Default: 10

  • :max_pool_size (Integer, NilClass)

    When provided, this forces the Redis client pool size to max_pool_size and disables the automatic configuration described above. Default: nil

  • :ledger (Hash)

    Configuration options for the ledger

  • :consumer (Hash)

    Configuration options for the Consumers

  • :dispatcher (Hash)

    Configuration options for the Dispatchers

Returns:

  • (RedisIPC::Stream)



# File 'lib/redis_ipc/stream.rb', line 95

def connect(redis_options: {}, **options)
  check_for_valid_configuration!

  options = {
    logger: nil,
    pool_size: 10,
    max_pool_size: nil,
    ledger: Ledger::DEFAULTS,
    consumer: Ledger::Consumer::DEFAULTS,
    dispatcher: Dispatcher::DEFAULTS
  }.deep_merge(options)

  #
  # The redis pool is shared between the stream, all consumers, and all dispatchers.
  #
  # A consumer (Consumer, Dispatcher, Ledger::Consumer) should only ever use one Redis client at a time
  # since there are no blocking commands and tasks cannot stack. This means the bulk of the pool size
  # is dependent on how many threads are going to be sending entries at a single time.
  #
  # Since this code has no real life testing, this default is purely based on educated guesses.
  #
  max_pool_size = options[:max_pool_size] || (
    options[:pool_size] +
    (options.dig(:consumer, :pool_size) * 3) +
    (options.dig(:dispatcher, :pool_size) * 3)
  )

  @logger = options[:logger]
  @redis = Commands.new(
    stream_name, group_name,
    max_pool_size: max_pool_size,
    redis_options: redis_options,
    **options.slice(:logger, :reset)
  )

  @instance_id = @redis.instance_id

  # Make sure the group is there
  @redis.create_group
  @redis.prune_consumers

  @ledger = Ledger.new(options[:ledger])
  @consumers = create_consumers(options[:consumer])
  @dispatchers = create_dispatchers(options[:dispatcher])

  log("Connected")
  self
end
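
The option handling in #connect can be tried in isolation. The sketch below is not the library's code. `deep_merge` is a small hand-written helper standing in for the Hash#deep_merge call in the source (plain Ruby does not define that method; ActiveSupport provides one). `SKETCH_DEFAULTS` and its nested `pool_size` values are hypothetical placeholders, since the real `DEFAULTS` constants are not shown on this page.

```ruby
# Recursive merge for nested hashes; a stand-in for the Hash#deep_merge
# call that the source relies on.
def deep_merge(base, overrides)
  base.merge(overrides) do |_key, old, new|
    old.is_a?(Hash) && new.is_a?(Hash) ? deep_merge(old, new) : new
  end
end

# Hypothetical defaults. The nested pool_size values are assumptions.
SKETCH_DEFAULTS = {
  logger: nil,
  pool_size: 10,
  max_pool_size: nil,
  consumer: {pool_size: 2},
  dispatcher: {pool_size: 3}
}.freeze

# Mirrors the calculation in #connect: an explicit max_pool_size wins,
# otherwise the size is derived from the three pool_size settings.
def effective_pool_size(options)
  options[:max_pool_size] || (
    options[:pool_size] +
    (options.dig(:consumer, :pool_size) * 3) +
    (options.dig(:dispatcher, :pool_size) * 3)
  )
end

# Overriding one nested key leaves the other defaults in place.
derived = effective_pool_size(deep_merge(SKETCH_DEFAULTS, {consumer: {pool_size: 4}}))

# Setting max_pool_size bypasses the derived value entirely.
forced = effective_pool_size(deep_merge(SKETCH_DEFAULTS, {max_pool_size: 5}))

puts derived
puts forced
```

With these placeholder values the derived size is 10 + (4 * 3) + (3 * 3) = 31, and the forced size is 5.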

#connected? ⇒ Boolean

Is the Stream connected or not?

Returns:

  • (Boolean)


# File 'lib/redis_ipc/stream.rb', line 149

def connected?
  !@consumers.nil? && !@dispatchers.nil?
end

#disconnect ⇒ Object

Disconnects the Stream from Redis and stops processing entries



# File 'lib/redis_ipc/stream.rb', line 156

def disconnect
  # #connect might've not been called...
  @dispatchers&.each(&:stop_listening)
  @consumers&.each(&:stop_listening)
  @redis&.shutdown

  @dispatchers = nil
  @consumers = nil
  @ledger = nil
  @redis = nil

  log("Disconnected")

  self
end

#fulfill_request(entry, content:) ⇒ Object

Used when responding to a request, this marks the entry as fulfilled and sends it back to the sending group

Parameters:



# File 'lib/redis_ipc/stream.rb', line 196

def fulfill_request(entry, content:)
  check_for_ledger!

  log("Fulfilling entry #{entry.id} from \"#{entry.destination_group}\" with: #{content}")

  @redis.add_to_stream(entry.fulfilled(content: content))

  nil
end

#on_error(&block) ⇒ Object

Sets the on_error callback that is called if some part of the process raises an exception

Parameters:

  • &block (Proc)

    This callback is provided a single argument which is the exception



# File 'lib/redis_ipc/stream.rb', line 41

def on_error(&block)
  @on_error = block
  self
end

#on_request(&block) ⇒ Object

Sets the on_request callback that is called when the group receives an inbound request entry

Parameters:

  • &block (Proc)

    This callback is provided a single argument which is the inbound request entry



# File 'lib/redis_ipc/stream.rb', line 31

def on_request(&block)
  @on_request = block
  self
end
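
Both callback setters store the block and return self, so they can be chained. The sketch below uses a hypothetical stand-in class, `CallbackHost`, that copies only that pattern from the source; it is not RedisIPC::Stream and does not talk to Redis. The `handle` method is an assumption about how such callbacks might be invoked, added only so the example runs.

```ruby
# Hypothetical stand-in that reproduces only the callback setters.
class CallbackHost
  def initialize
    @on_request = nil
    @on_error = ->(exception) { warn("#{exception.class} - #{exception}") }
  end

  def on_request(&block)
    @on_request = block
    self
  end

  def on_error(&block)
    @on_error = block
    self
  end

  # Assumed dispatch logic: run the request callback and route any
  # StandardError it raises to the error callback.
  def handle(entry)
    @on_request&.call(entry)
  rescue => e
    @on_error.call(e)
  end
end

seen = []

# Each setter returns the host, so the calls chain.
host = CallbackHost.new
  .on_request { |entry|
    raise ArgumentError, "bad entry" if entry == :bad
    seen << entry
  }
  .on_error { |exception| seen << exception.message }

host.handle(:ok)
host.handle(:bad)
p seen
```

Returning self from a setter is a common Ruby convention for fluent configuration; the trade-off is that the setter's return value cannot carry anything else.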

#reject_request(entry, content:) ⇒ Object

Used when responding to a request, this marks the entry as rejected and sends it back to the sending group

Parameters:



# File 'lib/redis_ipc/stream.rb', line 212

def reject_request(entry, content:)
  check_for_ledger!

  log(
    "Rejecting entry #{entry.id} from \"#{entry.destination_group}\" with: #{content}",
    severity: :warn
  )

  @redis.add_to_stream(entry.rejected(content: content))

  nil
end
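
A request handler would presumably respond with one of #fulfill_request or #reject_request. The sketch below shows that decision using a hypothetical `FakeStream` that only records which method was called; the real methods publish the fulfilled or rejected entry back to the stream. `SketchEntry`, its `content` field, and `respond_to_entry` are likewise assumptions made for illustration.

```ruby
# Hypothetical entry with just the fields this sketch needs.
SketchEntry = Struct.new(:id, :content)

# Hypothetical stand-in that records responses instead of using Redis.
class FakeStream
  attr_reader :responses

  def initialize
    @responses = []
  end

  def fulfill_request(entry, content:)
    @responses << [:fulfilled, entry.id, content]
    nil
  end

  def reject_request(entry, content:)
    @responses << [:rejected, entry.id, content]
    nil
  end
end

# Hypothetical handler: doubles numeric content, rejects anything else.
def respond_to_entry(stream, entry)
  if entry.content.is_a?(Numeric)
    stream.fulfill_request(entry, content: entry.content * 2)
  else
    stream.reject_request(entry, content: "expected a number")
  end
end

stream = FakeStream.new
respond_to_entry(stream, SketchEntry.new("1-0", 21))
respond_to_entry(stream, SketchEntry.new("2-0", "oops"))
p stream.responses
```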

#send_to_group(content:, to:) ⇒ RedisIPC::Response

Sends data (content) to a group on the Stream

Parameters:

  • content (Object)

    The data to send. Must be JSON compatible

  • to (String)

    The group to send the content to

Returns:

  • (RedisIPC::Response)

    The result of the request. Use #fulfilled? to check if the request was a success; if fulfilled, use #value to get the value. Use #rejected? to check if the request was rejected or raised an exception; if rejected, use #reason to get the reason. This could be anything, but is likely a String or an error instance



# File 'lib/redis_ipc/stream.rb', line 184

def send_to_group(content:, to:)
  check_for_ledger!

  track_and_send(content, to)
end
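
The accessors named in the Returns description suggest a branching pattern like the one below. `FakeResponse` is a hypothetical stand-in that implements only those four methods; how RedisIPC::Response is actually constructed is not shown on this page, and `describe_response` is an illustrative helper.

```ruby
# Hypothetical stand-in exposing the four documented accessors.
FakeResponse = Struct.new(:status, :payload) do
  def fulfilled?
    status == :fulfilled
  end

  def rejected?
    status == :rejected
  end

  def value
    payload
  end

  def reason
    payload
  end
end

# Check the outcome first, then read the matching accessor.
def describe_response(response)
  if response.fulfilled?
    "ok: #{response.value}"
  elsif response.rejected?
    "failed: #{response.reason}"
  else
    "unknown"
  end
end

puts describe_response(FakeResponse.new(:fulfilled, 42))
puts describe_response(FakeResponse.new(:rejected, "no such command"))
```

Because the reason may be a String or an error instance, interpolating it into a message (which calls #to_s) handles both cases without a type check.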