Thursday, September 19, 2024 11:56:41 PM
> settings

Customize


Authenticate

> stream_context.rb
# frozen_string_literal: true

RSpec.shared_context("stream") do
  let!(:stream_name) { "example_stream" }
  let!(:group_name) { "example_group" }

  let(:logger) { Logger.new($stdout, level: :info) }

  let!(:redis_commands_opts) { {} } # { {logger: logger} }
  let!(:redis_commands) do
    RedisIPC::Stream::Commands.new(stream_name, group_name, **redis_commands_opts)
  end

  let(:redis_pool) { redis_commands.redis_pool }
  let(:redis) { redis_pool.checkout }

  let(:ledger) { RedisIPC::Stream::Ledger.new }

  let(:example_entry) do
    RedisIPC::Stream::Entry.new(
      instance_id: redis_commands.instance_id,
      source_group: group_name,
      destination_group: "other_example_group",
      content: Faker::String.random
    )
  end

  before do
    redis_commands.destroy_group
    redis_commands.create_group

    # Do not delete the stream/group here as that this can run AFTER consumers are created
    # Which causes issues when they go to check for entries

    # Tracking which groups are created to avoid cleaning up data
    @groups = Concurrent::Map.new
    @groups[redis_commands.group_name] = redis_commands
  end

  after do
    redis_commands.destroy_group
    redis_commands.delete_stream

    # Forcing the checkin will silence the error
    redis_pool.checkin(force: true)
  end

  delegate :create_group, :entries_size, :consumer_info, :claim_entry,
    :next_unread_entry, :next_pending_entry,
    to: :redis_commands

  def create_consumer(name = nil, group: nil, consumer_class: RedisIPC::Stream::Consumer, **)
    group ||= group_name
    name ||= "#{group}_#{consumer_class.name.demodulize.downcase}_#{Time.now.to_f}"

    redis = @groups[group] ||= RedisIPC::Stream::Commands.new(stream_name, group, **redis_commands_opts)
    redis.create_group # Since this command can create new groups, this needs to be here

    consumer_class.new(name, redis: redis, **)
  end

  def create_dispatcher(name = nil, group: nil, **)
    create_consumer(name, group: group, consumer_class: RedisIPC::Stream::Dispatcher, **)
  end

  def add_to_stream(entry = example_entry, redis: redis_commands)
    redis.add_to_stream(entry)
  end

  def consumer_info_for(consumer)
    consumer_info[consumer.name]
  end

  def send_to_consumer(consumer, content:)
    entry = RedisIPC::Stream::Entry.new(
      instance_id: redis_commands.instance_id,
      content: content,
      source_group: group_name,
      destination_group: consumer.group_name
    )

    add_to_stream(entry)
    next_unread_entry(consumer)
  end
end
All opinions represented herein are my own
- © 2024 itsthedevman
- build 3c15a1b