Transport data structures

Using custom types as handler input

It is possible to use and return custom types inside subscription handlers if convert method from NATS.Msg is provided. See src/protocol/convert.jl for example implementation for String.

julia> using NATS
julia> struct Person name::String age::Int64 end
julia> import Base: convert
julia> function convert(::Type{Person}, msg::NATS.Msg) name, age = split(payload(msg), ",") Person(name, parse(Int64, age)) endconvert (generic function with 219 methods)
julia> connection = NATS.connect() # Important to create a connection after `convert` is definedNATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)
julia> sub = subscribe(connection, "EMPLOYEES") do person::Person @show person endNATS.Sub("EMPLOYEES", nothing, 1)
julia> publish(connection, "EMPLOYEES", "John,44")
julia> sleep(0.2) # Wait for message delivered to subperson = Main.Person("John", 44)
julia> drain(connection, sub)

Returning custom types from handler

It is also possible to return any type from a handler in reply and put any type as publish argument if conversion to UTF-8 string is provided. Note that both Julia and NATS protocol use UTF-8 encoding, so no additional conversions are needed.

NATS module defines custom MIME types for payload and headers serialization:

const MIME_PAYLOAD  = MIME"application/nats-payload"
const MIME_HEADERS  = MIME"application/nats-headers"

Conversion method should look like this.

julia> using NATS
julia> struct Person name::String age::Int64 end
julia> import Base: convert, show
julia> function convert(::Type{Person}, msg::NATS.Msg) name, age = split(payload(msg), ",") Person(name, parse(Int64, age)) endconvert (generic function with 220 methods)
julia> function show(io::IO, ::NATS.MIME_PAYLOAD, person::Person) print(io, person.name) print(io, ",") print(io, person.age) endshow (generic function with 362 methods)
julia> connection = NATS.connect()NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)
julia> sub = reply(connection, "EMPLOYEES.SUPERVISOR") do person::Person if person.name == "Alice" Person("Bob", 44) else Person("Unknown", 0) end endNATS.Sub("EMPLOYEES.SUPERVISOR", nothing, 1)
julia> request(Person, connection, "EMPLOYEES.SUPERVISOR", Person("Alice", 22))Main.Person("Bob", 44)
julia> drain(connection, sub)

Error handling

Errors can be handled with custom headers.

julia> using NATS
julia> import Base: convert, show
julia> struct Person name::String age::Int64 departament::String end
julia> function convert(::Type{Person}, msg::NATS.Msg) name, age, departament = split(payload(msg), ",") Person(name, parse(Int64, age), departament) endconvert (generic function with 221 methods)
julia> function show(io::IO, ::NATS.MIME_PAYLOAD, person::Person) print(io, person.name) print(io, ",") print(io, person.age) print(io, ",") print(io, person.departament) endshow (generic function with 363 methods)
julia> nc = NATS.connect()NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)
julia> sub = reply(nc, "EMPLOYEES.SUPERVISOR") do person::Person if person.name == "Alice" Person("Bob", 44, "IT"), ["status" => "ok"] else Person("Unknown", 0, ""), ["status" => "error", "message" => "Supervisor not defined for $(person.name)" ] end endNATS.Sub("EMPLOYEES.SUPERVISOR", nothing, 1)
julia> request(Person, nc, "EMPLOYEES.SUPERVISOR", Person("Alice", 33, "IT"))Main.Person("Bob", 44, "IT")
julia> error_response = request(nc, "EMPLOYEES.SUPERVISOR", Person("Anna", 33, "ACCOUNTING"));
julia> headers(error_response)2-element Vector{Pair{String, String}}: "status" => "error" "message" => "Supervisor not defined for Anna"
julia> drain(nc, sub)