Publish - Subscribe
Functions implementing the NATS publish-subscribe distribution model. To get queue-group 1:1 delivery instead of the default 1:N fanout, configure subscriptions with the same queue_group argument.
NATS.publish - Function
publish(connection, subject; ...)
publish(connection, subject, data; reply_to)
Publish data to a subject. The payload is obtained with the show method taking MIME application/nats-payload, and headers are obtained with the show method taking MIME application/nats-headers.
A conversion is predefined for the String type. To publish headers, a conversion is defined for a tuple of the payload and a vector of pairs of strings.
Optional parameters:
reply_to: subject to which a result should be published
Examples:
publish(nc, "some_subject", "Some payload")
publish(nc, "some_subject", ("Some payload", ["some_header" => "Example header value"]))
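The show-based conversion described above can be extended to application types. The following is a minimal sketch using only Base; the Order type and the order_payload helper are hypothetical names introduced for illustration, and sprint merely stands in for however the library collects the bytes internally.

```julia
# Hypothetical application type, used only for illustration (not part of NATS.jl).
struct Order
    id::Int
    item::String
end

# Payload conversion: write the text that should travel as the message body.
function Base.show(io::IO, ::MIME"application/nats-payload", order::Order)
    print(io, order.id, ";", order.item)
end

# Hypothetical helper: render the payload by calling `show` with the payload MIME type.
order_payload(order::Order) = sprint(show, MIME("application/nats-payload"), order)
```

Assuming publish looks up show for this MIME type as stated above, a call such as publish(nc, "orders", Order(42, "widget")) should then be able to serialize the value.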
NATS.subscribe - Function
subscribe(
f,
connection,
subject;
queue_group,
spawn,
channel_size,
monitoring_throttle_seconds
)
Subscribe to a subject.
Optional keyword arguments are:
queue_group: the NATS server will distribute messages across queue group members
spawn: if true, a task is spawned for each invocation of f; otherwise messages are processed sequentially. Default is false
channel_size: maximum number of items buffered for processing; if the buffer is full, messages will be ignored. Default is 524288; can be configured globally with the NATS_SUBSCRIPTION_CHANNEL_SIZE env variable
monitoring_throttle_seconds: interval in seconds at which handler errors are reported in logs. Default is 5.0 seconds; can be configured globally with the NATS_SUBSCRIPTION_ERROR_THROTTLING_SECONDS env variable
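As an illustration of the keyword arguments above, here is a hedged sketch that starts several handlers in one queue group. The start_workers function and the "workers" group name are hypothetical; payload(msg) is assumed to return the message body, and a running NATS server is needed to actually call the function.

```julia
using NATS

# Hypothetical helper: start `n` handlers on the same subject and queue group,
# so the server delivers each message to only one of them (1:1 instead of 1:N).
function start_workers(nc, subject, n::Int; queue_group = "workers")
    map(1:n) do i
        # `do` block syntax passes the handler as the first argument `f`.
        subscribe(nc, subject; queue_group = queue_group, spawn = false) do msg
            # Assumption: `payload` extracts the message body.
            @info "worker $i received: $(payload(msg))"
        end
    end
end
```

The returned vector holds the subscription objects, which can later be passed to unsubscribe or drain.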
subscribe(connection, subject; queue_group, channel_size)
Subscribe to a subject in synchronous mode. The client is expected to call next manually to obtain messages.
Optional keyword arguments are:
queue_group: the NATS server will distribute messages across queue group members
channel_size: maximum number of items buffered for processing; if the buffer is full, messages will be ignored. Default is 524288; can be configured globally with the NATS_SUBSCRIPTION_CHANNEL_SIZE env variable
NATS.next - Function
Obtains the next message for a synchronous subscription.
Optional keyword arguments:
no_wait: do not wait for the next message; return nothing if the buffer is empty
no_throw: do not throw an exception; return nothing if the next message cannot be obtained
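The synchronous mode can be sketched as a polling loop. The signature of next is not shown above, so the positional form next(connection, sub) used here is an assumption, and collect_buffered is a hypothetical helper name.

```julia
using NATS

# Hypothetical helper: collect whatever is already buffered for a synchronous
# subscription, returning as soon as the buffer is empty.
function collect_buffered(nc, sub)
    messages = []
    while true
        # Assumption: `next` takes the connection and the subscription positionally.
        msg = NATS.next(nc, sub; no_wait = true)
        isnothing(msg) && break  # `no_wait` yields `nothing` on an empty buffer
        push!(messages, msg)
    end
    messages
end
```

Here sub would be the object returned by the handler-less subscribe(connection, subject) form described above.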
NATS.unsubscribe - Function
unsubscribe(connection, sub; max_msgs)
Unsubscribe from a subject. sub is an object returned from subscribe or reply.
Optional keyword arguments are:
max_msgs: maximum number of messages the server will send after the unsubscribe message is received on the server side, which can happen after some time lag
NATS.drain - Function
drain(connection)
Unsubscribe all subscriptions, wait for all buffered messages to be processed, then close the connection. A drained connection is no longer usable. This method is used to stop the process gracefully.
Underneath, it periodically checks the state of all buffers. The check interval is configurable per connection with the drain_poll parameter of the connect method. It can also be set globally with the NATS_DRAIN_POLL_INTERVAL_SECONDS environment variable. If not set explicitly, the default polling interval is 0.2 seconds.
An error is written to the log if the drain has not finished when the timeout expires. The timeout is configurable per connection with the drain_timeout parameter of connect. It can also be set globally with the NATS_DRAIN_TIMEOUT_SECONDS environment variable. If not set explicitly, the default drain timeout is 5.0 seconds.
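A graceful shutdown along these lines might look like the sketch below. It assumes drain_timeout and drain_poll are accepted by connect as keyword arguments and that connect can reach a server with its default settings; with_drained_connection is a hypothetical helper name.

```julia
using NATS

# Hypothetical helper: connect with explicit drain settings, run `work(nc)`,
# and always drain the connection on the way out.
function with_drained_connection(work; drain_timeout = 10.0, drain_poll = 0.5)
    # Assumption: both settings are keyword arguments of `connect`.
    nc = NATS.connect(; drain_timeout = drain_timeout, drain_poll = drain_poll)
    try
        work(nc)
    finally
        # Unsubscribes everything, waits for buffers, then closes the connection.
        drain(nc)
    end
end
```

Because a drained connection cannot be reused, the helper creates a fresh connection on every call.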
drain(connection, sub; timer)
Unsubscribe a subscription and wait for all messages in its buffer to be processed.
Underneath, it periodically checks the state of the buffer. The check interval is configurable per connection with the drain_poll parameter of the connect method. It can also be set globally with the NATS_DRAIN_POLL_INTERVAL_SECONDS environment variable. If not set explicitly, the default polling interval is 0.2 seconds.
Optional keyword arguments:
timer: an error is thrown if the drain has not finished when timer expires. The default value is configurable per connection with the drain_timeout parameter of connect. It can also be set globally with the NATS_DRAIN_TIMEOUT_SECONDS environment variable. If not set explicitly, the default drain timeout is 5.0 seconds.
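Since the per-subscription drain throws when the timer expires, callers may want to handle that case explicitly. The sketch below assumes the timer keyword accepts a Base.Timer; try_drain is a hypothetical helper name.

```julia
using NATS

# Hypothetical helper: drain one subscription, allowing at most `seconds`
# for buffered messages to be processed.
# Returns true on success, false if draining raised an error.
function try_drain(nc, sub; seconds = 2.0)
    try
        # Assumption: `timer` accepts a Base.Timer.
        drain(nc, sub; timer = Timer(seconds))
        true
    catch err
        @warn "drain failed" exception = err
        false
    end
end
```

Unlike drain(connection), this leaves the connection open, so other subscriptions keep receiving messages.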