JetDict
Basic usage
julia> using NATS
julia> using NATS.JetStream
julia> nc = NATS.connect()
NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)
julia> kv = JetDict{String}(nc, "example_kv")
JetDict{String}()
julia> kv["key1"] = "Value 1"
"Value 1"
julia> kv["key2"] = "Value 2"
"Value 2"
julia> kv
JetDict{String}(...): "key1" => "Value 1" "key2" => "Value 2"
julia> delete!(kv, "key1")
JetDict{String}(...): "key2" => "Value 2"
julia> keyvalue_stream_delete(nc, "example_kv")
NATS.JetStream.ApiResult(true)
Key encoding
Keys of a keyvalue stream have limitations. They cannot start or end with '.' and can contain only a limited set of characters.
Allowed characters:
- letters
- digits
- '-', '/', '_', '=', '.'
To allow arbitrary strings as keys, the `:base64url` encoding parameter may be specified.
julia> using NATS
julia> using NATS.JetStream
julia> nc = NATS.connect()
NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)
julia> kv = JetDict{String}(nc, "example_kv_enc", :base64url)
JetDict{String}()
julia> kv["encoded_key1"] = "Value 1"
"Value 1"
julia> kv["!@#%^&"] = "Value 2"
"Value 2"
julia> kv["!@#%^&"]
"Value 2"
julia> keyvalue_stream_delete(nc, "example_kv_enc")
NATS.JetStream.ApiResult(true)
Custom values
julia> using NATS
julia> using NATS.JetStream
julia> import Base: show, convert
julia> struct Person
           name::String
           age::Int64
       end
julia> function convert(::Type{Person}, msg::NATS.Msg)
           name, age = split(payload(msg), ",")
           Person(name, parse(Int64, age))
       end
convert (generic function with 222 methods)
julia> function show(io::IO, ::NATS.MIME_PAYLOAD, person::Person)
           print(io, person.name)
           print(io, ",")
           print(io, person.age)
       end
show (generic function with 364 methods)
julia> nc = NATS.connect()
NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)
julia> kv = JetDict{Person}(nc, "people")
JetDict{Main.Person}()
julia> kv["jakub"] = Person("jakub", 22)
Main.Person("jakub", 22)
julia> kv["martha"] = Person("martha", 22)
Main.Person("martha", 22)
julia> kv["martha"]
Main.Person("martha", 22)
julia> kv
JetDict{Main.Person}(...): "jakub" => Person("jakub", 22) "martha" => Person("martha", 22)
julia> keyvalue_stream_delete(nc, "people")
NATS.JetStream.ApiResult(true)
Watching changes
julia> using NATS
julia> using NATS.JetStream
julia> nc = NATS.connect()
NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)
julia> kv = JetDict{String}(nc, "example_kv_watch")
JetDict{String}()
julia> sub = watch(kv) do change
           @show change
       end
StreamSub("$KV.example_kv_watch.>")
julia> @async begin
           kv["a"] = "1"
           kv["b"] = "2"
           delete!(kv, "a")
           kv["b"] = "3"
           kv["a"] = "4"
       end
Task (runnable) @0x00007f93a9cf12d0
julia> sleep(1) # Wait for changes
change = "a" => "1"
change = "b" => "2"
change = "a" => nothing
change = "b" => "3"
change = "a" => "4"
julia> stream_unsubscribe(nc, sub)
NATS.JetStream.ApiResult(true)
julia> keyvalue_stream_delete(nc, "example_kv_watch")
NATS.JetStream.ApiResult(true)
Optimistic concurrency
julia> using NATS
julia> using NATS.JetStream
julia> connection = NATS.connect()
NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)
julia> kv = JetStream.JetDict{String}(connection, "test_kv_concurrency")
JetDict{String}()
julia> kv["a"] = "1"
"1"
julia> @async (sleep(2); kv["a"] = "2")
Task (runnable) @0x00007f93f092a8b0
julia> with_optimistic_concurrency(kv) do
           old = kv["a"]
           sleep(3)
           kv["a"] = "$(old)_updated"
       end
ERROR: JetStream 400: wrong last sequence: 2.
julia> kv["a"]
"2"
julia> keyvalue_stream_delete(connection, "test_kv_concurrency")
NATS.JetStream.ApiResult(true)