NATS KV Codec

As part of a work project we needed to store some larger than normal data in the NATS KV. The default limit for a value is 1MiB, with the possible maximum being 64MiB. The recommendation is that you don’t go above 8MiB in order to minimise the impact of buffering these “large” payloads.

Because the payloads I needed to store are fairly compressible, I wanted a way to add compression transparently to the caching module that we use to wrap the ORM.

My initial steps were to look at adding this as a feature of the NATS adaptor in the caching module. However, this felt like the wrong path to take as none of the other adaptors would have this as an option.

After a bit of research I found that NATS KV has an option to supply a codec to the bucket connector. The codec is then applied to the keys and values automatically. Win!

The NATS.js Codec Interface

Under the hood in modern nats.js (specifically the @nats-io/kv package), the KV store operates on raw Uint8Array buffers. To allow custom behaviour, NATS exposes the KvCodec<T> and KvCodecs interfaces:

export interface KvCodec<T> {
  encode(value: T): T;
  decode(value: T): T;
}

export interface KvCodecs {
  key: KvCodec<string>;
  value: KvCodec<Uint8Array>;
}

By default, NATS uses a pass-through no-op codec for keys and values. By implementing custom encode and decode functions, we can transparently compress or encrypt the payloads on the client side before they are sent to the NATS JetStream backend.
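To make the idea of "transparent" encoding concrete, here is a minimal toy model of where a codec pair sits on the write and read paths. It is only a sketch: `ToyStore` and the XOR "codec" are hypothetical stand-ins invented for illustration, the interfaces are local copies of the ones shown above, and none of this is how the NATS client is actually implemented.

```typescript
// Local copies of the interfaces shown above, so the sketch runs without NATS.
interface KvCodec<T> {
  encode(value: T): T;
  decode(value: T): T;
}

interface KvCodecs {
  key: KvCodec<string>;
  value: KvCodec<Uint8Array>;
}

// Hypothetical toy store: a Map stands in for the JetStream backend.
class ToyStore {
  readonly backend = new Map<string, Uint8Array>();
  readonly codecs: KvCodecs;

  constructor(codecs: KvCodecs) {
    this.codecs = codecs;
  }

  put(key: string, value: Uint8Array): void {
    // Both key and value are encoded before they reach the backend.
    this.backend.set(this.codecs.key.encode(key), this.codecs.value.encode(value));
  }

  get(key: string): Uint8Array | undefined {
    // The key is encoded for the lookup; the stored value is decoded on the way out.
    const stored = this.backend.get(this.codecs.key.encode(key));
    return stored === undefined ? undefined : this.codecs.value.decode(stored);
  }
}

// A trivially reversible value codec (XOR every byte with 0xff), for illustration only.
const flip = (bytes: Uint8Array): Uint8Array => bytes.map((b) => b ^ 0xff);

const toyCodecs: KvCodecs = {
  key: { encode: (k) => k, decode: (k) => k },
  value: { encode: flip, decode: flip },
};

const store = new ToyStore(toyCodecs);
store.put("greeting", new TextEncoder().encode("hello"));

const fetched = store.get("greeting");
const roundTripped = fetched ? new TextDecoder().decode(fetched) : undefined;
const rawStored = store.backend.get("greeting"); // encoded bytes, not "hello"
```

The caller only ever sees the original bytes, while the backend only ever holds the encoded form. That is the property the compression codec relies on.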

Building a Snappy Compression Codec in TypeScript

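// The snippets below use the modular nats.js packages rather than the legacy "nats" package
import { connect } from "@nats-io/transport-node";
import { jetstream } from "@nats-io/jetstream";
import { Kvm, type KvCodec, type KvCodecs } from "@nats-io/kv";
import { compressSync, uncompressSync } from "snappy";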

// 1. Create a codec to handle transparent Snappy compression of the binary payloads
const snappyValueCodec: KvCodec<Uint8Array> = {
  encode(value: Uint8Array): Uint8Array {
    // Compress on write
    return compressSync(value);
  },
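  decode(value: Uint8Array): Uint8Array {
    // Decompress on read. uncompressSync is typed to take a Buffer and return
    // string | Buffer, so wrap the bytes in a Buffer view (no copy) and ask for a Buffer back
    const compressed = Buffer.from(value.buffer, value.byteOffset, value.byteLength);
    return uncompressSync(compressed, { asBuffer: true }) as Buffer;
  }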
};

// 2. Wrap it with the default no-op key codec
const compressedCodecs: KvCodecs = {
  key: {
    encode: (key: string) => key,
    decode: (key: string) => key,
  },
  value: snappyValueCodec
};

Hooking It Up to the KV Bucket

To use the custom codec, it is passed via the codec option when binding to or creating the KV bucket using JetStream:

// Connect to NATS
const nc = await connect({ servers: "localhost:4222" });
const js = jetstream(nc);

// Open or create the KV bucket with the compression codec applied
// (Kvm is a class, so it has to be instantiated with `new`)
const kv = await new Kvm(js).create("large_payload_cache", {
  codec: compressedCodecs,
});

Using It with JSON Payloads

The NATS client put() method accepts strings or Uint8Arrays, and it handles the standard string-to-bytes conversion before passing the value to the codec. It does not serialise objects for you, so if you are caching complex JSON objects you need to stringify them yourself before passing them to the compressed KV bucket:

// Writing to the bucket (explicitly serialized to JSON, then Snappy-compressed by the codec)
const userSession = { userId: "user_123", permissions: ["user"], token: "..." };
await kv.put("session.user_123", JSON.stringify(userSession));

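// Reading from the bucket (Snappy-decompressed by the codec, then decoded and parsed explicitly)
const entry = await kv.get("session.user_123");
if (entry) {
  // entry.value is a Uint8Array, so decode it to a string before parsing
  const session = JSON.parse(new TextDecoder().decode(entry.value));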
  console.log(session.userId); // "user_123"
}
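If you do this in more than one place, the stringify and parse steps can be pulled into a pair of small helpers. The following is a minimal sketch, not part of the NATS API: `MinimalKv`, `putJson`, `getJson` and `memoryKv` are hypothetical names, and `MinimalKv` is only assumed to be structurally compatible with the `put()` and `get()` calls used above. The in-memory stand-in exists purely so the example runs without a server.

```typescript
// Hypothetical structural type: just the two calls these helpers need.
interface MinimalKv {
  put(key: string, value: Uint8Array): Promise<number>;
  get(key: string): Promise<{ value: Uint8Array } | null>;
}

const encoder = new TextEncoder();
const decoder = new TextDecoder();

// Serialize to JSON, convert to bytes, and store.
async function putJson<T>(kv: MinimalKv, key: string, data: T): Promise<number> {
  return kv.put(key, encoder.encode(JSON.stringify(data)));
}

// Fetch, convert bytes back to a string, and parse. Returns null for a missing key.
async function getJson<T>(kv: MinimalKv, key: string): Promise<T | null> {
  const entry = await kv.get(key);
  return entry ? (JSON.parse(decoder.decode(entry.value)) as T) : null;
}

// In-memory stand-in for a bucket, for demonstration only.
function memoryKv(): MinimalKv {
  const data = new Map<string, Uint8Array>();
  let revision = 0;
  return {
    async put(key, value) {
      data.set(key, value);
      return ++revision;
    },
    async get(key) {
      const value = data.get(key);
      return value ? { value } : null;
    },
  };
}

const demoKv = memoryKv();
const demoResult = putJson(demoKv, "session.user_123", { userId: "user_123" })
  .then(() => getJson<{ userId: string }>(demoKv, "session.user_123"));
```

Keeping the JSON handling outside the codec means the codec stays a pure bytes-to-bytes transform, so the same bucket can still hold non-JSON values.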

Downsides to using Codecs in NATS

Because the compression step is synchronous, the event loop is blocked while a payload is being compressed. At a compression speed of around 250MiB/s, compressing a 14MiB JSON payload down to about 3.7MiB adds roughly 56ms to a request. Because of this additional latency, codecs are best reserved for objects that are expensive to generate, such as the results of heavy DB queries.
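The latency figure is simple arithmetic, and the same calculation can be reused to estimate the cost for other payload sizes. This is a back-of-the-envelope sketch using the numbers quoted above; `estimateBlockingMs` is a hypothetical helper, and real throughput will vary with hardware and payload contents.

```typescript
// Estimate how long a synchronous compression call blocks the event loop.
function estimateBlockingMs(payloadMiB: number, throughputMiBPerSec: number): number {
  return (payloadMiB * 1000) / throughputMiBPerSec;
}

const payloadMiB = 14; // uncompressed JSON payload from the example above
const compressedMiB = 3.7; // approximate size after Snappy compression
const throughputMiBPerSec = 250; // approximate compression speed

const blockingMs = estimateBlockingMs(payloadMiB, throughputMiBPerSec); // 56

// Fraction of the original size that is actually stored (a little over a quarter).
const storedFraction = compressedMiB / payloadMiB;

// Check the compressed size against the 8MiB recommendation mentioned at the start.
const fitsRecommendation = compressedMiB <= 8;
```

Measuring with your own payloads is still worthwhile, since both the ratio and the speed depend heavily on how repetitive the data is.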