Getting started with RabbitMQ in Golang

Getting started with RabbitMQ in Golang

Introduction

What’s RabbitMQ and Why Should You Care?

Honestly, the RabbitMQ docs explain it so well that I will copy and paste it. Docs

“RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want to post in a post box, you can be sure that the letter carrier will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office, and a letter carrier.”

Why Use Golang?

Golang, or Go, is a programming language that's fast and easy to use. I may be biased here but I just love the simplicity of Go.

What You'll Learn

In this guide, you'll learn how to set up RabbitMQ with docker and write Go code to send and receive a blob of messages, using RabbitMQ a super smart mailman. By the end, you’ll get your feet wet and on your way to writing a more complex implementation.

Setting up a Golang Project

First, let’s start a new Go project. Open your terminal or command prompt and type in a command to create a new folder for your project. It’s like setting up a new workspace where all your code will live.

mkdir rabbitmq-tutotrial && cd rabbitmq-tutorial

Another necessary thing to do is to initialize a go module

go mod init <module name>

Now you create the entry point to your Golang app. For the scope of this tutorial the main.go is just fine.

touch main.go

One more thing is needed and that is the third-party package that allows you to communicate with RabbitMQ and you are all set.

go get github.com/rabbitmq/amqp091-go

Setting up RabbitMQ

The easiest way for me I have is using docker. I use docker to spin up services and I do not have to worry about anything like path configurations and all that.

You can start a RabbitMQ service by adding the below in your docker-compose.yml file in your project root and run docker-compose up -d to start the service.

services:
  rabbitMq:
    container_name: rabbitmq
    image: rabbitmq
    ports:
      - "5672:5672"

Connecting to RabbitMQ

Now that all is set up in your main.go file you can now connect to RabbitMQ. In your main.go file import the third-party package you installed earlier.

package main

import (
        amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
    //connect to rabbitMQ 
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // make sure service is running in docker.

    if err != nil {
        log.Fatalf("Could not connect to rabbitMQ: %s", err.Error())
    }

    defer conn.Close()
}

Let’s break this down a little bit. Dial accepts a string in the AMQP URI format and returns a new Connection over TCP using PlainAuth.

That connection helps you to do things with RabbitMQ. Where the fun is 😁

Publishing and Consuming Messages from a Queue

Trust me this is just technical jargon. A publisher sends the message to the queue and the Consumer retrieves the message in the queue. Simple.

Having said that you need two functions a publisher function and a consumer function.

Let’s just think about it from a top point of view. A publisher will have to connect to a queue or create one. After that is done the publisher will send the message to the queue.

For the consumer, it does the exact opposite. Connects to the queue or create one if one does not exist and listen for any message sent to the queue and does whatever you want it to do.

In your main.go file you create two functions publish and consume. Each function will take a pointer to the amqp connection.

func publish(conn *ampq.Connection) {
 //... logic
}

func consume(conn *ampq.Connection) {
    //... logic
}

Your main.go file should look like the one below now.

package main

import (
        amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
    //connect to rabbitMQ 
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // make sure service is running in docker.

    if err != nil {
        log.Fatalf("Could not connect to rabbitMQ: %s", err.Error())
    }

    defer conn.Close()
}

func publish(conn *ampq.Connection) {
 //... logic
}

func consume(conn *ampq.Connection) {
    //... logic
}

Let’s complete the publish function. A communication channel has to be opened to enable communication in and out of the queue. Also, the queue has to be declared. You can have multiple queues in RabbitMQ.

func publish(conn *amqp.Connection)  {
    //create a channel this is where most of the api for doing stuff are 
    channel, err :=  conn.Channel()

    if err != nil {
        log.Fatalf("Could not create channel: %s", err.Error())
    }

    defer channel.Close()

    //To send a message, the message must be sent into a queue.Declare a queue.

    queue, err := channel.QueueDeclare(
        "message", // name of queue
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )

    if err != nil {
        log.Fatalf("Could not declare a queue: %s", err.Error())
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    messageToBeSent := "My first message sent via RabbitMQ"

    err = channel.PublishWithContext( ctx,
        "",
        queue.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body: []byte(messageToBeSent),
        })

    if err != nil {
        log.Fatalf("Could not send message: %s", err.Error())
    }
}

QueueDeclare creates a queue. You can see that we are only giving it the queue name. This is just a basic implementation.

Ctx otherwise known as context helps RabbitMQ to keep track of what is going on. It tracks things like SIGINT known as signal interrupt.

PublishWithContext sends the message to the queue with a defined type. The message has to be of type []byte.

Now let’s complete the consume function

func consume(conn *amqp.Connection)  {
    channel, err := conn.Channel()
    if err != nil {
        log.Fatalf("Could not create channel: %s", err.Error())
    }

    queue, err := channel.QueueDeclare(
        "message", // name of queue
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )

    if err != nil {
        log.Fatalf("Could not declare queue: %s", err.Error())
    }
    //consume the message
    msg, err := channel.Consume(
        queue.Name, // queue name
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )

    if err != nil  {
        log.Printf("Could not consume message from rabbitmq queue: %s", err.Error())
    }

    //go routine to keep listening for messages sent to the queue
    go func() {
        for message := range msg{
            log.Printf("Message received: %s", message.Body)
        }
    }()

}

There are quite some similarities between the publish function and the consume function.

Consume function has a go routine. what is a go routine? Simply put it is doing many different things at the same time. Under the hood, the go routine is running on a thread while the main.go runs on a different thread.

Putting it all together we should have something like below

package main

import (
        amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
    //connect to rabbitMQ 
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // make sure service is running in docker.

    if err != nil {
        log.Fatalf("Could not connect to rabbitMQ: %s", err.Error())
    }

    defer conn.Close()
}

func publish(conn *amqp.Connection)  {
    //create a channel this is where most of the api for doing stuff are 
    channel, err :=  conn.Channel()

    if err != nil {
        log.Fatalf("Could not create channel: %s", err.Error())
    }

    defer channel.Close()

    //To send a message the message must be sent into a queue

    queue, err := channel.QueueDeclare(
        "message", // name of queue
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )

    if err != nil {
        log.Fatalf("Could not declare a queue: %s", err.Error())
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    messageToBeSent := "My first message sent via RabbitMQ"

    err = channel.PublishWithContext( ctx,
        "",
        queue.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body: []byte(messageToBeSent),
        })

    if err != nil {
        log.Fatalf("Could not send message: %s", err.Error())
    }
}

func consume(conn *amqp.Connection)  {
    channel, err := conn.Channel()
    if err != nil {
        log.Fatalf("Could not create channel: %s", err.Error())
    }

    queue, err := channel.QueueDeclare(
        "message", // name of queue
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )

    if err != nil {
        log.Fatalf("Could not declare queue: %s", err.Error())
    }
    //consume the message
    msg, err := channel.Consume(
        queue.Name, // queue name
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )

    if err != nil  {
        log.Printf("Could not consume message from rabbitmq queue: %s", err.Error())
    }

    //go routine to keep listening for messages sent to the queue
    go func() {
        for message := range msg{
            log.Printf("Message received: %s", message.Body)
        }
    }()

}

Update your main.go. This enables you to call the two functions we just completed.

package main

import (
        amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
    //connect to rabbitMQ 
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // make sure service is running in docker.

    if err != nil {
        log.Fatalf("Could not connect to rabbitMQ: %s", err.Error())
    }

    defer conn.Close()

    publish(conn)

    consume(conn)
}

func publish(conn *amqp.Connection)  {
    //create a channel this is where most of the api for doing stuff are 
    channel, err :=  conn.Channel()

    if err != nil {
        log.Fatalf("Could not create channel: %s", err.Error())
    }

    defer channel.Close()

    //To send a message the message must be sent into a queue

    queue, err := channel.QueueDeclare(
        "message", // name of queue
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )

    if err != nil {
        log.Fatalf("Could not declare a queue: %s", err.Error())
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    messageToBeSent := "My first message sent via RabbitMQ"

    err = channel.PublishWithContext( ctx,
        "",
        queue.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body: []byte(messageToBeSent),
        })

    if err != nil {
        log.Fatalf("Could not send message: %s", err.Error())
    }
}

func consume(conn *amqp.Connection)  {
    channel, err := conn.Channel()
    if err != nil {
        log.Fatalf("Could not create channel: %s", err.Error())
    }

    queue, err := channel.QueueDeclare(
        "message", // name of queue
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )

    if err != nil {
        log.Fatalf("Could not declare queue: %s", err.Error())
    }
    //consume the message
    msg, err := channel.Consume(
        queue.Name, // queue name
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )

    if err != nil  {
        log.Printf("Could not consume message from rabbitmq queue: %s", err.Error())
    }

    //go routine to keep listening for messages sent to the queue
    go func() {
        for message := range msg{
            log.Printf("Message received: %s", message.Body)
        }
    }()

}

Run the main.go and you should see something like below. Fingers crossed 🤞

Conclusion

We’ve walked through setting up RabbitMQ, using Go to connect to it, and sending and receiving messages.

But there are more complex things you and I can do with RabbitMQ. Some resources worth looking at: