Class: ESM::Connection::Client
- Inherits:
-
Object
- Object
- ESM::Connection::Client
- Includes:
- Lifecycle
- Defined in:
- lib/esm/connection/client.rb,
lib/esm/connection/client/lifecycle.rb
Defined Under Namespace
Modules: Lifecycle
Constant Summary collapse
- HEARTBEAT_INTERVAL =
seconds
3- Metadata =
ImmutableStruct.define(:vg_enabled, :vg_max_sizes)
Constants included from Lifecycle
Lifecycle::VALID_REQUEST_TYPES
Instance Attribute Summary collapse
-
#connected_at ⇒ Object
readonly
Returns the value of attribute connected_at.
-
#public_id ⇒ Object
readonly
Returns the value of attribute public_id.
-
#server_id ⇒ Object
readonly
Returns the value of attribute server_id.
-
#session_id ⇒ Object
readonly
Returns the value of attribute session_id.
Instance Method Summary collapse
- #close(reason = "") ⇒ Object
-
#initialize(tcp_client) ⇒ Client
constructor
A new instance of Client.
- #recent_heartbeat? ⇒ Boolean
- #send_error(content, block: false) ⇒ Object
- #send_message(message) ⇒ Object
-
#send_request(message = nil, type:, block: true, timeout: @config.response_timeout) ⇒ ESM::Connection::Promise, ESM::Message
Sends a request over the network to the client.
- #set_metadata ⇒ Object
- #update_last_heartbeat ⇒ Object
-
#write(type:, id: nil, content: nil) ⇒ ESM::Connection::Client::Promise
Lower level method to send a request to the client and either disregard the response or block (default).
Constructor Details
#initialize(tcp_client) ⇒ Client
Returns a new instance of Client.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/esm/connection/client.rb', line 16 def initialize(tcp_client) @socket = ClientSocket.new(tcp_client) @ledger = Ledger.new @config = ESM.config.connection_client @public_id = nil @server_id = nil @session_id = nil @thread_pool = Concurrent::CachedThreadPool.new (vg_enabled: false, vg_max_sizes: 0) execution_interval = @config.request_check @task = Concurrent::TimerTask.execute(execution_interval:) { } @task.add_observer(ErrorHandler.new) @connected_at = Time.current @last_heartbeat = Time.current info!(address:, state: :on_connect) end |
Instance Attribute Details
#connected_at ⇒ Object (readonly)
Returns the value of attribute connected_at.
12 13 14 |
# File 'lib/esm/connection/client.rb', line 12 def connected_at @connected_at end |
#public_id ⇒ Object (readonly)
Returns the value of attribute public_id.
12 13 14 |
# File 'lib/esm/connection/client.rb', line 12 def public_id @public_id end |
#server_id ⇒ Object (readonly)
Returns the value of attribute server_id.
12 13 14 |
# File 'lib/esm/connection/client.rb', line 12 def server_id @server_id end |
#session_id ⇒ Object (readonly)
Returns the value of attribute session_id.
12 13 14 |
# File 'lib/esm/connection/client.rb', line 12 def session_id @session_id end |
Instance Method Details
#close(reason = "") ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/esm/connection/client.rb', line 42 def close(reason = "") @socket.shutdown @socket.close ESM::Database.with_connection do on_disconnect(reason) end ESM::Connection::Server.on_disconnect(self) @task.shutdown end |
#recent_heartbeat? ⇒ Boolean
165 166 167 |
# File 'lib/esm/connection/client.rb', line 165 def recent_heartbeat? (Time.current - @last_heartbeat) < HEARTBEAT_INTERVAL.seconds end |
#send_error(content, block: false) ⇒ Object
59 60 61 62 |
# File 'lib/esm/connection/client.rb', line 59 def send_error(content, block: false) = ESM::Message.new.add_error(:message, content) send_request(, type: :error, block:) end |
#send_message(message) ⇒ Object
55 56 57 |
# File 'lib/esm/connection/client.rb', line 55 def (, **) send_request(, type: :message, **) end |
#send_request(message = nil, type:, block: true, timeout: @config.response_timeout) ⇒ ESM::Connection::Promise, ESM::Message
Sends a request over the network to the client
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/esm/connection/client.rb', line 81 def send_request( = nil, type:, block: true, timeout: @config.response_timeout) # I feel so dirty. Multiline unless statements *shudder* unless .nil? || .is_a?(ESM::Message) raise TypeError, "Expected ESM::Message or nil. Got #{.class}" end info!( address:, public_id:, server_id:, outbound: {type:, content: &.to_h} ) id = &.id content = &.to_s # Send the data over the network promise = write(id:, type:, content:) return promise.execute unless block # Block and wait for a response or timeout response = promise.wait_for_response(timeout) raise response.reason if response.rejected? = ESM::Message.from_string(response.value) .(server_id:) info!(address:, public_id:, server_id:, inbound: .to_h) # Messages with errors do not contain any extra data or metadata # Merge the errors from the response into the original message and use that # to build the error messages (the error message can reference data/metadata) if .errors? .(server_id:) .add_errors(.errors.map(&:to_h)) = ESM::Embed.build(:error, description: ..join("\n")) raise ESM::Exception::ExtensionError, end end |
#set_metadata ⇒ Object
38 39 40 |
# File 'lib/esm/connection/client.rb', line 38 def (**) @metadata = Metadata.new(**) end |
#update_last_heartbeat ⇒ Object
161 162 163 |
# File 'lib/esm/connection/client.rb', line 161 def update_last_heartbeat @last_heartbeat = Time.current end |
#write(type:, id: nil, content: nil) ⇒ ESM::Connection::Client::Promise
Lower level method to send a request to the client and either disregard the response or block (default)
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/esm/connection/client.rb', line 139 def write(type:, id: nil, content: nil) request = Request.new(id:, type:, content:) # Adding the request to the ledger allows us to track the request across multiple threads # ensuring the response to passed back to the blocking thread promise = @ledger.add(request) # All data passed is in JSON format # ESM will never be huge to the point where JSON is a limitation so this # isn't a concern of mine content = request.to_json # Compress content = ActiveSupport::Gzip.compress(content) # Encrypt content = @encryption.encrypt(content) # Once the promise is executed, write the content to the client promise.then { @socket.write(content) } end |