| 
4 | 4 | |----------|--------------------------------------------------------------|  | 
5 | 5 | | Date     | 2025-07-11                                                   |  | 
6 | 6 | | Author   | @MauriceVanVeen                                              |  | 
7 |  | -| Status   | Proposed                                                     |  | 
 | 7 | +| Status   | Implemented                                                  |  | 
8 | 8 | | Tags     | jetstream, kv, objectstore, server, client, refinement, 2.12 |  | 
9 | 9 | | Updates  | ADR-8, ADR-17, ADR-20, ADR-31, ADR-37                        |  | 
10 | 10 | 
 
  | 
@@ -149,3 +149,130 @@ purge markers. Therefore, the KV abstraction still has these guarantees since it  | 
149 | 149 | Since this is an opt-in on a read request or consumer create basis, this is not a breaking change. Depending on client  | 
150 | 150 | implementation, this could be harder to implement. But given it's just another field in the `JSApiMsgGetRequest` and  | 
151 | 151 | `ConsumerConfig`, each client should have no trouble supporting it.  | 
 | 152 | + | 
 | 153 | +## Client implementation  | 
 | 154 | + | 
 | 155 | +The below sections outline what additions the clients should support for message read requests and consumers, as used in  | 
 | 156 | +JetStream streams, KV and Object Store.  | 
 | 157 | + | 
 | 158 | +Generally, clients should expect error codes such as `NATS/1.0 412 Min Last Sequence` for Direct Get requests. Message  | 
 | 159 | +Get requests will return the following error code:  | 
 | 160 | + | 
 | 161 | +```go  | 
 | 162 | +JSStreamMinLastSeqErr: {Code: 412, ErrCode: 10180, Description: "min last sequence"},  | 
 | 163 | +```  | 
 | 164 | + | 
 | 165 | +A consumer created with a `min_last_seq` does not return errors. However, the consumer will wait with delivering  | 
 | 166 | +messages until the minimum last sequence is reached for the underlying stream store.  | 
 | 167 | + | 
 | 168 | +### Note about testing  | 
 | 169 | + | 
 | 170 | +A replicated stream can have followers that are slightly lagging behind in their applies, allowing for a stale read to  | 
 | 171 | +be served after the client has just written a new value. This is inherently a race condition and can't be controlled by  | 
 | 172 | +a client test, unless it meticulously controls the state of the server (for example through embedding the server).  | 
 | 173 | + | 
 | 174 | +The recommended way for writing tests would be:  | 
 | 175 | + | 
 | 176 | +- Test Message Get/Direct Get requests with a too high sequence that doesn't exist (yet) in the stream. It should return  | 
 | 177 | +  the `412 Min Last Sequence` error. Then publish a new message to the stream, get the publish acknowledgement, and  | 
 | 178 | +  confirm that a retry of the previous read succeeds.  | 
 | 179 | +- Test Consumers by using a too high sequence that doesn't exist (yet) in the stream. The consumer should not deliver  | 
 | 180 | +  messages. Then publish a new message to the stream, reaching the min last sequence threshold, the consumer should now  | 
 | 181 | +  start delivering messages.  | 
 | 182 | + | 
 | 183 | +### Message read requests  | 
 | 184 | + | 
 | 185 | +- Message read requests (Message Get & Direct Get), such as `stream.GetMsg` and `stream.GetLastMsgForSubject`, should  | 
 | 186 | +  support an option to include `min_last_seq` in the body of `JSApiMsgGetRequest`.  | 
 | 187 | + | 
 | 188 | +**Example:**  | 
 | 189 | + | 
 | 190 | +```go  | 
 | 191 | +// Write  | 
 | 192 | +ack, err := js.Publish("foo", nil)  | 
 | 193 | + | 
 | 194 | +// Reads  | 
 | 195 | +msg, err := stream.GetMsg(ctx, ack.Sequence, jetstream.MinLastSequence(ack.Sequence))  | 
 | 196 | +// -> $JS.API.DIRECT.GET.STREAM {"seq":1,"min_last_seq":1}  | 
 | 197 | +msg, err := stream.GetLastMsgForSubject(ctx, "foo", jetstream.MinLastSequence(ack.Sequence))  | 
 | 198 | +// -> $JS.API.DIRECT.GET.STREAM.foo {"min_last_seq":1}  | 
 | 199 | +```  | 
 | 200 | + | 
 | 201 | +- Similar to the above additions, KV should also support passing a minimum last revision.  | 
 | 202 | + | 
 | 203 | +**Example:**  | 
 | 204 | + | 
 | 205 | +```go  | 
 | 206 | +kve, err := kv.Get(ctx, "key", jetstream.MinLastSequence(ack.Sequence))  | 
 | 207 | +kve, err := kv.GetRevision(ctx, "foo", 1, jetstream.MinLastSequence(ack.Sequence))  | 
 | 208 | +```  | 
 | 209 | + | 
 | 210 | +### Consumers  | 
 | 211 | + | 
 | 212 | +- Similar to passing a `min_last_seq` in read requests, this should also be optionally passed in the `ConsumerConfig`  | 
 | 213 | +  when creating a consumer. This is not strictly required when the consumer is used for endless consumption, but should  | 
 | 214 | +  be supported when an "ordered consumer" is used since it's often used for "limited consumption" for example with  | 
 | 215 | +  `kv.ListKeys()`.  | 
 | 216 | + | 
 | 217 | +**Example:**  | 
 | 218 | + | 
 | 219 | +```go  | 
 | 220 | +// Start consuming, ensuring the newly written message is included (in NumPending counts, etc.)  | 
 | 221 | +ack, err := js.Publish("foo", nil)  | 
 | 222 | +c, err := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{MinLastSeq: ack.Sequence})  | 
 | 223 | + | 
 | 224 | +// List all keys, including a newly written key.  | 
 | 225 | +r, err := kv.Put(ctx, "key", []byte("value"))  | 
 | 226 | +keys, err := kv.ListKeys(ctx, jetstream.MinLastRevision(r))  | 
 | 227 | +```  | 
 | 228 | + | 
 | 229 | +### KV Store  | 
 | 230 | + | 
 | 231 | +The `kv.Create` method ensures a key only gets created if it doesn't already exist. If the key was previously deleted or  | 
 | 232 | +purged, the client can also handle these conditions. However, because the `kv.Create` is responded to by the stream  | 
 | 233 | +leader and the `kv.Get` it does internally could be answered by an outdated follower, the subsequent internal  | 
 | 234 | +`kv.Update` call could then fail.  | 
 | 235 | + | 
 | 236 | +When the client receives the following error: `wrong last sequence: 5`, it should recognize this and extract the  | 
 | 237 | +sequence from the error message. The error format is `wrong last sequence: {seq}`, and the sequence is that of the  | 
 | 238 | +revision it needs to pass in the `kv.Update` call.  | 
 | 239 | + | 
 | 240 | +This removes the need for the intermediate `kv.Get` call that could return stale reads, and ensures the `kv.Update` has  | 
 | 241 | +the required "monotonic read" property.  | 
 | 242 | + | 
 | 243 | +### Object Store  | 
 | 244 | + | 
 | 245 | +Object Store uses a combination of message read requests and consumers, to both get single-message object info as well  | 
 | 246 | +as reading the object itself.  | 
 | 247 | + | 
 | 248 | +- Write requests, such as `obs.Put`, should return the highest sequence of that object as `ObjectInfo.Sequence`. This  | 
 | 249 | +  highest sequence is the sequence of the "meta message" which is sent last after the object chunks.  | 
 | 250 | +- All single-message read requests should support, similar to KV, passing the `min_last_seq` in the message/direct get  | 
 | 251 | +  request.  | 
 | 252 | +- All consumers used to gather the object data should support passing the `min_last_seq` in the `ConsumerConfig`.  | 
 | 253 | + | 
 | 254 | +**Example:**  | 
 | 255 | + | 
 | 256 | +```go  | 
 | 257 | +// Write object.  | 
 | 258 | +info, err := obs.PutString(ctx, "file", "data")  | 
 | 259 | + | 
 | 260 | +// Listing objects should include written file.  | 
 | 261 | +lch, err := obs.List(ctx, jetstream.MinLastSequence(info.Sequence))  | 
 | 262 | + | 
 | 263 | +// Watch itself doesn't strictly require MinLastSequence support,  | 
 | 264 | +// since it's used for endless consumption.  | 
 | 265 | +watcher, err := obs.Watch(ctx)  | 
 | 266 | +for {  | 
 | 267 | +    select {  | 
 | 268 | +    case info := <-watcher.Updates():  | 
 | 269 | +        if info == nil {  | 
 | 270 | +            return  | 
 | 271 | +        }  | 
 | 272 | +        // Object read should support passing MinLastSequence to ensure the consumed metadata  | 
 | 273 | +        // can be retrieved. The watcher could live on the stream leader's server, but the  | 
 | 274 | +        // consumer to retrieve the chunks could be created on a temporarily outdated follower.  | 
 | 275 | +        value, err := obs.GetString(ctx, info.Name, jetstream.MinLastSequence(info.Sequence))  | 
 | 276 | +    }  | 
 | 277 | +}  | 
 | 278 | +```  | 
0 commit comments