JetDict
Basic usage
julia> using NATS

julia> using NATS.JetStream

julia> nc = NATS.connect()
NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)

julia> kv = JetDict{String}(nc, "example_kv")
JetDict{String}()

julia> kv["key1"] = "Value 1"
"Value 1"

julia> kv["key2"] = "Value 2"
"Value 2"

julia> kv
JetDict{String}(...):
  "key1" => "Value 1"
  "key2" => "Value 2"

julia> delete!(kv, "key1")
JetDict{String}(...):
  "key2" => "Value 2"

julia> keyvalue_stream_delete(nc, "example_kv")
NATS.JetStream.ApiResult(true)
Key encoding
Keys of a key-value stream have limitations: they cannot start or end with '.' and may contain only a limited set of characters.
Allowed characters:
- letters
- digits
- '-', '/', '_', '=', '.'
To allow an arbitrary string as a key, the :base64url encoding parameter may be specified.
julia> using NATS

julia> using NATS.JetStream

julia> nc = NATS.connect()
NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)

julia> kv = JetDict{String}(nc, "example_kv_enc", :base64url)
JetDict{String}()

julia> kv["encoded_key1"] = "Value 1"
"Value 1"

julia> kv["!@#%^&"] = "Value 2"
"Value 2"

julia> kv["!@#%^&"]
"Value 2"

julia> keyvalue_stream_delete(nc, "example_kv_enc")
NATS.JetStream.ApiResult(true)
Custom values
julia> using NATS

julia> using NATS.JetStream

julia> import Base: show, convert

julia> struct Person
           name::String
           age::Int64
       end

julia> function convert(::Type{Person}, msg::NATS.Msg)
           name, age = split(payload(msg), ",")
           Person(name, parse(Int64, age))
       end
convert (generic function with 221 methods)

julia> function show(io::IO, ::NATS.MIME_PAYLOAD, person::Person)
           print(io, person.name)
           print(io, ",")
           print(io, person.age)
       end
show (generic function with 343 methods)

julia> nc = NATS.connect()
NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)

julia> kv = JetDict{Person}(nc, "people")
JetDict{Main.Person}()

julia> kv["jakub"] = Person("jakub", 22)
Main.Person("jakub", 22)

julia> kv["martha"] = Person("martha", 22)
Main.Person("martha", 22)

julia> kv["martha"]
Main.Person("martha", 22)

julia> kv
JetDict{Main.Person}(...):
  "jakub" => Person("jakub", 22)
  "martha" => Person("martha", 22)

julia> keyvalue_stream_delete(nc, "people")
NATS.JetStream.ApiResult(true)
Watching changes
julia> using NATS

julia> using NATS.JetStream

julia> nc = NATS.connect()
NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)

julia> kv = JetDict{String}(nc, "example_kv_watch")
JetDict{String}()

julia> sub = watch(kv) do change
           @show change
       end
StreamSub("$KV.example_kv_watch.>")

julia> @async begin
           kv["a"] = "1"
           kv["b"] = "2"
           delete!(kv, "a")
           kv["b"] = "3"
           kv["a"] = "4"
       end
Task (runnable) @0x00007f352e7755f0

julia> sleep(1) # Wait for changes
change = "a" => "1"
change = "b" => "2"
change = "a" => nothing
change = "b" => "3"
change = "a" => "4"

julia> stream_unsubscribe(nc, sub)
NATS.JetStream.ApiResult(true)

julia> keyvalue_stream_delete(nc, "example_kv_watch")
NATS.JetStream.ApiResult(true)
Optimistic concurrency
julia> using NATS

julia> using NATS.JetStream

julia> connection = NATS.connect()
NATS.Connection(unnamed cluster, CONNECTED, 0 subs, 0 unsubs)

julia> kv = JetStream.JetDict{String}(connection, "test_kv_concurrency")
JetDict{String}()

julia> kv["a"] = "1"
"1"

julia> @async (sleep(2); kv["a"] = "2")
Task (runnable) @0x00007f357dda6ef0

julia> with_optimistic_concurrency(kv) do
           old = kv["a"]
           sleep(3)
           kv["a"] = "$(old)_updated"
       end
ERROR: JetStream 400: wrong last sequence: 2.

julia> kv["a"]
"2"

julia> keyvalue_stream_delete(connection, "test_kv_concurrency")
NATS.JetStream.ApiResult(true)