How to Set Up Message Queues for Asynchronous Sending

Message queuing is a useful tool for building scalable websites and web services. A message queue lets applications communicate asynchronously by sending messages to each other.

At a high level, message queuing is pretty simple. A process, called the Producer, publishes Messages to a Queue, where they are stored until a Consumer process is ready to consume them.

Message Queue Diagram
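
To make the Producer, Queue, and Consumer roles concrete, here is a minimal in-process sketch that uses only the Python standard library. It is not RabbitMQ: `queue.Queue` stands in for the broker, a thread stands in for the Consumer process, and the `run_demo` function and the sample addresses are made up for illustration.

```python
import queue
import threading


def run_demo(addresses):
    """Pass each address from a producer to a consumer thread via a queue."""
    messages = queue.Queue()  # in-process stand-in for a broker queue
    handled = []

    def consumer():
        while True:
            address = messages.get()  # blocks until a message is available
            if address is None:       # sentinel value: no more messages
                break
            handled.append("welcome email for {}".format(address))

    worker = threading.Thread(target=consumer)
    worker.start()

    # The producer only enqueues; it never performs the work itself.
    for address in addresses:
        messages.put(address)
    messages.put(None)

    worker.join()
    return handled


if __name__ == "__main__":
    print(run_demo(["ada@example.com", "bob@example.com"]))
```

A real broker adds what this toy version lacks: the queue lives outside both processes, so messages can survive restarts and the Consumer can run on another machine.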

Publishing messages to a message broker is a very fast operation, and we can leverage this to speed up our web services. We can delegate some tasks to background processes by making our web service publish a message to a queue instead. We can then have background consumer processes consume the messages and perform the delegated tasks as needed.

Message Queue Process Diagram

In this guide, we’ll be using RabbitMQ as a message broker. We’ll integrate RabbitMQ into a sample Flask application to defer sending mail to a background process. The application, Flaskr, may look familiar if you’ve previously completed the Flask Tutorial.

Let’s take a look at our signup view function:

@app.route('/signup', methods=['GET', 'POST'])                                  
def signup():  
    error = None                                                                
    if request.method == 'POST':                                                
        email = request.form['email']                                           
        password = request.form['password']                                     
        if not (email and password):
            return signup_error('Email Address and Password are required.')
        db = get_db()
        c = db.cursor()
        c.execute('SELECT * FROM users WHERE email=?;', (email,))
        if c.fetchone():
            return signup_error('Email Address already has an account.')
        c.execute('INSERT INTO users (email, password) VALUES (?, ?);',            
                  (email, pbkdf2_sha256.hash(password)))                           
        db.commit()                                                                  
        send_welcome_email(email)                                                  
        flash('Account Created')                                                   
        return redirect(url_for('login'))                                          
    else:                                                                          
        return render_template('signup.html')                                      


def send_welcome_email(address):  
    res = requests.post(                                                           
        "https://api.mailgun.net/v3/{}/messages".format(app.config['DOMAIN']),  
        auth=("api", MAILGUN_API_KEY),                                             
        data={"from": "Flaskr <noreply@{}>".format(app.config['DOMAIN']),          
              "to": [address],                                                     
              "subject": "Welcome to Flaskr!",                                     
              "text": "Welcome to Flaskr, your account is now active!"}            
    )                                                                              
    if res.status_code != 200:                                                     
        # Something terrible happened :-(                                          
        raise MailgunError("{}-{}".format(res.status_code, res.reason))            


def signup_error(error):  
    return render_template('signup.html', error=error)

Currently the view function checks the database to make sure an account with that email address doesn’t already exist and then adds the new user to the database. The view then immediately sends a welcome email message to the user before finishing the request.

There are a couple of issues with this view that we can improve with a message queue. First, the user shouldn’t have to wait for a response while we send the welcome email; it would be better to respond as soon as possible. Second, we want a way to retry sending the welcome email if we’re unable to do it on the first try.

First, let’s run an instance of RabbitMQ in a Docker container:

docker run -d --hostname my-rabbit -p 4369:4369 -p 5672:5672 \
  -p 35197:35197 --name rabbitmq rabbitmq:3
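
Before wiring the broker into the application, it can be handy to confirm that the container is accepting connections. The sketch below is a plain TCP check against port 5672, the AMQP port published by the command above. The `broker_reachable` helper and the `localhost` default are assumptions for illustration, and a successful connection only shows that something is listening on the port, not that an AMQP handshake would succeed.

```python
import socket


def broker_reachable(host="localhost", port=5672, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS failures.
        return False


if __name__ == "__main__":
    print("RabbitMQ reachable:", broker_reachable())
```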

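Now let’s add some code that connects to RabbitMQ and declares our queue the first time a request needs it, and closes the connection when the application context is torn down: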

def connect_queue():  
    if not hasattr(g, 'rabbitmq'):                                                                    
        g.rabbitmq = pika.BlockingConnection(                                                         
            pika.ConnectionParameters(app.config['RABBITMQ_HOST'])                                    
        )                                                                                             
    return g.rabbitmq                                                                                 


def get_welcome_queue():  
    if not hasattr(g, 'welcome_queue'):                                                               
        conn = connect_queue()                                                                        
        channel = conn.channel()                                                                      
        channel.queue_declare(queue='welcome_queue', durable=True)                                    
        channel.queue_bind(exchange='amq.direct', queue='welcome_queue')                              
        g.welcome_queue = channel                                                                     
    return g.welcome_queue                                                                            


@app.teardown_appcontext                                                                              
def close_queue(error):
    if hasattr(g, 'rabbitmq'):                                                                        
        g.rabbitmq.close()

We’re declaring a durable queue called “welcome_queue” and binding it to the built-in amq.direct exchange. Each message we publish to it carries the email address that a worker should send the welcome email to.

Next, we can update our signup view to publish a message to the queue instead of sending the welcome email.

@app.route('/signup', methods=['GET', 'POST'])                                  
def signup():  
    error = None                                                                
    if request.method == 'POST':                                                
        email = request.form['email']                                           
        password = request.form['password']                                     
        if not (email and password):
            return signup_error('Email Address and Password are required.')
        db = get_db()
        c = db.cursor()
        c.execute('SELECT * FROM users WHERE email=?;', (email,))
        if c.fetchone():
            return signup_error('Email Address already has an account.')
        c.execute('INSERT INTO users (email, password) VALUES (?, ?);',         
                  (email, pbkdf2_sha256.hash(password)))                        
        db.commit()                                                             
        q = get_welcome_queue()                                                 
        q.basic_publish(                                                        
            exchange='amq.direct',                                              
            routing_key='welcome_queue',                                        
            body=email,                                                         
            properties=pika.BasicProperties(                                    
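                # Assumes a module-level _DELIVERY_MODE_PERSISTENT = 2,
                # the AMQP delivery mode for persistent messages.
                delivery_mode=_DELIVERY_MODE_PERSISTENT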
            )                                                                   
        )                                                                       
        flash('Account Created')                                                
        return redirect(url_for('login'))                                       
    else:                                                                       
        return render_template('signup.html')

Now we can write our worker code. The cool thing about using a message queue is that the worker process can run anywhere. It can run on dedicated worker servers, or maybe alongside your web service. This is what a simple worker script would look like:

import pika  
import requests


# Configuration
DOMAIN = 'example.com'  
MAILGUN_API_KEY = 'YOUR_MAILGUN_API_KEY'  
RABBITMQ_HOST = 'localhost'

connection = pika.BlockingConnection(  
    pika.ConnectionParameters(host=RABBITMQ_HOST)
)
channel = connection.channel()  
channel.queue_declare(queue='welcome_queue', durable=True)


class Error(Exception):  
    pass


class MailgunError(Error):  
    def __init__(self, message):
        self.message = message


def send_welcome_message(ch, method, properties, body):  
    address = body.decode('UTF-8')
    print("Sending welcome email to {}".format(address))
    res = requests.post(
        "https://api.mailgun.net/v3/{}/messages".format(DOMAIN),
        auth=("api", MAILGUN_API_KEY),
        data={"from": "Flaskr <noreply@{}>".format(DOMAIN),
              "to": [address],
              "subject": "Welcome to Flaskr!",
              "text": "Welcome to Flaskr, your account is now active!"}
    )
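    # Acknowledge the delivery so RabbitMQ removes it from the queue.
    # Note that this happens even when the send failed.
    ch.basic_ack(delivery_tag=method.delivery_tag)
    if res.status_code != 200:
        # Something terrible happened :-O
        raise MailgunError("{}-{}".format(res.status_code, res.reason))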


# pika 1.0+ signature; older releases took the callback as the first argument.
channel.basic_consume(queue='welcome_queue',
                      on_message_callback=send_welcome_message)
channel.start_consuming()
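
Since the worker may run on a different host than the web service, one option is to read its settings from the environment instead of hardcoding them. The sketch below shows the idea. The `load_worker_config` helper and the environment variable names (`FLASKR_DOMAIN`, `MAILGUN_API_KEY`, `RABBITMQ_HOST`) are hypothetical, and the fallbacks mirror the placeholder values in the script above.

```python
import os


def load_worker_config(environ=None):
    """Build worker settings from environment variables, with fallbacks."""
    # Accept a plain dict so the function can be exercised without
    # touching the real process environment.
    env = os.environ if environ is None else environ
    return {
        "DOMAIN": env.get("FLASKR_DOMAIN", "example.com"),
        "MAILGUN_API_KEY": env.get("MAILGUN_API_KEY", "YOUR_MAILGUN_API_KEY"),
        "RABBITMQ_HOST": env.get("RABBITMQ_HOST", "localhost"),
    }


if __name__ == "__main__":
    config = load_worker_config()
    print("Connecting to RabbitMQ at", config["RABBITMQ_HOST"])
```

The worker could then pass `config["RABBITMQ_HOST"]` to `pika.ConnectionParameters` in place of the module-level constant.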

We’re almost done! We’ve successfully decoupled sending our welcome email from the web service. Now the only thing missing is for us to retry sending the welcome email in case it fails to send for some reason. One clever way to add retry logic to your application is to use a dead letter queue.

We’ll add a second queue, the “retry_queue”, where we’ll temporarily place a message whenever we encounter an email send error. Messages in the retry queue expire after a fixed delay (a time-to-live). We’ll configure RabbitMQ so that when a message expires, it is dead-lettered back to our welcome queue, ready to be picked up by a worker that will attempt to send the email again:

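# Assumed values; adjust them to suit your application.
RETRY_DELAY_MS = 60000  # wait one minute before retrying (example value)
_DELIVERY_MODE_PERSISTENT = 2  # AMQP delivery mode for persistent messages

retry_channel = connection.channel()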
retry_channel.queue_declare(  
    queue='retry_queue',
    durable=True,
    arguments={
        'x-message-ttl': RETRY_DELAY_MS,
        'x-dead-letter-exchange': 'amq.direct',
        'x-dead-letter-routing-key': 'welcome_queue'
    }
)


def send_welcome_message(ch, method, properties, body):  
    address = body.decode('UTF-8')
    print("Sending welcome email to {}".format(address))
    res = requests.post(
        "https://api.mailgun.net/v3/{}/messages".format(DOMAIN),
        auth=("api", MAILGUN_API_KEY),
        data={"from": "Flaskr <noreply@{}>".format(DOMAIN),
              "to": [address],
              "subject": "Welcome to Flaskr!",
              "text": "Welcome to Flaskr, your account is now active!"}
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)
    if res.status_code != 200:
        print("Error sending to {}. {} {}. Retrying...".format(
            address, res.status_code, res.reason
        ))
        retry_channel.basic_publish(
            exchange='',
            routing_key='retry_queue',
            body=address,
            properties=pika.BasicProperties(
                delivery_mode=_DELIVERY_MODE_PERSISTENT
            )
        )
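
To see why a failed message ends up back on the welcome queue, it can help to model the two queues in memory. The sketch below is a simplified simulation, not RabbitMQ itself: the `RetrySimulation` class, its method names, and the delay value are made up for illustration, and time is passed in explicitly so the behavior is easy to follow.

```python
from collections import deque

RETRY_DELAY_MS = 5000  # assumption: example delay for the simulation


class RetrySimulation:
    """In-memory model of welcome_queue plus a TTL-based retry_queue."""

    def __init__(self, retry_delay_ms=RETRY_DELAY_MS):
        self.retry_delay_ms = retry_delay_ms
        self.welcome_queue = deque()
        self.retry_queue = deque()  # entries are (expires_at_ms, body)

    def publish_retry(self, body, now_ms):
        # Like publishing to retry_queue: the message gets a deadline.
        self.retry_queue.append((now_ms + self.retry_delay_ms, body))

    def expire(self, now_ms):
        # Like dead-lettering: expired messages at the head of retry_queue
        # are routed back to welcome_queue.
        while self.retry_queue and self.retry_queue[0][0] <= now_ms:
            _, body = self.retry_queue.popleft()
            self.welcome_queue.append(body)


if __name__ == "__main__":
    sim = RetrySimulation()
    sim.publish_retry("ada@example.com", now_ms=0)
    sim.expire(now_ms=4999)
    print(list(sim.welcome_queue))  # message is still waiting in retry_queue
    sim.expire(now_ms=5000)
    print(list(sim.welcome_queue))  # message is back in welcome_queue
```

Neither this model nor the worker above caps the number of retries, so a message that can never be delivered would keep cycling between the two queues.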

And that’s it! We’ve successfully introduced asynchronous sending into our application using RabbitMQ. For the full working example, check out my GitHub repo.

