Class: RedisIPC::Stream::Entry

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_ipc/stream/entry.rb

Overview

Represents an entry in the Redis Stream

Constant Summary collapse

VALID_STATUS =
[
  STATUS_PENDING = "pending",
  STATUS_FULFILLED = "fulfilled",
  STATUS_REJECTED = "rejected"
].freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id: nil, redis_id: nil, status: nil) ⇒ Entry

Returns a new instance of Entry.

Parameters:

  • id (NilClass, String) (defaults to: nil)

    The ID for this entry. Note, this is not the Redis stream ID (that's redis_id)

  • redis_id (nil, String) (defaults to: nil)

    The ID for the entry in the stream. This is generated by Redis

  • status (String) (defaults to: nil)

    The status of the entry. See VALID_STATUS

  • content (Object)

    The data to sent in the entry

  • source_group (String)

    The name of the group that sent the entry. Used for responding back to entries

  • destination_group (String)

    The group that will receive this entry

Raises:

  • (ArgumentError)


50
51
52
53
54
55
56
# File 'lib/redis_ipc/stream/entry.rb', line 50

def initialize(id: nil, redis_id: nil, status: nil, **)
  id ||= SecureRandom.uuid.delete("-")[0..20]
  status ||= STATUS_PENDING
  raise ArgumentError, "Status is not one of #{VALID_STATUS}" unless VALID_STATUS.include?(status)

  super(id: id, redis_id: redis_id, status: status, **)
end

Class Method Details

.from_redis(redis_id, data) ⇒ Entry

Returns an array containing two items, the Redis entry ID and the new Entry instance

Parameters:

  • redis_id (String)

    The entry ID Redis uses internally

  • data (Hash)

    The data from the entry in Redis

Returns:



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/redis_ipc/stream/entry.rb', line 23

def self.from_redis(redis_id, data)
  return if data.blank?

  data.symbolize_keys!

  # JSON support
  if data[:content].is_a?(String)
    data[:content] = begin
      JSON.parse(data[:content], symbolize_names: true)
    rescue
      data[:content]
    end
  end

  new(redis_id: redis_id, **data)
rescue
  nil
end

Instance Method Details

#fulfilled(content:) ⇒ Object

Makes a copy of the entry, marks it as fulfilled, and adjusts it so it can be sent back to the sender

Parameters:

  • content (String)

    The content to send



74
75
76
77
78
79
80
81
# File 'lib/redis_ipc/stream/entry.rb', line 74

def fulfilled(content:)
  with(
    content: content,
    status: STATUS_FULFILLED,
    source_group: destination_group,
    destination_group: source_group
  )
end

#fulfilled?Boolean

Was this entry fulfilled?

Returns:

  • (Boolean)


102
103
104
# File 'lib/redis_ipc/stream/entry.rb', line 102

def fulfilled?
  status == STATUS_FULFILLED
end

#pending?Boolean

This is entry pending?

Returns:

  • (Boolean)


120
121
122
# File 'lib/redis_ipc/stream/entry.rb', line 120

def pending?
  status == STATUS_PENDING
end

#rejected(content:) ⇒ Object

Makes a copy of the entry, marks it as rejected, and adjusts it so it can be sent back to the sender

Parameters:

  • content (String)

    The content to send



88
89
90
91
92
93
94
95
# File 'lib/redis_ipc/stream/entry.rb', line 88

def rejected(content:)
  with(
    content: content,
    status: STATUS_REJECTED,
    source_group: destination_group,
    destination_group: source_group
  )
end

#rejected?Boolean

Was this entry rejected?

Returns:

  • (Boolean)


111
112
113
# File 'lib/redis_ipc/stream/entry.rb', line 111

def rejected?
  status == STATUS_REJECTED
end

#to_hObject



58
59
60
61
62
63
64
65
66
67
# File 'lib/redis_ipc/stream/entry.rb', line 58

def to_h
  super.tap do |hash|
    hash.delete(:redis_id) # No need to send

    # Allow Hash/Array
    if !hash[:content].is_a?(String)
      hash[:content] = hash[:content].to_json
    end
  end
end