Publish - Subscribe

Functions implementing the NATS publish-subscribe distribution model. By default every subscriber receives each message (1:N fan-out). To have each message delivered to only one subscriber (1:1), create the subscriptions with the same queue_group argument.

NATS.publish - Function
publish(connection, subject; ...)
publish(connection, subject, data; reply_to)

Publish data to a subject. The payload is obtained by calling the show method with MIME type application/nats-payload, and the headers are obtained by calling the show method with MIME type application/nats-headers.

A conversion is predefined for the String type. To publish headers, a conversion is also defined for a tuple that pairs the payload with a vector of string pairs.
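The show-based conversion described above can be extended to application types. The following is a minimal sketch in plain Julia, assuming the payload is whatever the MIME-specific show method writes; the Temperature type is hypothetical and not part of NATS.jl, and no message is actually published here.

```julia
# Hypothetical application type, used only for illustration.
struct Temperature
    celsius::Float64
end

# Payload conversion: write the message body for the payload MIME type.
function Base.show(io::IO, ::MIME"application/nats-payload", t::Temperature)
    print(io, t.celsius, " C")
end

# Render the body through the same three-argument `show` call.
body = sprint(show, MIME("application/nats-payload"), Temperature(21.5))
println(body)  # prints "21.5 C"
```

With such a method in place, passing a Temperature value as the data argument of publish should send that body, going by the description above; this was not verified against a running server.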

Optional parameters:

  • reply_to: subject to which a result should be published

Examples:

    publish(nc, "some_subject", "Some payload")
    publish(nc, "some_subject", ("Some payload", ["some_header" => "Example header value"]))
source
NATS.subscribe - Function
subscribe(
    f,
    connection,
    subject;
    queue_group,
    spawn,
    channel_size,
    monitoring_throttle_seconds
)

Subscribe to a subject.

Optional keyword arguments are:

  • queue_group: the NATS server distributes messages across the members of the queue group
  • spawn: if true, a task is spawned for each invocation of f; otherwise messages are processed sequentially. Default is false
  • channel_size: maximum number of messages buffered for processing; when the buffer is full, new messages are ignored. Default is 524288. Can be configured globally with the NATS_SUBSCRIPTION_CHANNEL_SIZE environment variable
  • monitoring_throttle_seconds: interval in seconds at which handler errors are reported in the logs. Default is 5.0 seconds. Can be configured globally with the NATS_SUBSCRIPTION_ERROR_THROTTLING_SECONDS environment variable
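
The difference between sequential processing and spawn = true can be pictured with a toy dispatcher in plain Julia. This is only a sketch of the idea, not NATS.jl code: the dispatch function and its arguments are made up for illustration.

```julia
# Toy dispatcher (hypothetical, not the library's implementation).
function dispatch(f, messages; spawn::Bool = false)
    if spawn
        # One task per message: handler calls may overlap and finish in any order.
        tasks = [Threads.@spawn f(msg) for msg in messages]
        foreach(wait, tasks)
    else
        # Sequential: each handler call finishes before the next one starts.
        foreach(f, messages)
    end
end

handled = Channel{String}(16)   # thread-safe collector for handled messages
dispatch(["a", "b", "c"]; spawn = true) do msg
    put!(handled, uppercase(msg))
end
close(handled)

# Sort before comparing, since spawned handlers give no ordering guarantee.
result = sort(collect(handled))
println(result)
```

A handler that relies on message order is better served by the sequential default; spawning suits handlers that are slow and independent of each other.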
source
subscribe(connection, subject; queue_group, channel_size)

Subscribe to a subject in synchronous mode. The client is expected to call next to obtain messages.

Optional keyword arguments are:

  • queue_group: the NATS server distributes messages across the members of the queue group
  • channel_size: maximum number of messages buffered for processing; when the buffer is full, new messages are ignored. Default is 524288. Can be configured globally with the NATS_SUBSCRIPTION_CHANNEL_SIZE environment variable
source
NATS.next - Function

Obtain the next message for a synchronous subscription.

Optional keyword arguments:

  • no_wait: do not wait for the next message; return nothing if the buffer is empty
  • no_throw: do not throw an exception; return nothing if the next message cannot be obtained
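
A synchronous subscription might be used roughly as follows. This is a sketch under several assumptions: the NATS.jl package is installed, a server is reachable from the connection passed in, and next is called as next(connection, sub) with the keyword arguments listed above (the exact signature is not shown on this page). fetch_one is a hypothetical helper.

```julia
using NATS

# Hypothetical helper: `nc` is assumed to be a connection from `NATS.connect()`.
function fetch_one(nc, subject::String)
    sub = NATS.subscribe(nc, subject)              # no handler: synchronous mode
    NATS.publish(nc, subject, "ping")
    msg = NATS.next(nc, sub)                       # waits until a message arrives
    leftover = NATS.next(nc, sub; no_wait = true)  # documented to return nothing when the buffer is empty
    NATS.unsubscribe(nc, sub)
    return NATS.payload(msg), leftover
end
```

Nothing runs until fetch_one is called, for example as fetch_one(NATS.connect(), "demo.subject") with a local server listening; the subject name is arbitrary.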
source
NATS.unsubscribe - Function
unsubscribe(connection, sub; max_msgs)

Unsubscribe from a subject. sub is the object returned from subscribe or reply.

Optional keyword arguments are:

  • max_msgs: maximum number of messages the server will still send after it receives the unsubscribe message, which may reach the server with some delay
source
NATS.drain - Function
drain(connection)

Unsubscribe all subscriptions, wait until all messages in the buffers have been processed, then close the connection. A drained connection is no longer usable. This method is used to stop the process gracefully.

Under the hood, it periodically checks the state of all buffers. The check interval is configurable per connection with the drain_poll parameter of the connect method. It can also be set globally with the NATS_DRAIN_POLL_INTERVAL_SECONDS environment variable. If not set explicitly, the default polling interval is 0.2 seconds.

An error is written to the log if the drain does not finish before the timeout expires. The default timeout is configurable per connection on connect with drain_timeout. It can also be set globally with the NATS_DRAIN_TIMEOUT_SECONDS environment variable. If not set explicitly, the default drain timeout is 5.0 seconds.
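
The poll-until-empty-or-timeout behaviour can be modelled with a small plain-Julia sketch. It is a toy model built on a Channel, not the library's internals; wait_until_drained and its keyword names are made up, with defaults mirroring the 0.2 and 5.0 second values mentioned above.

```julia
# Toy model (hypothetical): poll a buffer until it is empty or time runs out.
function wait_until_drained(buffer::Channel; poll::Float64 = 0.2, timeout::Float64 = 5.0)
    deadline = time() + timeout
    while isready(buffer)                 # items are still waiting in the buffer
        time() >= deadline && return false
        sleep(poll)
    end
    return true
end

buffer = Channel{Int}(8)
foreach(i -> put!(buffer, i), 1:3)

# Background consumer that works through the buffer.
consumer = @async for item in buffer
    sleep(0.01)                           # pretend to process the item
end

drained = wait_until_drained(buffer; poll = 0.05, timeout = 2.0)
close(buffer)                             # lets the consumer loop finish
wait(consumer)
println(drained)  # prints "true"
```

A shorter polling interval notices an empty buffer sooner at the cost of more frequent wake-ups, which is the trade-off the drain_poll setting exposes.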

source
drain(connection, sub; timer)

Unsubscribe a subscription and wait until all messages in its buffer have been processed.

Under the hood, it periodically checks the state of the buffer. The check interval is configurable per connection with the drain_poll parameter of the connect method. It can also be set globally with the NATS_DRAIN_POLL_INTERVAL_SECONDS environment variable. If not set explicitly, the default polling interval is 0.2 seconds.

Optional keyword arguments:

  • timer: an error is thrown if the drain does not finish before the timer expires. The default value is configurable per connection on connect with drain_timeout. It can also be set globally with the NATS_DRAIN_TIMEOUT_SECONDS environment variable. If not set explicitly, the default drain timeout is 5.0 seconds.
source