Class: RedisIPC::Stream::Ledger
- Inherits:
-
Concurrent::Map
- Object
- Concurrent::Map
- RedisIPC::Stream::Ledger
- Defined in:
- lib/redis_ipc/stream/ledger.rb,
lib/redis_ipc/stream/ledger/consumer.rb
Defined Under Namespace
Constant Summary collapse
- DEFAULTS =
{ entry_timeout: 5, # Seconds cleanup_interval: 1 # Seconds }.freeze
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
Instance Method Summary collapse
-
#delete_entry(entry) ⇒ Object
Removes an entry from the ledger.
-
#entry?(entry) ⇒ Boolean
Returns true if an entry is in the ledger.
-
#fetch_entry(entry) ⇒ Stream::Ledger::Entry?
Fetches a ledger entry by the stream entry's ID.
-
#initialize(options = {}) ⇒ Ledger
constructor
A new instance of Ledger.
-
#store_entry(entry) ⇒ Concurrent::MVar
Stores an entry in the ledger.
Constructor Details
#initialize(options = {}) ⇒ Ledger
Returns a new instance of Ledger.
23 24 25 26 27 28 29 30 31 32 |
# File 'lib/redis_ipc/stream/ledger.rb', line 23 def initialize( = {}) super() @options = DEFAULTS.merge() @timeout_in_seconds = @options[:entry_timeout].seconds @cleanup_task = Concurrent::TimerTask.execute(execution_interval: @options[:cleanup_interval]) do each { |id, ledger_entry| delete(id) if ledger_entry.expired? } end end |
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
21 22 23 |
# File 'lib/redis_ipc/stream/ledger.rb', line 21 def @options end |
Instance Method Details
#delete_entry(entry) ⇒ Object
Removes an entry from the ledger
73 74 75 |
# File 'lib/redis_ipc/stream/ledger.rb', line 73 def delete_entry(entry) delete(entry.id) end |
#entry?(entry) ⇒ Boolean
Returns true if an entry is in the ledger
50 51 52 |
# File 'lib/redis_ipc/stream/ledger.rb', line 50 def entry?(entry) !!fetch_entry(entry) end |
#fetch_entry(entry) ⇒ Stream::Ledger::Entry?
Fetches a ledger entry by the stream entry's ID
41 42 43 |
# File 'lib/redis_ipc/stream/ledger.rb', line 41 def fetch_entry(entry) self[entry.id] end |
#store_entry(entry) ⇒ Concurrent::MVar
Stores an entry in the ledger
61 62 63 64 65 66 |
# File 'lib/redis_ipc/stream/ledger.rb', line 61 def store_entry(entry) raise ArgumentError, "#{entry.id} is already in the ledger" if entry?(entry) self[entry.id] = Ledger::Entry.new(expires_at: @timeout_in_seconds.from_now) self[entry.id].mailbox end |