Migrating to AWS SQS from ActiveMQ

After years of using ActiveMQ 5.x in our Java-based production system, we decided to migrate to the Amazon Simple Queue Service (SQS). Managing and configuring ActiveMQ no longer seemed to make sense, especially when SQS does it for us. Moreover, SQS recently added FIFO support. Messages in SQS FIFO queues are delivered in order and are guaranteed to be processed only once. These features are exactly what we needed to replace ActiveMQ with SQS and ensure that our backend processes would function as expected.

Considerations

Before migrating to SQS, here are some important considerations:

  • JMS support is limited to queues, i.e., point-to-point.
  • Message size is limited to 256 KB.
  • Message content is restricted to text only, i.e., XML, JSON, and unformatted text. JMS’s MapMessage and StreamMessage interfaces are unsupported.
  • Message durability is constrained to a maximum of 14 days (the default is 4 days).
  • JMS selectors for filtering messages are unsupported.
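The size limit is the easiest of these to trip over at runtime, so a pre-flight check before sending can help. Below is a minimal sketch, assuming the 256 KB figure above and a plain text body; the class and method names are hypothetical, and it ignores message attributes, which also count toward the limit.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: checks a text body against the 256 KB limit listed above. */
final class SqsSizeCheck {
    // 256 KB expressed in bytes (256 * 1024 = 262,144).
    static final int MAX_MESSAGE_BYTES = 256 * 1024;

    private SqsSizeCheck() {}

    /** Size of the body in bytes once encoded as UTF-8 (not the character count). */
    static int utf8Length(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    /** True if the body alone fits within the limit; attributes are not counted here. */
    static boolean fits(String body) {
        return utf8Length(body) <= MAX_MESSAGE_BYTES;
    }
}
```

Calling SqsSizeCheck.fits(body) before producer.send(...) lets the application reject or split an oversized payload itself rather than discovering the problem from a failed send.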

Overview

My company, Rodax Software, provides a proprietary RESTful API for Skedi, a calendar aggregation app for families and teams. The API has many long-running tasks, so we use queues to manage the workflow while providing fast HTTP responses to our mobile clients.

Our messaging architecture includes queues that are consumed by server-side components and remote mobile clients. Since migrating the mobile clients to SQS would require code changes and an App Store update, we decided to first migrate the server queues.

Code Migration

Before diving into the code, we also had to consider SQS’s JMS compliance. Thankfully, SQS supports most of the JMS 1.1 specification for queues. For more information, see the AWS documentation on using JMS with Amazon SQS.

Given the JMS queue support in SQS, the server-side code changes were relatively painless. So, let’s take a look at them: accessing a JMS connection, starting a JMS listener, sending a JMS message, and converting MapMessage to ObjectMessage.

Accessing a JMS connection

The primary difference between instantiating a JMS connection in SQS and ActiveMQ is that SQS requires credentials and the region where the queues are located.

//Before
Connection getJMSConnection() throws NamingException, JMSException {
	if(connection == null) {
		String url = "tcp://localhost:61616";
		ActiveMQConnectionFactory factory;
		factory = new ActiveMQConnectionFactory(url);
		connection = factory.createConnection();
	}
	return connection;
}

//After
Connection getSQSConnection() throws JMSException {
	AWSCredentials credentials;
	credentials = DefaultAWSCredentialsProviderChain.getInstance().getCredentials();
	AWSStaticCredentialsProvider credProvider;
	credProvider = new AWSStaticCredentialsProvider(credentials);

	Regions region = host.equals(PRODUCTION_HOST) ? 
	                            Regions.US_WEST_2 : Regions.US_EAST_2;
	SQSConnectionFactory factory;
	factory = new SQSConnectionFactory(new ProviderConfiguration(),
			AmazonSQSClientBuilder.standard()
			.withRegion(region)
			.withCredentials(credProvider));
	return factory.createConnection();
} 

Note: We use the DefaultAWSCredentialsProviderChain class, which is the preferred method for accessing AWS credentials.

Starting a JMS Listener

Unlike ActiveMQ, an SQS JMS listener must be listening to a queue that actually exists; therefore, before we start listening, we ensure that the queue exists.

//Before
public void start() throws NamingException, JMSException  {

	log.info("Starting message listener...");

	connection = getJMSConnection();

	Session session;
	session  = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

	Destination destination = session.createQueue(queue.getQueueName());
	MessageConsumer consumer = session.createConsumer(destination);
	consumer.setMessageListener(this);

	connection.start();		
}

//After
public void start() throws NamingException, JMSException {

    log.info("Starting message listener...");

    connection = getJMSConnection();

    Session session;
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    String queueName = getQueueName();
    if (connection instanceof SQSConnection) {
        //AWS SQS requires queues to be created before first use
        createSQSFifoQueue(queueName, (SQSConnection) connection);
    }

    Destination destination = session.createQueue(queueName);
    MessageConsumer consumer = session.createConsumer(destination);
    consumer.setMessageListener(this);

    connection.start();

    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        log.error("Thread sleep error", e);
    }
}

public static void createSQSFifoQueue(String queueName, SQSConnection connection) throws JMSException {
    if (queueName.endsWith(".fifo")) {
        // Get the wrapped client
        AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient();

        // Create an Amazon SQS FIFO queue, if it does not already exist
        if (!client.queueExists(queueName)) {
            Map<String, String> attributes = new HashMap<>();
            attributes.put("FifoQueue", "true");
            attributes.put("ContentBasedDeduplication", "true");
            client.createQueue(new CreateQueueRequest().withQueueName(queueName).withAttributes(attributes));
        }
    } else {
        throw new ContextedRuntimeException("Invalid FIFO queue name")
            .addContextValue("queueName", queueName);
    }
}

Lines 11-15 above (the SQSConnection check) were added because Amazon requires queues to be created before first use. Lines 23-27 (the Thread.sleep call) were added to force the main thread to wait for one second after starting the listener. Lines 30-46 are a new method that creates the FIFO queue, if it does not already exist, before we start listening.
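The endsWith(".fifo") check in createSQSFifoQueue can be tightened. SQS queue names may contain only letters, digits, hyphens, and underscores, and are limited to 80 characters, with the .fifo suffix counting toward that limit. Here is a minimal sketch of a stricter validator. The class name is hypothetical and this is not part of our production code.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: validates a FIFO queue name before asking SQS to create it. */
final class FifoQueueNames {
    // 1 to 75 letters, digits, hyphens, or underscores, followed by ".fifo"
    // (75 + 5 = 80 characters at most).
    private static final Pattern FIFO_NAME =
            Pattern.compile("[A-Za-z0-9_-]{1,75}\\.fifo");

    private FifoQueueNames() {}

    /** True if the whole name matches the FIFO naming rules; null is rejected. */
    static boolean isValid(String queueName) {
        return queueName != null && FIFO_NAME.matcher(queueName).matches();
    }
}
```

With this in place, the first condition in createSQSFifoQueue could become FifoQueueNames.isValid(queueName), so a name with spaces or an overlong name fails fast with our own exception instead of an error from the service.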

Sending a JMS Message

Sending a message to a FIFO queue requires only one additional line of code to set the message group ID.

// Create the text message
TextMessage message = ...

// To send a message to a FIFO queue, you must set the message group ID.
message.setStringProperty("JMSXGroupID", "Default");

For more information about these changes, see the Amazon SQS documentation on FIFO queues.
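One thing to keep in mind when sending: the queue creation code earlier sets ContentBasedDeduplication to true. With that attribute, SQS derives the deduplication ID from a SHA-256 hash of the message body (not the attributes), and a message whose body matches one sent within the five-minute deduplication interval is treated as a duplicate. The sketch below only illustrates that idea locally; the class name is hypothetical, and it does not claim to reproduce the exact bytes SQS hashes.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: previews which bodies content-based deduplication would equate. */
final class DedupPreview {
    private DedupPreview() {}

    /** Lowercase hex SHA-256 digest of the body encoded as UTF-8. */
    static String sha256Hex(String body) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(body.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    /** True if both bodies hash to the same value, i.e. they are byte-for-byte identical. */
    static boolean sameBody(String bodyA, String bodyB) {
        return sha256Hex(bodyA).equals(sha256Hex(bodyB));
    }
}
```

In practice this means that if two distinct events can produce identical text, it is worth including something unique in the body, such as an event ID or timestamp, so the second message is not silently dropped.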

Converting MapMessage to ObjectMessage

SQS doesn’t support MapMessage; therefore, we refactored by replacing our MapMessage objects with HashMap<String, String> instances sent as ObjectMessage payloads. Note: The object carried by an ObjectMessage must implement Serializable, which HashMap does.

//Before
Message createMessage(Session session) throws JMSException {
        MapMessage mapMessage = session.createMapMessage();
        mapMessage.setString("key0", "value0");
        //...
        return mapMessage;
}

//After
Message createMessage(Session session) throws JMSException {
        HashMap<String, String> mapMessage = new HashMap<>();
        mapMessage.put("key0", "value0");
        //...
        return session.createObjectMessage(mapMessage);
}
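Since the payload now depends on Java serialization, it can be useful to confirm in a unit test that a map survives a round trip and to see how large it is once serialized. Below is a minimal sketch using only the JDK; the class name is hypothetical, and it assumes the payload travels in serialized form, so the resulting byte count (plus any encoding overhead) is what has to fit within the 256 KB limit.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical helper: serializes a payload and reads it back, as a receiver would. */
final class PayloadRoundTrip {
    private PayloadRoundTrip() {}

    /** Java-serializes the payload into a byte array. */
    static byte[] serialize(Serializable payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed (and flushed) before the bytes are copied out.
        return bytes.toByteArray();
    }

    /** Reads back an object previously produced by serialize. */
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Payload class missing on the receiving side", e);
        }
    }
}
```

A test can build the same HashMap<String, String> as createMessage, pass it through serialize and deserialize, and assert that the result equals the original. The same check catches the case where someone later puts a non-serializable value type into the map.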

Voilà! The next thing we have to do is to migrate our mobile clients using the ActiveMQ Stomp protocol and JMS selectors. Replacing the selector filtering will most likely require architectural changes. In any case, once we’ve completed the mobile migration, I’ll blog about it here too.

To conclude, it’s worth noting that these SQS migration changes should work with any JMS-compliant codebase. Let me know if you have any experiences or thoughts to share.
