Class: ESM::Connection::Client

Inherits:
Object
  • Object
show all
Includes:
Lifecycle
Defined in:
lib/esm/connection/client.rb,
lib/esm/connection/client/lifecycle.rb

Defined Under Namespace

Modules: Lifecycle

Constant Summary collapse

HEARTBEAT_INTERVAL =

seconds

3
Metadata =
ImmutableStruct.define(:vg_enabled, :vg_max_sizes)

Constants included from Lifecycle

Lifecycle::VALID_REQUEST_TYPES

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(tcp_client) ⇒ Client

Returns a new instance of Client.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/esm/connection/client.rb', line 16

def initialize(tcp_client)
  @socket = ClientSocket.new(tcp_client)
  @ledger = Ledger.new
  @config = ESM.config.connection_client

  @public_id = nil
  @server_id = nil
  @session_id = nil

  @thread_pool = Concurrent::CachedThreadPool.new
  (vg_enabled: false, vg_max_sizes: 0)

  execution_interval = @config.request_check
  @task = Concurrent::TimerTask.execute(execution_interval:) { on_message }
  @task.add_observer(ErrorHandler.new)

  @connected_at = Time.current
  @last_heartbeat = Time.current

  info!(address:, state: :on_connect)
end

Instance Attribute Details

#connected_atObject (readonly)

Returns the value of attribute connected_at.



12
13
14
# File 'lib/esm/connection/client.rb', line 12

def connected_at
  @connected_at
end

#public_idObject (readonly)

Returns the value of attribute public_id.



12
13
14
# File 'lib/esm/connection/client.rb', line 12

def public_id
  @public_id
end

#server_idObject (readonly)

Returns the value of attribute server_id.



12
13
14
# File 'lib/esm/connection/client.rb', line 12

def server_id
  @server_id
end

#session_idObject (readonly)

Returns the value of attribute session_id.



12
13
14
# File 'lib/esm/connection/client.rb', line 12

def session_id
  @session_id
end

Instance Method Details

#close(reason = "") ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/esm/connection/client.rb', line 42

def close(reason = "")
  @socket.shutdown
  @socket.close

  ESM::Database.with_connection do
    on_disconnect(reason)
  end

  ESM::Connection::Server.on_disconnect(self)

  @task.shutdown
end

#recent_heartbeat?Boolean

Returns:

  • (Boolean)


165
166
167
# File 'lib/esm/connection/client.rb', line 165

def recent_heartbeat?
  (Time.current - @last_heartbeat) < HEARTBEAT_INTERVAL.seconds
end

#send_error(content, block: false) ⇒ Object



59
60
61
62
# File 'lib/esm/connection/client.rb', line 59

def send_error(content, block: false)
  message = ESM::Message.new.add_error(:message, content)
  send_request(message, type: :error, block:)
end

#send_message(message) ⇒ Object



55
56
57
# File 'lib/esm/connection/client.rb', line 55

def send_message(message, **)
  send_request(message, type: :message, **)
end

#send_request(message = nil, type:, block: true, timeout: @config.response_timeout) ⇒ ESM::Connection::Promise, ESM::Message

Sends a request over the network to the client

Parameters:

  • message (ESM::Message, nil) (defaults to: nil)

    The data to send

  • type (Symbol)

    The type of request. See ESM::Connection::Request::TYPES

  • block (true/false) (defaults to: true)

    Cause this method to block the current thread and either

    1. Until the request is responded to by the client
    2. The timeout is reached. This will raise ESM::Exception::RejectedPromise
  • timeout (Integer) (defaults to: @config.response_timeout)

    A number in seconds on how long the process will block before considering a message to be timed out. Defaults to response_timeout in config.yml

Returns:



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/esm/connection/client.rb', line 81

def send_request(message = nil, type:, block: true, timeout: @config.response_timeout)
  # I feel so dirty. Multiline unless statements *shudder*
  unless message.nil? || message.is_a?(ESM::Message)
    raise TypeError, "Expected ESM::Message or nil. Got #{message.class}"
  end

  info!(
    address:,
    public_id:,
    server_id:,
    outbound: {type:, content: message&.to_h}
  )

  id = message&.id
  content = message&.to_s

  # Send the data over the network
  promise = write(id:, type:, content:)
  return promise.execute unless block

  # Block and wait for a response or timeout
  response = promise.wait_for_response(timeout)
  raise response.reason if response.rejected?

  response_message = ESM::Message.from_string(response.value)
  response_message.(server_id:)

  info!(address:, public_id:, server_id:, inbound: response_message.to_h)

  # Messages with errors do not contain any extra data or metadata
  # Merge the errors from the response into the original message and use that
  # to build the error messages (the error message can reference data/metadata)
  if response_message.errors?
    message
      .(server_id:)
      .add_errors(response_message.errors.map(&:to_h))

    embed = ESM::Embed.build(:error, description: message.error_messages.join("\n"))

    raise ESM::Exception::ExtensionError, embed
  end

  response_message
end

#set_metadataObject



38
39
40
# File 'lib/esm/connection/client.rb', line 38

def (**)
  @metadata = Metadata.new(**)
end

#update_last_heartbeatObject



161
162
163
# File 'lib/esm/connection/client.rb', line 161

def update_last_heartbeat
  @last_heartbeat = Time.current
end

#write(type:, id: nil, content: nil) ⇒ ESM::Connection::Client::Promise

Lower level method to send a request to the client and either disregard the response or block (default)

Parameters:

  • id (String, nil) (defaults to: nil)

    A UUID, if any, that will be used to differentiate this request

  • type (Symbol)

    The request type. See ESM::Connection::Client::Request::TYPES for full list

  • content (Symbol, Array<Numeric>, nil, #bytes) (defaults to: nil)

    The content to send in the request

  • block (true, false)

    Should this method block and wait for the response?

Returns:

  • (ESM::Connection::Client::Promise)

See Also:



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/esm/connection/client.rb', line 139

def write(type:, id: nil, content: nil)
  request = Request.new(id:, type:, content:)

  # Adding the request to the ledger allows us to track the request across multiple threads
  # ensuring the response to passed back to the blocking thread
  promise = @ledger.add(request)

  # All data passed is in JSON format
  # ESM will never be huge to the point where JSON is a limitation so this
  # isn't a concern of mine
  content = request.to_json

  # Compress
  content = ActiveSupport::Gzip.compress(content)

  # Encrypt
  content = @encryption.encrypt(content)

  # Once the promise is executed, write the content to the client
  promise.then { @socket.write(content) }
end