NATS and Go

Overview

These are some notes and an example on hooking up Golang programs to the NATS messaging system.

NATS Messaging

NATS messaging enables data sharing via a publish/subscribe mechanism across different processes, systems and platforms. Core NATS offers an "at most once" quality of service: if a subscriber is not listening on the subject (no subject match), or is not active when the message is sent, the message is not received. NATS is a fire-and-forget messaging system. Messages are not persisted to disk, which makes it quite lightweight and fast. Additional modules are available if persistence is required; see "STAN", aka NATS Streaming.
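
The "at most once" behavior can be illustrated with a toy, in-process broker. This is a deliberately simplified model, not the nats.go API: the `broker` type and its methods are hypothetical, subjects are matched exactly, and a published message is handed only to subscribers that exist at that moment and is never stored.

```go
package main

import "sync"

// broker is a toy in-memory model of core NATS delivery semantics
// (hypothetical type, not the nats.go API). Nothing is persisted.
type broker struct {
	mu   sync.Mutex
	subs map[string][]chan []byte
}

func newBroker() *broker {
	return &broker{subs: make(map[string][]chan []byte)}
}

// Subscribe registers interest in an exact subject and returns a buffered channel.
func (b *broker) Subscribe(subject string) <-chan []byte {
	ch := make(chan []byte, 16)
	b.mu.Lock()
	b.subs[subject] = append(b.subs[subject], ch)
	b.mu.Unlock()
	return ch
}

// Publish hands the message to the current subscribers only ("fire and forget")
// and returns how many received it. With no subscriber the message is lost.
func (b *broker) Publish(subject string, data []byte) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	delivered := 0
	for _, ch := range b.subs[subject] {
		select {
		case ch <- data:
			delivered++
		default:
			// Subscriber buffer is full: drop instead of blocking the publisher.
		}
	}
	return delivered
}
```

Publishing before anyone subscribes returns 0 and the message is simply gone, which mirrors the behavior described above.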

Subscription is based on a subject; subscribers may supply wildcards for pattern matching. Built on top of the basic pub/sub pattern, NATS also supports a request/reply style of data exchange. One neat feature is queue groups: in a request/reply setup, multiple responders can subscribe to the same subject as members of a queue group, and requests are transparently distributed across them, providing simple load balancing for free.
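
To make the wildcard rules concrete, here is a toy matcher written for illustration only (it is not the server's implementation, and `subjectMatches` is a hypothetical name). NATS subjects are dot-separated tokens: `*` matches exactly one token, and `>` matches one or more tokens at the end of a subject.

```go
package main

import "strings"

// subjectMatches is a toy illustration of NATS subject wildcards
// (hypothetical helper, not part of nats.go):
// "*" matches exactly one token, ">" matches one or more trailing tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one subject token.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // subject has fewer tokens than the pattern
		}
		if tok != "*" && tok != s[i] {
			return false // literal token mismatch
		}
	}
	// No trailing ">": token counts must agree exactly.
	return len(p) == len(s)
}
```

For instance, a subscriber on `foo.>` receives messages published to `foo.bar.baz`, while a subscriber on `foo.*` does not.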

Example

As an example, I've built a REST API service that sends requests to a backend via NATS using request/reply semantics. The example is written in Go and uses the Gin web framework for the REST frontend and the nats.go bindings to interface with NATS. Both the NATS and the Gin parts are cobbled together from the various usage examples in their respective repositories.

  • The REST API listens on port 8080 and supports two endpoints: a simple request counter, and an endpoint for sending requests to backend workers via NATS.

  • Backend workers receive parametrized requests, perform work, and send an answer back to the API. For this example, the workers read a file and return its contents.

API and backend are started from the same binary; a "role" argument selects whether the API or the backend is started.

Frontend

This is rather straightforward. The program takes one argument, either web or backend, and starts the API or the backend worker process accordingly.

Abridged code:

package main

import (...)

func main() {
    // Role backend: run backend node
    if len(os.Args) > 1 && os.Args[1] == "backend" {
        node.SetupBackend()
        return
    }
    // Role web: run webserver
    if len(os.Args) > 1 && os.Args[1] == "web" {
        r := web.Setup()
        r.Run()
        return
    }

    log.Fatalln("Usage: fooweb web|backend")
}

API service

The API service uses the Gin web framework to create two endpoints, /stats and /req. The /stats endpoint simply returns the value of a global counter variable. The /req endpoint connects to the NATS system and sends a request to the backend processes.

Connection setup

The Connect() function connects to the default NATS server URL (nats.DefaultURL, i.e. nats://127.0.0.1:4222). I don't want to reconnect on every request, so the connection is kept in a global variable. To ensure the connection is set up only once, its initialization is guarded by a sync.Once. Should an error occur during connection setup, the program bombs out.

var (
    once sync.Once
    conn *nats.Conn
)

func Connect() *nats.Conn {
    once.Do(func() {
        var err error
        conn, err = nats.Connect(nats.DefaultURL)
        if err != nil {
            log.Fatal(err)
        }
    })
    return conn
}

Request handler

The request handler takes a Gin context and uses the NATS connection to send a request to the backend workers. It reads the filename from the POST form (defaulting to /etc/hostname) and sends the NATS request with a 2-second timeout. Should there be an error, the program is terminated (in a real program the reaction should probably be less drastic). On success, the request counter is incremented and the response data is returned, wrapped in JSON.

func backend_req(c *gin.Context) {
    nc := Connect()
    filename := c.DefaultPostForm("filename", "/etc/hostname")

    resp, err := nc.Request(NATS_SUBJECT, []byte(filename), 2*time.Second)
    if err != nil {
        if nc.LastError() != nil {
            log.Fatalf("%v for request", nc.LastError())
        }
        log.Fatalf("%v for request", err)
    }
    counter++
    c.JSON(200, map[string]string{"file_contents": string(resp.Data)})
}

A note on the error handling: errors can come from the request itself or from the connection (reported via LastError()). Both are handled the same way here.
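
Apart from error handling, there is a concurrency caveat: Gin serves requests on separate goroutines, so the plain `counter++` (and the unsynchronized read in `stats`) is a data race that the race detector (`go build -race`) would report once requests overlap. A minimal race-free sketch using sync/atomic follows; `reqCounter`, `incRequests` and `requestCount` are hypothetical names.

```go
package main

import "sync/atomic"

// reqCounter is shared by all request handlers; access it only atomically.
var reqCounter int64

// incRequests is what the /req handler would call instead of counter++.
func incRequests() { atomic.AddInt64(&reqCounter, 1) }

// requestCount is what the /stats handler would read.
func requestCount() int64 { return atomic.LoadInt64(&reqCounter) }
```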

Complete module

package web

import (
    "github.com/gin-gonic/gin"
    "github.com/nats-io/nats.go"
    "log"
    "sync"
    "time"
)

const (
    NATS_SUBJECT = "fooweb_req"
)

var (
    once sync.Once
    conn *nats.Conn
    counter int
)

func Connect() *nats.Conn {
    once.Do(func() {
        var err error
        conn, err = nats.Connect(nats.DefaultURL)
        if err != nil {
            log.Fatal(err)
        }
    })
    return conn
}

// Return the value of the request counter
func stats(c *gin.Context) {
    c.JSON(200, gin.H{
        "req_counter": counter,
    })
}

// Handle a request for the backend
func backend_req(c *gin.Context) {
    nc := Connect()
    filename := c.DefaultPostForm("filename", "/etc/hostname")

    resp, err := nc.Request(NATS_SUBJECT, []byte(filename), 2*time.Second)
    if err != nil {
        if nc.LastError() != nil {
            log.Fatalf("%v for request", nc.LastError())
        }
        log.Fatalf("%v for request", err)
    }
    counter++
    c.JSON(200, map[string]string{"file_contents": string(resp.Data)})
}

// Setup the REST API with two endpoints
func Setup() *gin.Engine {
    r := gin.Default()
    r.GET("/stats", stats)
    r.POST("/req", backend_req)
    return r
}

Backend

The backend subscribes a handler function to the request subject as a member of a queue group. The handler function takes the request message, extracts a filename parameter, reads the file and answers the NATS request.

Subscription

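The SetupBackend() function sets up the NATS connection and the subscription. The connection is configured with a number of options for reconnection behavior and connection event logging (see setupConnOptions() in the complete module). The subscription function registers the handler function for a specific subject and queue group, and returns a subscription object. As I'm not interested in managing the subscription, I'm ignoring the subscription object, along with any error. Errors are checked summarily via the LastError() function in the next lines; note that LastError() only reports the last error recorded on the connection (such as an error sent by the server), so errors returned directly by QueueSubscribe() or Flush() are discarded here. After subscribing I call Flush(): the client buffers outgoing data, and Flush() performs a round trip to the server, so the subscription is in place before the backend reports that it is listening.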

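Once the subscription is in place, SetupBackend() sets up an operating system signal handler and waits for an interrupt signal. On interrupt, the connection is drained so no requests get lost: draining stops the subscription from receiving new requests but lets the handler finish the ones that have already arrived before the connection is closed.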

func SetupBackend() {
    opts := []nats.Option{nats.Name("fooweb backend")}
    opts = setupConnOptions(opts)

    // Connect to NATS with the options configured above
    nc, err := nats.Connect(nats.DefaultURL, opts...)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    _, _ = nc.QueueSubscribe(NATS_SUBJ, NATS_QUEUE, reply) // errors handled below
    _ = nc.Flush()
    if err := nc.LastError(); err != nil {
        log.Fatal(err)
    }

    log.Printf("Listening on [%s]", NATS_SUBJ)

    // Setup the interrupt handler to drain so we don't miss
    // requests when scaling down.
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    <-c // Block
    log.Println()
    log.Printf("Draining...")
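    // Drain() is asynchronous: it returns at once and closes the connection
    // only after pending messages are handled, so wait before exiting.
    // (The ClosedHandler from setupConnOptions also logs and exits on close.)
    if err := nc.Drain(); err != nil {
        log.Fatal(err)
    }
    for !nc.IsClosed() {
        time.Sleep(100 * time.Millisecond)
    }
    log.Println("Exiting")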
}

Handler

The handler function gets a request message as an argument. From it, the handler extracts the filename parameter, which is passed to ReadFile(). NATS request messages have a Respond() method, which we call to return the file contents. Note that NATS limits the message payload size (1 MB by default, configurable via the server's max_payload setting), so I'm truncating the file contents byte slice.

func reply(msg *nats.Msg) {
    fn := string(msg.Data)
    content, err := ioutil.ReadFile(fn)
    if err != nil {
        log.Printf("error reading %s: %s", fn, err)
    }
    if len(content) >= 1024*1024 {
        // Max. NATS message size
        content = content[:1024*1024]
    }
    _ = msg.Respond(content)
}

Complete module

For reference, the complete backend module:

package node

import (
    "github.com/nats-io/nats.go"
    "io/ioutil"
    "log"
    "os"
    "os/signal"
    "time"
)

const (
    NATS_SUBJ  = "fooweb_req"
    NATS_QUEUE = "fooweb_queue"
)

// Worker function for the backend
func reply(msg *nats.Msg) {
    fn := string(msg.Data)
    content, err := ioutil.ReadFile(fn)
    if err != nil {
        log.Printf("error reading %s: %s", fn, err)
    }
    if len(content) >= 1024*1024 {
        // Max. NATS message size
        content = content[:1024*1024]
    }
    _ = msg.Respond(content)
}

// Setup backend service
func SetupBackend() {
    opts := []nats.Option{nats.Name("fooweb backend")}
    opts = setupConnOptions(opts)

    // Connect to NATS with the options configured above
    nc, err := nats.Connect(nats.DefaultURL, opts...)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    _, _ = nc.QueueSubscribe(NATS_SUBJ, NATS_QUEUE, reply) // errors handled below
    _ = nc.Flush()
    if err := nc.LastError(); err != nil {
        log.Fatal(err)
    }

    log.Printf("Listening on [%s]", NATS_SUBJ)

    // Setup the interrupt handler to drain so we don't miss
    // requests when scaling down.
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)
    <-c // Block
    log.Println()
    log.Printf("Draining...")
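    // Drain() is asynchronous: it returns at once and closes the connection
    // only after pending messages are handled, so wait before exiting.
    // (The ClosedHandler below also logs and exits once the connection closes.)
    if err := nc.Drain(); err != nil {
        log.Fatal(err)
    }
    for !nc.IsClosed() {
        time.Sleep(100 * time.Millisecond)
    }
    log.Println("Exiting")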
}

func setupConnOptions(opts []nats.Option) []nats.Option {
    totalWait := 10 * time.Minute
    reconnectDelay := time.Second

    opts = append(opts, nats.ReconnectWait(reconnectDelay))
    opts = append(opts, nats.MaxReconnects(int(totalWait/reconnectDelay)))
    opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
        log.Printf("Disconnected due to: %s, will attempt reconnects for %.0fm", err, totalWait.Minutes())
    }))
    opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
        log.Printf("Reconnected [%s]", nc.ConnectedUrl())
    }))
    opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
        log.Fatalf("Exiting: %v", nc.LastError())
    }))
    return opts
}

Running the example

To run the example, you will need a NATS server. See here for installation options. In general, NATS is distributed as a simple binary that you start in the foreground.

peter@pirx ~ » /usr/local/bin/nats-server 
[845301] 2020/12/20 21:16:29.995959 [INF] Starting nats-server version 2.1.9
[845301] 2020/12/20 21:16:29.995999 [INF] Git commit [7c76626]
[845301] 2020/12/20 21:16:29.996105 [INF] Listening for client connections on 0.0.0.0:4222
[845301] 2020/12/20 21:16:29.996110 [INF] Server id is NBX3CRYNFAM7ZWQ2KBHZJ33RV6QV2VKTJFHQA4RFTJISX74I4PGTGFDM
[845301] 2020/12/20 21:16:29.996113 [INF] Server is ready

You can get the example source from GitHub. Build it with go build fooweb.go and make sure the NATS server is running.

peter@pirx ~/src/fooweb ±master⚡ » go build fooweb.go
peter@pirx ~/src/fooweb ±master⚡ » ./fooweb       
2020/12/20 20:11:32 Usage: fooweb web|backend

Start an API process and, in a second shell, at least one backend process:

peter@pirx ~/src/fooweb ±main⚡ » ./fooweb web
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.
[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production. 
 - using env:   export GIN_MODE=release
 - using code:  gin.SetMode(gin.ReleaseMode)

[GIN-debug] GET    /stats                    --> sabaini.at/fooweb/web.stats (3 handlers)
[GIN-debug] POST   /req                      --> sabaini.at/fooweb/web.backend_req (3 handlers)
[GIN-debug] Environment variable PORT is undefined. Using port :8080 by default
[GIN-debug] Listening and serving HTTP on :8080

peter@pirx ~/src/fooweb ±main⚡ » ./fooweb backend
2020/12/20 20:38:12 Listening on [fooweb_req]

Now you should be able to curl the endpoints:

peter@pirx ~ » curl -s -XGET http://localhost:8080/stats | json_pp
{
   "req_counter" : 0
}
peter@pirx ~ » curl -s -XPOST -d filename=/etc/hostname http://localhost:8080/req | json_pp
{
   "file_contents" : "pirx\n"
}

Test the distribution of requests across multiple responders by retrieving the /proc/self/status file and looking for the process ID. In my tests, request distribution appeared to be random: it evens out across responders, but only over many requests.

peter@pirx ~ » curl -s -XPOST -d filename=/proc/self/status http://localhost:8080/req | json_pp | grep Pid
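
To get a feel for why the distribution only evens out over many requests, here is a small simulation that assumes each request goes to a uniformly random member of the queue group. It models the behavior observed above and is not the server's actual selection code; `distribute` is a hypothetical name.

```go
package main

import "math/rand"

// distribute simulates n requests, each sent to a uniformly random member of a
// queue group with the given number of members, and returns per-member counts.
// A fixed seed keeps runs reproducible.
func distribute(members, n int, seed int64) []int {
	rng := rand.New(rand.NewSource(seed))
	counts := make([]int, members)
	for i := 0; i < n; i++ {
		counts[rng.Intn(members)]++
	}
	return counts
}
```

With only a handful of requests one backend may well see most of them; each member's share approaches 1/members only as the number of requests grows.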

Coda

I found NATS server quite enjoyable to work with. I like it for its straightforward usage and light weight. As NATS itself is written in Go, unsurprisingly the Go bindings seem pretty complete and well documented. I'd be curious to try interfacing it with other systems, e.g. doing the frontend part with Django. For production purposes I'd like to have a properly packaged version with an init script; let's see if I get around to setting up a PPA.