NATS Logo by Example

Request-Reply in JetStream

CLI Go Python Deno Node Rust C# Java Ruby Elixir C
Jump to the output or the recording
$ nbe run jetstream/request-reply/go
View the source code or learn how to run this example yourself

Code

package main


import (
	"fmt"
	"os"
	"time"


	"github.com/nats-io/nats.go"
)


func main() {
	natsURL := os.Getenv("NATS_URL")


	nc, _ := nats.Connect(natsURL)
	defer nc.Drain()


	js, _ := nc.JetStream()

Create a [work-queue][wq] stream that will act as a buffer for requests.

	js.AddStream(&nats.StreamConfig{
		Name:      "REQUESTS",
		Subjects:  []string{"requests.*"},
		Retention: nats.WorkQueuePolicy,
	})

Create an ephemeral consumer + subscription responsible for replying.

	sub, _ := js.Subscribe("requests.*", func(msg *nats.Msg) {
		var r string
		switch msg.Subject {
		case "requests.order-sandwich":
			r = "🥪"
		case "requests.order-bagel":
			r = "🥯"
		case "requests.order-flatbread":
			r = "🥙"
		default:
			return
		}
		msg.Respond([]byte(r))
	})
	defer sub.Drain()

Send some requests.

	rep, _ := js.Request("requests.order-sandwich", nil, time.Second)
	fmt.Println(string(rep.Data))


	rep, _ = js.Request("requests.order-flatbread", nil, time.Second)
	fmt.Println(string(rep.Data))

If a request cannot be fulfilled, the message is terminated.

	_, err := js.Request("requests.order-drink", nil, time.Second)
	fmt.Printf("timeout? %v\n", err == nats.ErrTimeout)


	info, _ := js.StreamInfo("REQUESTS")
	fmt.Printf("%d remaining in the stream\n", info.State.Msgs)
}

Output

Network ddfc0b34_default  Creating
Network ddfc0b34_default  Created
Container ddfc0b34-nats-1  Creating
Container ddfc0b34-nats-1  Created
Container ddfc0b34-nats-1  Starting
Container ddfc0b34-nats-1  Started
🥪
🥙
timeout? true
0 remaining in the stream

Recording

Note, playback is half speed to make it a bit easier to follow.