Class: RedisIPC::Stream::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_ipc/stream/consumer.rb

Direct Known Subclasses

Dispatcher, Ledger::Consumer

Constant Summary collapse

DEFAULTS =
{
  # The number of Consumers to create to process entries for their group.
  # Any consumers created for a group will only process entries for their group
  pool_size: 20,

  # How often does the consumer check for new entries
  execution_interval: 0.001 # 1ms
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, redis:, options: {}) ⇒ Consumer

Creates a new Consumer for the given stream. This class is configured to read from its own Pending Entries List by default. This means that this class cannot read entries without them being claimed by this consumer. See Dispatcher

Parameters:

  • name (String)

    The unique name for this consumer to be used in Redis

  • redis (RedisIPC::Stream::Commands)

    The redis commands instance used by Stream

  • options (Hash) (defaults to: {})

    Configuration values for the Consumer. See Consumer::DEFAULTS

Raises:

  • (ArgumentError)


29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/redis_ipc/stream/consumer.rb', line 29

def initialize(name, redis:, options: {})
  @name = name.freeze
  raise ArgumentError, "Consumer was created without a name" if @name.blank?

  @redis = redis
  @instance_id = redis.instance_id

  @redis.create_consumer(self)
  @options = DEFAULTS.merge(options).freeze
  @logger = @redis.logger

  # This is the workhorse for the consumer
  @task = Concurrent::TimerTask.new(
    execution_interval: @options[:execution_interval],
    freeze_on_deref: true
  ) { check_for_entries }
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



15
16
17
# File 'lib/redis_ipc/stream/consumer.rb', line 15

def name
  @name
end

#redisObject (readonly)

Returns the value of attribute redis.



15
16
17
# File 'lib/redis_ipc/stream/consumer.rb', line 15

def redis
  @redis
end

Instance Method Details

#add_callback(callback_type, observer = nil, function = :update, &block) ⇒ Object

A wrapper for #add_observer that simplifies processing entries by removing the need to have code on every observer to handle nil entries or exceptions If manual exception handling is needed, use #add_observer. Just remember that the result can be nil

Parameters:

  • callback_type (Symbol)

    The type of callback to register

  • observer (Object) (defaults to: nil)

    If a block is not provided, this object will have function called on it

  • function (Symbol) (defaults to: :update)

    If a block is not provided, this is the method that will be called on the observer

  • &block (Proc)

    If provided this code will be called as the callback

Returns:

  • (Object)

    The registered observer



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/redis_ipc/stream/consumer.rb', line 59

def add_callback(callback_type, observer = nil, function = :update, &block)
  handler = lambda do |data|
    if block
      yield(data)
    else
      observer.send(function, data)
    end
  end

  case callback_type
  # Ignores exceptions and only calls when it's a success
  when :on_message
    add_observer do |_, entry, exception|
      next if entry.nil? || exception

      handler.call(entry)
    end
  # Ignores successful entries and only calls on exceptions
  when :on_error
    add_observer do |_, _e, exception|
      next unless exception

      handler.call(exception)
    end
  else
    raise ArgumentError, "Invalid callback type #{callback_type} provided. Expected :on_message, or :on_error"
  end
end

#check_for_entriesObject

Note:

This is default functionality. This is expected to be overwritten by other classes

The method that is called by the consumer's task.

See Also:

  • Ledger::Consumer


136
137
138
139
140
141
142
143
# File 'lib/redis_ipc/stream/consumer.rb', line 136

def check_for_entries
  entry = read_from_stream
  return if entry.nil? || invalid_entry?(entry)

  entry
ensure
  acknowledge_and_remove(entry) unless entry.nil?
end

#inspectString

Returns the inspected object

Returns:

  • (String)


126
127
128
# File 'lib/redis_ipc/stream/consumer.rb', line 126

def inspect
  "#<#{self.class}:0x#{object_id} name=\"#{name}\" stream_name=\"#{stream_name}\" group_name=\"#{group_name}\" listening=#{listening?}>"
end

#listenObject

Starts checking the stream for new entries



100
101
102
103
104
105
106
107
108
# File 'lib/redis_ipc/stream/consumer.rb', line 100

def listen
  return if listening?

  @task.execute
  change_availability

  log("Ready", severity: :debug)
  @task
end

#listening?Boolean

Returns true if the consumer is listening for entries

Returns:

  • (Boolean)


93
94
95
# File 'lib/redis_ipc/stream/consumer.rb', line 93

def listening?
  @task.running?
end

#stop_listeningObject

Stops checking the stream for new entries



113
114
115
116
117
118
119
# File 'lib/redis_ipc/stream/consumer.rb', line 113

def stop_listening
  @task.shutdown
  change_availability

  log("Stopped", severity: :debug)
  true
end