When you have a Big Data solution that relies on a high-quality, uninterrupted stream of data to meet the client’s expectations, you need a solution in place that is extremely reliable and fault tolerant at many points. That all sounds well and good, but how exactly does that work in practice?
Let me start by explaining the problem. About 2 years ago our team was asked to spike a streaming service that could stream billions of events per month to Google’s BigQuery. The events were to come from an endpoint on our existing Apache web stack, and we would push them to BigQuery using an application written in PHP. We did exactly this; however, we found that requests to BigQuery were taking too long, which resulted in slow response times for users. So we needed a way to queue the events before sending them to BigQuery.
With time as a constraint and low latency as our biggest priority, we decided to implement this using a Redis queue; more specifically, we used a cluster of Amazon EC2 instances running Redis (note that ElastiCache was not used). The web application would connect to a Redis instance via a load balancer, and a script running on the same instance would pick up the events and push them to BigQuery. This solution was great: it reduced latency for users and made the events available in BigQuery within a few seconds. This was exactly the solution we were after.
Now, that was 2 years ago. Let’s take a look at how things changed over those 2 years. The application was still kicking along, but we had noticed a few things:
- BigQuery’s streaming API fails (A LOT!), which causes queue backlogs and lost events.
- When one of our Redis instances goes down, we lose its backlog of events.
- When the backlog on a Redis instance is high, the other instances can’t help reduce it, so scaling up doesn’t help.
- We are receiving a lot more traffic.
- We are actually using the data, so reliability is a major concern.
It therefore became clear to us that we would need a single queue that could scale with increased traffic and withstand multiple points of failure. We needed to be able to retain the events for a significant period of time when BigQuery goes down. Since all of our infrastructure (minus BigQuery) was in AWS, the logical solution was Amazon’s SQS. It provided everything we needed: a queue that offers low latency and, most importantly, fault tolerance. We could have multiple consumers reading from the same queue without any need to manage concurrency between producers or consumers.
Our new solution would consist of:
- An SQS queue, used both as a queue and as temporary storage for events.
- Web servers, which generate the events from user actions and user data.
- A queue-processing application, which consumes the events from the queue and writes them to BigQuery.
SQS is a queue service that is fully managed by AWS. It is simple to use: all that is required is to create a queue, set up the permissions, and start pushing events to it. The rest is completely managed by the service. The great thing about SQS is the visibility timeout feature. Simply put, when a message is read from the queue it becomes invisible to other consumers until the visibility timeout period is up; if the message is deleted, it will never be visible in the queue again. So for our solution we read a batch of events from the queue, push them to BigQuery, and once the push succeeds we delete them from the queue. This ensures that no other node processing the queue will get the same message unless a push to BigQuery fails and the visibility timeout expires. In that case another node can process the event, without duplicating or losing it.
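To make the visibility-timeout behaviour concrete, here is a deliberately simplified in-memory model of it. This is not the real SQS API (which you would call through an AWS SDK); it is just enough logic to show why a message whose BigQuery push fails is redelivered to another consumer instead of being lost:

```python
class MiniQueue:
    """Toy in-memory model of SQS visibility-timeout semantics.
    A received message is hidden from all consumers until the
    timeout expires; only an explicit delete removes it for good."""

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self.messages = {}         # msg_id -> body
        self.invisible_until = {}  # msg_id -> timestamp

    def send(self, msg_id, body):
        self.messages[msg_id] = body

    def receive(self, now):
        """Return one visible message, or None if all are hidden."""
        for msg_id, body in self.messages.items():
            if self.invisible_until.get(msg_id, 0) <= now:
                # Hide the message from other consumers for the
                # duration of the visibility timeout.
                self.invisible_until[msg_id] = now + self.visibility_timeout
                return msg_id, body
        return None

    def delete(self, msg_id):
        """Called only after a successful push to BigQuery."""
        self.messages.pop(msg_id, None)
        self.invisible_until.pop(msg_id, None)
```

If a consumer receives a message and then crashes (or its BigQuery insert fails), it simply never calls `delete`; once the timeout lapses, the next `receive` hands the same message to another consumer.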
Apache Web Servers (Event Producers)
When you have a user-facing application, it needs to be fast and to scale with demand, or the experience for your users could be severely impacted. So you need to consider exactly how much of an impact changing the queue will have. I wasn’t concerned with how long it took to push events to the queue as such; I was more concerned with the overall impact on response times. We need our average response time to be sub-200ms, which means that under an average load we should be aiming for under 100ms. So we ran some tests and found the following:
- Under relatively moderate traffic we were getting under 100ms response times.
- Under high traffic we were seeing response times over 100ms.
- Under really high traffic we were seeing more than 1 second response times.
- All tests showed higher CPU utilization when pushing to SQS than when pushing to Redis.
Overall the test results with SQS were not quite as good as with Redis; however, the difference was something we could manage with the scaling policy we already had in place on the web stack. So with a good autoscaling policy, we were able to maintain good performance across our web stack.
Now that we are pushing our events to an SQS queue, the next step is to consume them and push them to BigQuery. In our previous solution, all of the queue processors ran on the same servers as the queues themselves; since we are now using SQS, we needed another approach. I considered using AWS Lambda functions, but I saw two constraints with Lambda for this solution:
- At the time they were not available in the Sydney region.
- They can’t be triggered by events in SQS.
The first point is somewhat moot now that Lambda is available in Sydney; however, since we completed our solution about a week before that launch, we couldn’t see any major advantage in using it. With regard to the second issue, there is no practical way to trigger Lambda functions to consume events from SQS. Sure, you could trigger a function on a fixed schedule, but that doesn’t scale: you would then need another function monitoring the queue size to ensure that more functions are triggered when the backlog grows. It would be convenient if you could trigger Lambda invocations based on the queue size, but we might have to wait a while for that feature, if it ever appears at all.
So with this in mind, the next most obvious step is to have an application running on EC2 instances that simply reads from the queue, pushes to BigQuery, and then deletes the message from the queue if there are no errors. We can then scale the number of instances based on the backlog in the SQS queue, by setting a CloudWatch alarm on the number of events in the backlog which triggers scale-up events until the backlog drops back to an acceptable level. This ensures that increased load has no impact on our data processing.
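The scaling rule above can be sketched as a simple sizing function. In practice the trigger is a CloudWatch alarm on the queue’s `ApproximateNumberOfMessagesVisible` metric acting on an Auto Scaling group, not code we run ourselves, and every number below (per-consumer throughput, fleet bounds) is an illustrative assumption rather than our production configuration:

```python
import math

def desired_consumer_count(backlog,
                           events_per_consumer_per_min=5_000,
                           min_instances=2,
                           max_instances=20):
    """Size the consumer fleet so the current backlog could be drained
    in roughly one minute, clamped to the Auto Scaling group's bounds.
    All figures are illustrative assumptions, not production values."""
    needed = math.ceil(backlog / events_per_consumer_per_min)
    return max(min_instances, min(needed, max_instances))
```

Under these assumptions a quiet queue keeps the fleet at its floor of 2 instances, while a 1-million-event backlog (like the one described later in this post) pins it at the 20-instance ceiling until the backlog drains.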
We receive between 5k and 25k events per minute, depending on the time of day and the campaigns we are running on a particular day. We are also expecting the number of events to keep growing, so a queue that can scale reliably is obviously very important to us.
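As a rough back-of-the-envelope check, the peak figure above translates into a very modest API call rate, since SQS’s SendMessageBatch accepts up to 10 messages per call:

```python
# Peak rate quoted above: 25,000 events per minute.
peak_per_min = 25_000
peak_per_sec = peak_per_min / 60           # ~417 events per second

# SQS SendMessageBatch takes at most 10 messages per call,
# so batching divides the call rate by 10 across the whole fleet.
batch_size = 10
calls_per_sec = peak_per_sec / batch_size  # ~42 API calls per second
```

At that rate the queue itself is nowhere near any throughput limit; the scaling question is entirely about the consumers and BigQuery.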
Let’s go back to one of the reasons for implementing this solution: the increasing number of failed inserts into BigQuery. Exactly how many inserts are failing? The chart below shows the number of failed streaming inserts into BigQuery per 6-hour period over the past 7 days. It demonstrates a very consistent failure rate when inserting records into BigQuery, hence the need for a highly available and robust queue.
In addition to this consistent failure rate, back on July 25th, 2016 we got to see just how much of an advantage SQS gave us. BigQuery’s streaming API started to experience a very high failure rate, which caused our SQS queue to back up to almost 1 million records. Our queue consumers were able to scale up and work the queue back down once BigQuery had recovered.
BigQuery is a great product: it’s fast and scalable, but it is lacking a little when it comes to reliability. We had an application that did the job and achieved a result we could use. However, as traffic grew and reliability became more important, we needed a more robust solution that could guarantee, to a higher degree of certainty, that we would not lose any events; using SQS as a queue became the obvious choice.
With a single, distributed queue that can scale and orchestrate the consumption of events across multiple nodes, we were able to implement a highly scalable solution without losing events. I am now very confident that our solution will be able to handle our traffic into the future, as event volumes grow, without the need for any changes to the core application.