Class: RedisIPC::Stream::Ledger

Inherits:
Concurrent::Map
  • Object
show all
Defined in:
lib/redis_ipc/stream/ledger.rb,
lib/redis_ipc/stream/ledger/consumer.rb

Defined Under Namespace

Classes: Consumer, Entry

Constant Summary collapse

# Fallback settings merged underneath the options passed to the constructor.
DEFAULTS = {
  # Lifetime, in seconds, given to each newly stored ledger entry
  entry_timeout: 5,
  # Interval, in seconds, between runs of the expired-entry cleanup task
  cleanup_interval: 1
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Ledger

Returns a new instance of Ledger.



23
24
25
26
27
28
29
30
31
32
# File 'lib/redis_ipc/stream/ledger.rb', line 23

# Builds an empty ledger and schedules a recurring task that removes
# entries once they report themselves as expired.
#
# @param options [Hash] overrides for {DEFAULTS}
# @option options [Integer] :entry_timeout seconds before a stored entry expires
# @option options [Integer] :cleanup_interval seconds between cleanup runs
def initialize(options = {})
  super()

  @options = DEFAULTS.merge(options)

  # NOTE(review): `#seconds` is not core Ruby; presumably ActiveSupport is
  # loaded elsewhere in the project — confirm.
  @timeout_in_seconds = @options[:entry_timeout].seconds

  interval = @options[:cleanup_interval]
  @cleanup_task = Concurrent::TimerTask.execute(execution_interval: interval) do
    each do |id, ledger_entry|
      next unless ledger_entry.expired?

      delete(id)
    end
  end
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



21
22
23
# File 'lib/redis_ipc/stream/ledger.rb', line 21

# Reader for the options this ledger was configured with.
#
# @return [Hash] the value stored in @options (the constructor assigns
#   DEFAULTS merged with the caller-supplied options)
def options
  @options
end

Instance Method Details

#delete_entry(entry) ⇒ Object

Removes an entry from the ledger

Parameters:



73
74
75
# File 'lib/redis_ipc/stream/ledger.rb', line 73

# Drops the ledger record keyed by the given stream entry's ID.
#
# @param entry [#id] the stream entry whose record should be removed
# @return [Object] whatever the underlying map's #delete returns for that ID
def delete_entry(entry)
  entry_id = entry.id
  delete(entry_id)
end

#entry?(entry) ⇒ Boolean

Returns true if an entry is in the ledger

Parameters:

Returns:

  • (Boolean)


50
51
52
# File 'lib/redis_ipc/stream/ledger.rb', line 50

# Reports whether the ledger currently holds a record for the given
# stream entry.
#
# @param entry [#id] the stream entry to look up
# @return [Boolean] true when #fetch_entry finds a truthy value, false otherwise
def entry?(entry)
  fetch_entry(entry) ? true : false
end

#fetch_entry(entry) ⇒ Stream::Ledger::Entry?

Fetches a ledger entry by the stream entry's ID

Parameters:

Returns:



41
42
43
# File 'lib/redis_ipc/stream/ledger.rb', line 41

# Looks up the ledger record stored under the given stream entry's ID.
#
# @param entry [#id] the stream entry whose ID is used as the lookup key
# @return [Stream::Ledger::Entry, nil] the stored record, or nil when the
#   lookup finds nothing
def fetch_entry(entry)
  entry_id = entry.id
  self[entry_id]
end

#store_entry(entry) ⇒ Concurrent::MVar

Stores an entry in the ledger

Parameters:

Returns:

  • (Concurrent::MVar)

    The associated mailbox that may or may not contain the response

Raises:

  • (ArgumentError)


61
62
63
64
65
66
# File 'lib/redis_ipc/stream/ledger.rb', line 61

# Registers a stream entry in the ledger and hands back the mailbox of the
# ledger record created for it.
#
# The record is inserted with Concurrent::Map#put_if_absent so that the
# "already present?" check and the write happen as one atomic step; two
# threads storing the same ID can no longer both succeed. The mailbox is
# read from the local record rather than by looking the ID up again, so a
# concurrent removal (cleanup task or #delete_entry) cannot turn the
# lookup into a nil dereference.
#
# @param entry [#id] the stream entry to track
# @return [Concurrent::MVar] the mailbox of the newly created ledger record
# @raise [ArgumentError] if a record already exists for the entry's ID
def store_entry(entry)
  entry_id = entry.id
  ledger_entry = Ledger::Entry.new(expires_at: @timeout_in_seconds.from_now)

  # put_if_absent returns nil when the insert happened, otherwise the
  # value that was already stored under the key.
  existing = put_if_absent(entry_id, ledger_entry)
  raise ArgumentError, "#{entry_id} is already in the ledger" if existing

  ledger_entry.mailbox
end