Publish - Subscribe
Functions implementing NATS Publish-Subscribe distribution model. For queue group 1:1 instead of default 1:N fanout configure subscriptions with the same queue_group argument.
NATS.publish — Functionpublish(connection, subject; ...)
publish(connection, subject, data; reply_to)
Publish data to a subject, payload is obtained with show method taking mime application/nats-payload, headers are obtained with show method taking mime application/nats-headers.
There are predefined convertion defined for String type. To publish headers there is defined conversion from tuple taking vector of pairs of strings.
Optional parameters:
reply_to: subject to which a result should be published
Examples:
publish(nc, "some_subject", "Some payload")
publish("some_subject", ("Some payload", ["some_header" => "Example header value"]))NATS.subscribe — Functionsubscribe(
f,
connection,
subject;
queue_group,
spawn,
channel_size,
monitoring_throttle_seconds
)
Subscribe to a subject.
Optional keyword arguments are:
queue_group: NATS server will distribute messages across queue group membersspawn: iftruetask will be spawn for eachfinvocation, otherwise messages are processed sequentially, default isfalsechannel_size: maximum items buffered for processing, if full messages will be ignored, default is524288, can be configured globally withNATS_SUBSCRIPTION_CHANNEL_SIZEenv variablemonitoring_throttle_seconds: time intervals in seconds that handler errors will be reported in logs, default is5.0seconds, can be configured globally withNATS_SUBSCRIPTION_ERROR_THROTTLING_SECONDSenv variable
subscribe(
connection,
subject;
queue_group,
channel_size,
monitoring_throttle_seconds
)
Subscribe to a subject in synchronous mode. Client is supposed to call next manually to obtain messages.
Optional keyword arguments are:
queue_group: NATS server will distribute messages across queue group memberschannel_size: maximum items buffered for processing, if full messages will be ignored, default is524288, can be configured globally withNATS_SUBSCRIPTION_CHANNEL_SIZEenv variablemonitoring_throttle_seconds: time intervals in seconds that handler errors will be reported in logs, default is5.0seconds, can be configured globally withNATS_SUBSCRIPTION_ERROR_THROTTLING_SECONDSenv variable
NATS.next — Functionnext(connection, sub; no_wait, no_throw)
Obtains next message for synchronous subscription.
Optional keyword arguments:
no_wait: do not wait for next message, returnnothingif buffer is emptyno_throw: do not throw exception, returnsnothingif cannot get next message
next(T, connection, sub; no_wait, no_throw)
Obtains next message for synchronous subscription converting it to requested T type.
Optional keyword arguments:
no_wait: do not wait for next message, returnnothingif buffer is emptyno_throw: do not throw exception, returnsnothingif cannot get next message
next(connection, sub, batch; no_wait, no_throw)
Obtains batch of messages for synchronous subscription.
Optional keyword arguments:
no_wait: do not wait for next message, return empty vector if buffer is emptyno_throw: do not throw exception, stops collecting messages on error
next(T, connection, sub, batch; no_wait, no_throw)
Obtains batch of messages for synchronous subscription converting them to reqested T type.
Optional keyword arguments:
no_wait: do not wait for next message, return empty vector if buffer is emptyno_throw: do not throw exception, stops collecting messages on error
NATS.unsubscribe — Functionunsubscribe(connection, sub; max_msgs)
Unsubscrible from a subject. sub is an object returned from subscribe or reply.
Optional keyword arguments are:
max_msgs: maximum number of messages server will send afterunsubscribemessage received in server side, what can occur after some time lag
NATS.drain — Functiondrain(connection)
Unsubscribe all subscriptions, wait for precessing all messages in buffers, then close connection. Drained connection is no more usable. This method is used to gracefuly stop the process.
Underneeth it periodicaly checks for state of all buffers, interval for checks is configurable per connection with drain_poll parameter of connect method. It can also be set globally with NATS_DRAIN_POLL_INTERVAL_SECONDS environment variable. If not set explicitly default polling interval is 0.2 seconds.
Error will be written to log if drain not finished until timeout expires. Default timeout value is configurable per connection on connect with drain_timeout. Can be also set globally with NATS_DRAIN_TIMEOUT_SECONDS environment variable. If not set explicitly default drain timeout is 5.0 seconds.
drain(connection, sub; timeout)
Unsubscribe a subscription and wait for precessing all messages in the buffer.
Underneeth it periodicaly checks for state of the buffer, interval for checks is configurable per connection with drain_poll parameter of connect method. It can also be set globally with NATS_DRAIN_POLL_INTERVAL_SECONDS environment variable. If not set explicitly default polling interval is 0.2 seconds.
Optional keyword arguments:
timeout: error will be thrown if drain not finished untiltimeoutexpires. Default value is configurable per connection onconnectwithdrain_timeout. Can be also set globally withNATS_DRAIN_TIMEOUT_SECONDSenvironment variable. If not set explicitly default drain timeout is5.0seconds.