License to Queue


A few months ago I joined an exciting project with fellow Shiner Graham Polley, who you might know from such hits as Put on your streaming shoes. This is a follow-up article, discussing the elegant way in which we solved a hideous asynchronous limitation in PHP.

My role on the project was DevOps-based, and I was there to build some infrastructure using Amazon Web Services. As the cool kids would put it, I was there to put our client’s enterprise application INTO the cloud or, more precisely, to build a solution coupling services from two rival cloud providers to deliver a new level of scalability and flexibility.
The solution was pretty simple, but, like any simple solution, the little complexities come out along the way, usually when you’re least expecting them.

At the time, I had already read Graham’s blog post about streaming data into Google’s BigQuery, and so somewhere in the deep, dark catacombs of my mind, I knew that the PHP async issue was lurking, waiting for an opportune moment to rear its ugly head.

Put simply, if you need to perform an HTTP request using PHP, which is exactly what the BigQuery client library for PHP does for every database insert, the client browser is held up until the TCP session has completed. PHP offers no graceful method for performing an asynchronous, fire-and-forget request.

It wasn’t until some point in our preliminary load testing of the Apache web servers that I stumbled upon the notorious PHP asynchronous impossibility for myself. I had been sifting through a few relatively heavy-hitting PHP scripts that seemed to be causing much more load than they should, and I happened upon a section of code that… *deep breath* opens a socket back to the local Apache HTTP server to perform an HTTP request of another PHP file sitting in the same directory as itself.

Fortunately, despite all of the commotion and shock, I managed to capture a selfie, which I’ve included here to illustrate to you the raw emotion I felt at the time:

[Image: a man crying]

The shock to my system caused me to lose all memory of Graham’s earlier blog, and he had to explain it to me all over again. Why had such a hack been implemented in a production environment?

In short, PHP.

Hint: Now is a good time to skim through Graham’s post again if you need to refresh your memory.

A little help from Q


Over the next day or two, this issue kept bugging me. I felt dirty knowing that this hack was present in the shiny new environment that I’d built. It’s the kind of stink that, no matter how long you sit in it, your nose never adjusts to.

We needed a method of pumping those requests somewhere into a queue, to be processed independently of the client-side browsing experience. If it took several seconds for a BigQuery insert process to return, that should not be reflected in the front-end user experience.

Fortunately, a solution presented itself quite quickly: Redis. We were already using it on other parts of the project and, while you might know it best as an in-memory key-value store, it can also be used as a queuing mechanism. Being in-memory was OK with us as we didn’t really need data persistence – it’s intended to be entirely transient. Consequently, setting up our queue was super-simple.

Redis has a really neat pub/sub feature, where you can simply subscribe to a named ‘channel’ in Redis and listen. A client will ‘publish’ to the channel, and your listener will immediately be sent the data. Awesome, but there is a catch: messages published to a channel while no listener is listening are lost forever. Pub/sub is intended for handling events in real time, so it was given serious consideration, but ultimately decided against for this reason.

Instead, we chose Redis’ push/pop mechanism, which is an actual queue.

Push and Pop

Push and pop are pretty standard operations for managing a list: push (rpush) adds a message to the right-hand end, and pop (lpop) returns the left-most value and removes it from the list. Used together like this, it’s a FIFO queue.

With this method, you need to loop and poll each queue yourself, but it does afford a pretty important feature: it will maintain a backlog of messages if they are not processed in time. This allows us to absorb spikes in load, and if the queue processor script is down temporarily, it can catch up afterwards.
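
For illustration, here’s a minimal sketch of that loop-and-poll pattern on the processor side, using the redis-py client (jumping ahead slightly to Python – more on that choice below). The queue name and connection details are assumptions for the example:

import redis

# Connect to the local Redis instance (host and port are assumptions for this sketch).
r = redis.StrictRedis(host="localhost", port=6379)

while True:
    # blpop blocks until a message arrives on the 'table' queue (or the timeout
    # expires), so we aren't spinning while the queue is empty. A plain lpop in a
    # loop with a short sleep would work too.
    item = r.blpop("table", timeout=5)
    if item is None:
        continue  # nothing arrived within the timeout; poll again

    _queue_name, message = item
    print("would hand off to BigQuery:", message)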

It seemed like a pretty natural logical separation to go with one queue per BigQuery table, and it makes for good-looking code on the client side (using php-redis):

$redis->rpush("table", "data");

Simple!

My weapon of choice

The next decision was another easy one: what would I use to write the queue processor?

Google offers a pretty mature client library for both Java and Python, which makes authentication and inserting into BigQuery really simple in either language.


Python was my weapon of choice to write the queue processor. It offers a pretty decent client library for each component in my solution, and it was really quick to get a working prototype.

Note: There are BigQuery client libraries for a few other languages, but they are either in alpha or beta, or weren’t really applicable to my situation.
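
To make that concrete, here is a hedged sketch of a single streaming insert through the google-api-python-client discovery interface; the project, dataset and table names are placeholders, and the credential setup will vary with your environment:

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

# Application Default Credentials; a service-account key file would also work.
credentials = GoogleCredentials.get_application_default()
service = build("bigquery", "v2", credentials=credentials)

body = {
    "rows": [
        {"json": {"user_id": 123, "event": "page_view"}}  # placeholder row
    ]
}

response = service.tabledata().insertAll(
    projectId="my-project",   # placeholder
    datasetId="my_dataset",   # placeholder
    tableId="my_table",       # placeholder
    body=body,
).execute()

# Streaming inserts report per-row problems in the response rather than raising,
# so check for insertErrors explicitly.
if response.get("insertErrors"):
    print("insert failed:", response["insertErrors"])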

Performance

This little engine started off pretty simple, but it has a lot of responsibility and some pretty hefty performance requirements, so as it was developed and tested, things got a bit more complex. Currently we are handling roughly 1000 inserts per minute, and we are barely out of the proof-of-concept stage. This will be scaled up a lot more, and therefore needs to be able to handle itself – without solving a code problem using infrastructure, that is.

Parallelism

First of all, multithreading is an absolute must. It needs to handle many simultaneous HTTP sessions, and without this, an enormous backlog would be created within seconds.

For example, if a single transaction with BigQuery took 1500ms (1.5 seconds), with a single threaded queue processor we would reach a maximum of 40 requests per minute. Not even close.

Python’s threading library makes writing a multithreaded application a simple exercise.
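
As a rough sketch of the shape of that, a fixed pool of worker threads can all block on the same Redis list; the thread count, queue name and the stub dispatch function are assumptions here:

import threading
import redis

NUM_WORKERS = 32  # assumption: tuned against insert latency and target throughput

def dispatch_to_bigquery(message):
    # Stub standing in for the insertAll call sketched earlier.
    print("inserting:", message)

def worker():
    # Each thread holds its own connection and blocks on the shared queue.
    r = redis.StrictRedis(host="localhost", port=6379)
    while True:
        _queue_name, message = r.blpop("table")
        dispatch_to_bigquery(message)

threads = [threading.Thread(target=worker, daemon=True) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # the workers run forever, so this effectively keeps the process alive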

Coalesced inserts

Even with multithreading, and a large number of concurrent threads working away, performance was still not good enough. The queue processor and the system around it looked like they were working reasonably hard, but they were only just keeping up with the load.

One database row insert per thread is pretty wasteful, though. The majority of the turnaround time for each insert is protocol overhead.

The data format for BQ inserts is simple JSON, and a quick glance at an example makes it obvious that multiple rows are not only allowed, but encouraged:

{
    "rows": [
        { "json": { ... } },
        { "json": { ... } },
        { "json": { ... } }
    ]
}

By coalescing multiple row inserts – multiple Redis queue messages – into one HTTP request, throughput skyrocketed and the number of concurrent threads dropped. The Python code became a bit more complex to implement this, but it was definitely worth it.
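
To give a feel for the coalescing step, here is a loose sketch: drain up to a batch’s worth of messages from the queue, then wrap them all in a single insertAll body. The batch size, queue name and the assumption that each message is a JSON-encoded row are all illustrative:

import json
import redis

BATCH_SIZE = 24  # matches the bank threshold mentioned below

r = redis.StrictRedis(host="localhost", port=6379)

def drain_batch(queue, max_rows=BATCH_SIZE):
    """Pop up to max_rows messages off a Redis list without blocking."""
    rows = []
    for _ in range(max_rows):
        message = r.lpop(queue)
        if message is None:
            break  # queue is empty
        rows.append({"json": json.loads(message)})
    return rows

rows = drain_batch("table")
if rows:
    body = {"rows": rows}  # one HTTP request for the whole batch
    print("would dispatch", len(rows), "rows in a single insertAll call")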

The queue processor will bank up messages from each Redis queue until a threshold has been reached, and then they will be coalesced and dispatched to BigQuery in one go. In my implementation, there are actually three triggers that will cause these banks to be emptied and messages dispatched to BigQuery (see the sketch after this list):

  1. The bank reaches a max value (currently 24 messages)
  2. The number of active threads drops below a lower threshold (currently 6 threads)
  3. A cleaner thread runs an unconditional dispatch every 1 second. This simply ensures nothing gets left in the queue in an unexpected situation
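
A loose sketch of how those three triggers might hang together; the thresholds come from the list above, while the bank structure, the lock and the use of threading.active_count() as a proxy for busy worker threads are assumptions of this sketch:

import threading
import time

MAX_BANK = 24            # trigger 1: bank has reached its maximum size
MIN_ACTIVE_THREADS = 6   # trigger 2: dispatch when the worker pool goes quiet
CLEANER_INTERVAL = 1.0   # trigger 3: unconditional sweep every second

bank = []                    # messages waiting to be coalesced
bank_lock = threading.Lock()

def flush():
    """Coalesce whatever is banked and dispatch it in one request."""
    with bank_lock:
        rows = bank[:]
        del bank[:]
    if rows:
        print("dispatching", len(rows), "rows")  # placeholder for the real insert

def add_to_bank(row):
    with bank_lock:
        bank.append(row)
        full = len(bank) >= MAX_BANK
    if full or threading.active_count() < MIN_ACTIVE_THREADS:
        flush()

def cleaner():
    # Trigger 3: make sure nothing lingers in the bank in an unexpected situation.
    while True:
        time.sleep(CLEANER_INTERVAL)
        flush()

threading.Thread(target=cleaner, daemon=True).start()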

So far we’ve been able to process a backlog of 8192 inserts in around 1min 30sec, or ~5460 inserts per minute, using the smallest Amazon EC2 instance available (t2.micro).

This coalesce method does mean that a message, after being picked up from the queue, sits in memory a fraction longer, but it is really only a matter of milliseconds in this case – a delay that was deemed acceptable.

Going sideways

Now that we have the queue processor running reasonably fast, it’s time to think a little more about scalability and resiliency. Adding more/faster CPUs will help to a point, but the returns diminish and it’s not really practical anyway: changing the instance type in AWS requires a shutdown of the host.

In addition to scalability, I need some sort of resiliency. If the server falls in a heap, any new row inserts will be simply lost, along with any backlog that has accumulated in Redis. That’s the inherent nature of this asynchronous, fire-and-forget solution that we’ve been searching for, though. So then, what’s the plan?

Fortunately, this problem is pretty easily solved once again. Scaling this solution horizontally makes complete sense, and requires no alteration to the existing setup: just a few more EC2 instances running an identical configuration and an Elastic Load Balancer sitting in front. Done.

This proved to be a really solid way of increasing throughput, and provides some safety for our data, too.

I’ll get you next time, Bond


What’s next? Well, a nice way to avoid losing in-flight messages if a system failure occurred would be good. At the moment we just avoid a backlog like my dog avoids bath time. If you’ve got ideas, I’m all ears!

This was a fun problem to solve, and I’ve learnt a good lesson in the process: taking a step back from a problem that you’re trying to solve is always advantageous. It is very easy to become blinkered in your approach, and to try to solve a problem using whatever tools are immediately in front of you, even if they’re not the right tools for the job.

Moving to AWS also opened some doors for us and allowed us to be creative with the approach to this issue. Before the migration, our toolset was very limited.

I’m super keen to hear from anyone else out there tackling a similar issue, and the measures you’ve taken to solve the problem or to work around it. If you’ve worked on something like this, drop me a note to say hello!

7 Comments
  • g2-8995997ddd66f90460b037839b8a0bdc
    Posted at 19:39h, 22 December

    It’s likely a large part of the 1.5 seconds is due to re-establishing the connection to Google each time. Either use a library that supports persistent connections or stick Apache Traffic Server or nginx in between. Both of them can also be configured to give you those fire-and-forget semantics you are looking for. Though batching will beat that.

    • Shane Neubauer
      Posted at 08:49h, 23 December

      Hey there! Great ideas, I haven’t used nginx like that before. I did want to shift the load away from the web servers, so implementing persistent connections on the web servers themselves wasn’t my favourite option.
      Incidentally, the ‘httplib2’ Python library does inherently persist HTTP connections. I’m sure it played a part in the performance I’m getting. Though, I found the largest gain came from batching up the inserts.
      Thanks for the comment!

  • Brian
    Posted at 22:19h, 22 December

    Just wondering: why did you not use Amazon’s SQS service? It is fully managed, scales, and can retain messages for days if something fails. It has SDKs in several languages, and you are already using Amazon services.

    • Shane Neubauer
      Posted at 09:04h, 23 December

      Hey Brian, it did cross my mind, but Amazon SQS just does not perform as well as having the queue stored in memory on the same host as the queue processor.

      For inserting into the queue, I performed a few very crude measurements, and while Redis took around ~0.2sec, an SQS message send took around ~0.4sec. It all adds up.

      Though, the biggest performance difference would be found in polling for messages in the queues. Checking a local instance of Redis, I can loop and use lpop or blpop and the turnaround time is almost negligible. Accepting a message from an SQS queue takes about ~0.4sec again. It’s the latency and overhead that is the killer.

      I am using Amazon SQS in other areas of the project, and I think it’s great – just not designed for this kind of work.

      Thank you for the comment! I appreciate the feedback.

  • anujspandit
    Posted at 21:18h, 07 January

    Hi Shane

    Thanks for this very insightful post. We are in the process of adopting BigQuery, and the MQ solution in use is ‘Rackspace Cloud Queues’ because it allows us to pick up a message and add a TTL, so no one else can claim the message unless a processor fails to process and delete it within the TTL. Our message volumes are very low and hence there are no issues at the moment.

    After reading your post I was looking at potential replacements for our MQ and came across Beanstalk – it seems to support a message status called ‘reserved’. So if a message is not processed by one processor, it will become available to other processors after a period of time.

    I was wondering if Beanstalk is an option worth considering over Redis as far as the MQ is concerned? Never used it or Redis before but would be great to have your thoughts.

    Anuj

    • Shane Neubauer
      Posted at 09:47h, 08 January

      Hey Anuj,

      I love the simplicity of Beanstalk, and it would work equally well.

      One of the big reasons Redis was chosen over other technologies is we already had the PHP Redis client library installed and in-use on the web server fleet, so there were no changes required there. Just an update to the PHP scripts themselves.

      Beanstalk is actually very nice, and if you have no other factors in your decision, I can’t think of any arguments against choosing it!

      I’m curious to know how the Rackspace Cloud Queues solution performs? I have been asked why I did not choose AWS SQS, and the reason is simply: ‘performance’.

      Thanks for your comment!

      • Anuj S Pandit (@anuj_s_pandit)
        Posted at 16:28h, 09 January

        Hi Shane

        Well, our message volumes are pretty low and we wanted to switch from Microsoft’s MSMQ to something that required no management or maintenance from our team. Also, something over HTTP – a service. Because we were on Rackspace already, Cloud Queues with an almost-RESTful API and the 300 msg/sec limit was more than enough for us to make a decision.

        Digging a bit deeper revealed that Cloud Queues is actually Marconi in action, an OpenStack project. So what I gather is that if we wish, we could deploy it on our own OpenStack infrastructure and things will perform better (lower latency?).

        Interestingly, Marconi uses MongoDB for storage and there is an option to use Redis as the backend (see https://github.com/cabrera/marconi-redis). So Marconi is a wrapper of sorts from what I understand (an open-source alternative to Amazon’s SQS and SNS).

        Regards
        Anuj
