Transport data structures
Using custom types as handler input
It is possible to use and return custom types inside subscription handlers if convert method from NATS.Msg is provided. See src/protocol/convert.jl for example implementation for String.
julia> using NATSjulia> struct Person name::String age::Int64 endjulia> import Base: convertjulia> function convert(::Type{Person}, msg::NATS.Msg) name, age = split(payload(msg), ",") Person(name, parse(Int64, age)) endconvert (generic function with 218 methods)julia> connection = NATS.connect() # Important to create a connection after `convert` is definedNATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)julia> sub = subscribe(connection, "EMPLOYEES") do person::Person @show person endNATS.Sub("EMPLOYEES", nothing, 1)julia> publish(connection, "EMPLOYEES", "John,44")julia> sleep(0.2) # Wait for message delivered to subperson = Main.Person("John", 44)julia> drain(connection, sub)
Returning custom types from handler
It is also possible to return any type from a handler in reply and put any type as publish argument if conversion to UTF-8 string is provided. Note that both Julia and NATS protocol use UTF-8 encoding, so no additional conversions are needed.
NATS module defines custom MIME types for payload and headers serialization:
const MIME_PAYLOAD = MIME"application/nats-payload"
const MIME_HEADERS = MIME"application/nats-headers"Conversion method should look like this.
julia> using NATSjulia> struct Person name::String age::Int64 endjulia> import Base: convert, showjulia> function convert(::Type{Person}, msg::NATS.Msg) name, age = split(payload(msg), ",") Person(name, parse(Int64, age)) endconvert (generic function with 219 methods)julia> function show(io::IO, ::NATS.MIME_PAYLOAD, person::Person) print(io, person.name) print(io, ",") print(io, person.age) endshow (generic function with 341 methods)julia> connection = NATS.connect()NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)julia> sub = reply(connection, "EMPLOYEES.SUPERVISOR") do person::Person if person.name == "Alice" Person("Bob", 44) else Person("Unknown", 0) end endNATS.Sub("EMPLOYEES.SUPERVISOR", nothing, 1)julia> request(Person, connection, "EMPLOYEES.SUPERVISOR", Person("Alice", 22))Main.Person("Bob", 44)julia> drain(connection, sub)
Error handling
Errors can be handled with custom headers.
julia> using NATSjulia> import Base: convert, showjulia> struct Person name::String age::Int64 departament::String endjulia> function convert(::Type{Person}, msg::NATS.Msg) name, age, departament = split(payload(msg), ",") Person(name, parse(Int64, age), departament) endconvert (generic function with 220 methods)julia> function show(io::IO, ::NATS.MIME_PAYLOAD, person::Person) print(io, person.name) print(io, ",") print(io, person.age) print(io, ",") print(io, person.departament) endshow (generic function with 342 methods)julia> nc = NATS.connect()NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)julia> sub = reply(nc, "EMPLOYEES.SUPERVISOR") do person::Person if person.name == "Alice" Person("Bob", 44, "IT"), ["status" => "ok"] else Person("Unknown", 0, ""), ["status" => "error", "message" => "Supervisor not defined for $(person.name)" ] end endNATS.Sub("EMPLOYEES.SUPERVISOR", nothing, 1)julia> request(Person, nc, "EMPLOYEES.SUPERVISOR", Person("Alice", 33, "IT"))Main.Person("Bob", 44, "IT")julia> error_response = request(nc, "EMPLOYEES.SUPERVISOR", Person("Anna", 33, "ACCOUNTING"));julia> headers(error_response)2-element Vector{Pair{String, String}}: "status" => "error" "message" => "Supervisor not defined for Anna"julia> drain(nc, sub)