JetDict

Basic usage

julia> using NATS
julia> using NATS.JetStream
julia> nc = NATS.connect()NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)
julia> kv = JetDict{String}(nc, "example_kv")JetDict{String}()
julia> kv["key1"] = "Value 1""Value 1"
julia> kv["key2"] = "Value 2""Value 2"
julia> kvJetDict{String}(...): "key1" => "Value 1" "key2" => "Value 2"
julia> delete!(kv, "key1")JetDict{String}(...): "key2" => "Value 2"
julia> keyvalue_stream_delete(nc, "example_kv")NATS.JetStream.ApiResult(true)

Key encoding

Keys of keyvalue stream have limitations. They cannot start and and with '.' and can contain limited set of characters.

Alowed characters:

  • letters
  • digits
  • '-', '/', '_', '=', '.'

To allow arbitrary string as a key :base64url encoding parameter may be specified.

julia> using NATS
julia> using NATS.JetStream
julia> nc = NATS.connect()NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)
julia> kv = JetDict{String}(nc, "example_kv_enc", :base64url)JetDict{String}()
julia> kv["encoded_key1"] = "Value 1""Value 1"
julia> kv["!@#%^&"] = "Value 2""Value 2"
julia> kv["!@#%^&"]"Value 2"
julia> keyvalue_stream_delete(nc, "example_kv_enc")NATS.JetStream.ApiResult(true)

Custom values

julia> using NATS
julia> using NATS.JetStream
julia> import Base: show, convert
julia> struct Person name::String age::Int64 end
julia> function convert(::Type{Person}, msg::NATS.Msg) name, age = split(payload(msg), ",") Person(name, parse(Int64, age)) endconvert (generic function with 222 methods)
julia> function show(io::IO, ::NATS.MIME_PAYLOAD, person::Person) print(io, person.name) print(io, ",") print(io, person.age) endshow (generic function with 364 methods)
julia> nc = NATS.connect()NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)
julia> kv = JetDict{Person}(nc, "people")JetDict{Main.Person}()
julia> kv["jakub"] = Person("jakub", 22)Main.Person("jakub", 22)
julia> kv["martha"] = Person("martha", 22)Main.Person("martha", 22)
julia> kv["martha"]Main.Person("martha", 22)
julia> kvJetDict{Main.Person}(...): "jakub" => Person("jakub", 22) "martha" => Person("martha", 22)
julia> keyvalue_stream_delete(nc, "people")NATS.JetStream.ApiResult(true)

Watching changes

julia> using NATS
julia> using NATS.JetStream
julia> nc = NATS.connect()NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)
julia> kv = JetDict{String}(nc, "example_kv_watch")JetDict{String}()
julia> sub = watch(kv) do change @show change endStreamSub("$KV.example_kv_watch.>")
julia> @async begin kv["a"] = "1" kv["b"] = "2" delete!(kv, "a") kv["b"] = "3" kv["a"] = "4" endTask (runnable) @0x00007f93a9cf12d0
julia> sleep(1) # Wait for changeschange = "a" => "1" change = "b" => "2" change = "a" => nothing change = "b" => "3" change = "a" => "4"
julia> stream_unsubscribe(nc, sub)NATS.JetStream.ApiResult(true)
julia> keyvalue_stream_delete(nc, "example_kv_watch")NATS.JetStream.ApiResult(true)

Optimistic concurrency

julia> using NATS
julia> using NATS.JetStream
julia> connection = NATS.connect()NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)
julia> kv = JetStream.JetDict{String}(connection, "test_kv_concurrency")JetDict{String}()
julia> kv["a"] = "1""1"
julia> @async (sleep(2); kv["a"] = "2")Task (runnable) @0x00007f93f092a8b0
julia> with_optimistic_concurrency(kv) do old = kv["a"] sleep(3) kv["a"] = "$(old)_updated" endERROR: JetStream 400: wrong last sequence: 2.
julia> kv["a"]"2"
julia> keyvalue_stream_delete(connection, "test_kv_concurrency")NATS.JetStream.ApiResult(true)