Testing with Amazon SQS

We all know how great Amazon SQS is, and here at Mind Candy we use it extensively in our projects.

Quite recently, we started making some changes to our Data Pipeline in order to speed up our Event Processing, and we found ourselves with the following problem: how can we generate thousands of messages (events) to benchmark it? The first solution that came to mind was to use the AWS Command Line Interface, which is a very nifty tool and works great.

The AWS Command Line Interface SQS module comes with the ability to send out messages in batches, with a maximum of 10 messages per batch, so we said: “right, let’s write a bash script to send out some batches”, and so we did.

Problem

It worked alright, but it had some problems:

  • It was slow, because messages were sent in batches of up to 10 and the batches were not sent in parallel
  • The JSON payload had to contain some metadata along with the same message repeated 10 times (one for each message entry)
  • If you needed to send 15 messages, you needed one batch with 10 entries and another with 5 (two JSON files)
  • Bash scripts are not the easiest thing in the world to maintain

So, what did we do to solve it? We wrote our own command line program, of course!

Solution: meet sqs-postman

Writing command line applications in Node.js is very easy with the aid of the good old Commander.js. Luckily, AWS has an SDK for Node.js, which means we don’t need to worry about AWS authentication, the SQS API design, and so on. Convenient? Absolutely!

Sqs-postman was designed with the following features out of the box:

  • Sends messages in batches of up to 10 messages at a time (AWS limit)
  • Batches are sent out in parallel using a default of 10 producers, which can be configured using the --concurrent-producers option
  • A single message is read from disk, and expanded into the total number of messages that need to be sent out
  • It supports AWS configuration and profiles

In order to solve the “messages in parallel” problem, we used the async library. We basically split the messages into batches and then use eachLimit to cap how many batches are executed in parallel; the limit defaults to 10 producers but can be configured via the --concurrent-producers option mentioned above.

Can I see it in action?

Of course you can! sqs-postman has been published to npm, so you can install it by running:

 npm install -g sqs-postman

Once installed, just follow these simple steps:

  • Make sure to configure AWS
  • Create a file containing the message, e.g. message.json with some dummy content
    {
       "message": "hello from sqs-postman"
    }
  • Run it
    $ postman message my-test-queue --message-source ./message.json --concurrent-producers 100 --total 1000

If you would like to see more information, debug mode can be enabled by prepending DEBUG=sqs-postman to the command, i.e. DEBUG=sqs-postman postman…

Text is boring, show me some numbers!

You are absolutely right! If we don’t share some numbers, it will be hard to determine how good sqs-postman is.

  Messages   aws-cli       sqs-postman
  100        0m 4.956s     0m 0.90s
  1000       2m 31.457s    0m 4.18s
  10000      8m 30.715s    0m 30.83s

As you can see, the difference in performance between aws-cli and sqs-postman is huge! Thanks to sqs-postman’s ability to process batches in parallel (via async), the execution time is reduced quite considerably.

These tests were performed on a MacBook Pro (15-inch, Mid 2012) with a 2.6 GHz Intel Core i7 processor and 16 GB of 1600 MHz DDR3 RAM. Execution time was measured with the Unix time command.

Conclusion

Writing this Node.js module was very easy (and fun). It clearly shows the power of Node.js for writing command line applications, and how extensive the module ecosystem is when it comes to reusing existing components (e.g. the AWS SDK).

The module has been open sourced and can be found here. Full documentation can be found there too.

As usual, feel free to raise issues or better yet contribute (we love contributors!).

Event Processing at Mind Candy

At Mind Candy we want to build great games that are fun and that captivate our audience. We gather a great deal of data from all of our products and analyse it to determine how our players interact with our games, and to find out how we can improve. The vast majority of this data consists of ‘events’: a blob of JSON that is fired by the client or server in response to an interesting action happening in the game.

This blog post is about the approach that we have taken at Mind Candy to gather and process these events, and scale the systems into the cloud using fluentd, akka, SQS, Redshift and other AWS services.

What is an event?

From our point of view, an event is any arbitrary valid JSON that is fired (sent) to our Eventing service via a simple REST API.

When an event is received, it is enriched with some additional fields, including a ‘fired_ts’ timestamp of when the event was received, a unique UUID and, importantly, the game name, version, and event name taken from the endpoint. These last three together form what we call the ‘event key’.
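As a rough illustration, an enriched event might look something like the following. The fired_ts field and the event-key parts come straight from the description above, but the exact field names and layout here are our own guesses rather than the service’s actual output (the "level" field stands in for the original payload):

    {
      "game": "mygame",
      "version": "1.0",
      "event": "login",
      "fired_ts": "2014-07-25T15:49:50Z",
      "uuid": "0f8fad5b-d9cb-469f-a165-70867728950e",
      "level": 7
    }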

This service is extremely lean, and does not itself expect or enforce a rigid taxonomy. It simply writes the enriched events to disk. As a result, the service is incredibly easy to scale and to achieve high availability.

Validation and processing

We then use fluentd, an open source data collector and aggregator, to take the enriched data written to disk and place it onto an SQS queue. Currently, we use a single queue (per environment) which receives data from many different eventing servers.

Once all that data is arriving on the queue, we need to do something useful with it! This is where our home grown event processing engine, Whirlpool, comes into play.

Whirlpool is a scala and akka based service which retrieves messages from SQS and processes and validates them accordingly. It uses a variant of the akka work-pull pattern with dedicated workers for pre-fetching, processing, and writing events, communicating with a master worker. The number of workers and other parameters can be tweaked for maximum throughput.
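Whirlpool itself has not been open sourced (yet), so the following is only a minimal sketch of the work-pull idea using akka classic actors. Every name in it (Master, Worker, RawEvent and so on) is invented for illustration, and the real implementation additionally handles pre-fetching, batching, idle-worker tracking and supervision:

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}

    // Protocol for a minimal work-pull setup: the master never pushes work
    // unannounced; workers pull it when they are ready.
    final case class RawEvent(json: String) // e.g. an event pre-fetched from SQS
    case object WorkAvailable               // master -> workers: something to do
    case object GiveMeWork                  // worker -> master: I am ready

    class Worker extends Actor {
      def receive: Receive = {
        case WorkAvailable => sender() ! GiveMeWork
        case RawEvent(json) =>
          // validate/process the event here, then ask for more
          println(s"processed: $json")
          sender() ! GiveMeWork
      }
    }

    class Master(workerCount: Int) extends Actor {
      private val workers: Seq[ActorRef] =
        (1 to workerCount).map(i => context.actorOf(Props[Worker](), s"worker-$i"))
      private var queue = Vector.empty[RawEvent]

      def receive: Receive = {
        case e: RawEvent =>
          queue :+= e
          workers.foreach(_ ! WorkAvailable) // a real impl would track idle workers
        case GiveMeWork if queue.nonEmpty =>
          sender() ! queue.head
          queue = queue.tail
        case GiveMeWork => // nothing to hand out right now
      }
    }

    object WorkPullDemo extends App {
      val system = ActorSystem("work-pull-sketch")
      val master = system.actorOf(Props(new Master(workerCount = 4)), "master")
      (1 to 10).foreach(i => master ! RawEvent(s"""{"n":$i}"""))
      Thread.sleep(1000)
      system.terminate()
    }

The key property is that a worker only ever receives work it has explicitly asked for, so slow workers never build up a private backlog.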

Where does the metadata for processing come from? We have a shared ‘data model’ which contains information on what an event should look like for a specific game and version. This is essentially a scala library that reads from a backing Postgres store.

The structure of that schema is (simplified):

[Figure: simplified schema diagram]

An event field is a single field to be found in the JSON of the sent event. It has a number of different properties: for example, whether or not it is mandatory, whether it should be expanded (exploded out into multiple events), and the JSON path where the field is expected. The point of the eventversion table is to provide a history, so that all changes to all events are recorded over time, giving us rollback as well as an audit trail for free.

An event destination configures where an event should end up in our warehouse. It can be copied to any number of schemas and tables as we require.
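Pulling the above together, a rough scala sketch of those entities might look like this. The names and fields are inferred from the description rather than taken from the real schema, which contains more detail:

    import java.time.Instant

    // Illustrative only: inferred from the prose above, not the actual schema.
    final case class Event(id: Long, game: String, name: String)

    // One row per change, so the full history of every event is kept.
    final case class EventVersion(
      id: Long,
      eventId: Long,
      gameVersion: String,
      createdAt: Instant
    )

    // A single field expected in the JSON of a sent event.
    final case class EventField(
      eventVersionId: Long,
      jsonPath: String,   // where in the JSON the field should be found
      mandatory: Boolean, // must the field be present?
      expanded: Boolean   // should it be exploded into multiple events?
    )

    // Where a processed event should end up in the warehouse.
    final case class EventDestination(
      eventVersionId: Long,
      schema: String,
      table: String
    )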

Whirlpool retrieves the metadata for an event based on the extracted event key. It then passes the event through a series of validation steps. If it fails at any level, the reason why is recorded. If it completes all validations, the event can be processed as expected.

The processMessage function looks like this:

[Figure: the processMessage function]
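The screenshot is not reproduced here; purely as a hypothetical sketch of the overall shape such a function might have (the real processMessage certainly differs), using Argonaut’s Parse.parse:

    import argonaut.{Json, Parse}

    // All of this is a hypothetical approximation, not Whirlpool's real code.
    final case class EventKey(game: String, version: String, event: String)
    final case class EventMetadata(mandatoryPaths: List[List[String]])

    sealed trait Result
    final case class Processed(key: EventKey, event: Json) extends Result
    final case class Failed(key: EventKey, reason: String) extends Result

    // Stub standing in for the real metadata lookup (backed by Postgres).
    def lookupMetadata(key: EventKey): Option[EventMetadata] =
      Some(EventMetadata(mandatoryPaths = List(List("player", "id"))))

    def processMessage(key: EventKey, raw: String): Result =
      Parse.parse(raw) match { // Argonaut: Either[String, Json]
        case Left(err) => Failed(key, s"malformed json: $err")
        case Right(json) =>
          lookupMetadata(key) match {
            case None => Failed(key, "unknown event")
            case Some(md) =>
              // walk each mandatory JSON path; fail if any is missing
              val missing = md.mandatoryPaths.filterNot { path =>
                path.foldLeft(Option(json))((j, f) => j.flatMap(_.field(f))).isDefined
              }
              if (missing.isEmpty) Processed(key, json)
              else Failed(key, s"missing mandatory fields: $missing")
          }
      }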

We use Argonaut as our JSON processing library. It is a fully functional library written in Scala that is very nice to work with, as well as having the added benefit that our resident Mind Candy, Sean, is a contributor!

After our events have been validated, they are either a successful event for a particular game and version, or a failure. At this point we make use of fluentd again, with a modified version of the Redshift plugin, to load them into our Redshift data warehouse. Here they are available for querying by our data scientists and data analysts. Typically, the time from an event being received to it being queryable in the data warehouse is measured in seconds, and rarely exceeds a couple of minutes.

Configuring events

To actually set up the metadata for what constitutes an event, we have created a simple GUI that can be accessed by all game teams. Any changes are picked up within a few minutes by Whirlpool, and those events will start to flow through our pipeline.

We also needed to solve one large problem with the configuration, namely: “How do you avoid having to create a mapping for every single game version when the events haven’t changed, and how do you accommodate changes when they do occur?”

It took us a while to find a nice balance for solving this, but what we have now is a mapping from a POSIX regex, matched against the incoming game version, to the specific version that should be used for retrieving the metadata (this is the purpose of the ‘configmapping’ table in the schema). So, when we release 1.0 of our game, we can create metadata that applies to “1.x”. If in version 1.5 we introduce a new event, we can create a new config at that point to apply to all later versions, while still having versions 1.0-1.4 processed correctly.
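A hypothetical sketch of that resolution step (the real lookup lives in the data-model library, and scala’s matches uses Java regexes where the real system uses POSIX ones):

    // Hypothetical names; one row of the configmapping table.
    final case class ConfigMapping(game: String, versionRegex: String, metadataVersion: String)

    def resolveMetadataVersion(
        mappings: List[ConfigMapping],
        game: String,
        incomingVersion: String
    ): Option[String] =
      mappings
        .filter(_.game == game)
        .find(m => incomingVersion.matches(m.versionRegex))
        .map(_.metadataVersion)

    // e.g. ConfigMapping("mygame", "1\\.[0-4]", "1.0") routes versions 1.0-1.4
    // to the metadata created for 1.0, while a second mapping can cover 1.5+.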

Handling Failure

Events can fail for a large variety of reasons. Currently there are 17 specific types of these, a couple of examples being:

  • The event is malformed; it does not contain the fields that we expect
  • The event is unknown

A failure is captured by the following class:

[Figure: the failure case class]
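The screenshot is not reproduced here; a minimal hypothetical approximation of the shape described below would be:

    // Hypothetical approximation; the real definitions differ.
    final case class FailureType(code: String) // e.g. "malformed", "unknown-event"

    final case class EventFailure(
      failureType: FailureType,   // which of the 17 failure types occurred
      fields: Map[String, String] // attributes extracted from the failure, if any
    )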

The FailureType here is another case class corresponding to the specific failure that was generated, and the fields contain some additional attributes which may or may not be extracted from the failure.

We treat failures separately from processed events, but they still make their way into Redshift in a separate schema. Each failure contains enough information to identify the problem with the event, which in most cases can then be fixed in the metadata; typically, event failures occur during development, and are a rare occurrence in production.

Scaling our infrastructure

We make heavy use of AWS at Mind Candy, and the eventing pipeline is no exception. All the eventing servers are described via CloudFormation, and set up in an auto-scaling group fronted by an ELB. As a result, the number of servers deployed scales up and down in response to rising and waning demand.

The use of SQS also decouples our event-gathering and event-processing infrastructure. This means that Whirlpool instances do not have to scale as aggressively, as the queue provides a natural buffer that irons out fluctuations in the event stream due to peaks of traffic. For Redshift, we have a 6XL node cluster which we can scale up when required, thanks to the awesome features provided by Amazon.

Performance

We’ve benchmarked each of our eventing servers comfortably processing 5k events/sec, on m1.medium instances.

Whirlpool does a little more work, but we are currently running a configuration offering a sustained rate of just over 3k events/sec per instance, on c1.medium instances, with a quick ramp-up time.

Instances of both Eventing and Whirlpool operate independently, so we scale horizontally as required.


The Future

We have real-time dashboards that run aggregations against our event data and display it on screens around the office. It’s very useful, but is only the first incarnation. Currently we’re working on streaming processed events from Whirlpool into Spark via Kafka, to complete our lambda architecture and greatly reduce the load on our Redshift cluster. We’re also improving the structure of how we store events in Redshift, based on our learnings over the last year or so! At some point when we have more time, we would also like to open-source Whirlpool into the community.