19 Aug 2016 High availability, low latency streaming to BigQuery using an SQS Queue.
When a Big Data solution relies on a high-quality, uninterrupted stream of data to meet the client’s expectations, you need something extremely reliable with fault tolerance at many points. That all sounds well and good, but how exactly does that work in practice?
Let me start by explaining the problem. About 2 years ago our team was asked to spike a streaming service that could stream billions of events per month to Google’s BigQuery. The events were to come from an endpoint on our existing Apache web stack, and we would push them to BigQuery using an application written in PHP. We did exactly this; however, we found that requests to BigQuery were taking too long, which resulted in slow response times for users. So we needed a way to queue the events before sending them to BigQuery.
With time as a constraint and really low latency as our biggest priority, we decided to implement this using a Redis queue; more specifically, we used a cluster of Amazon EC2 instances running Redis. The web application would connect to a Redis instance via a load balancer, and a script running on the same instance would pick up the events and push them to BigQuery (note that ElastiCache was not used). This solution was great: it reduced the latency for users and made the events available in BigQuery within a few seconds. This was exactly the solution we were after.
Now, that was 2 years ago. Let’s take a look at how things have changed since then. The application is still kicking along, but we have noticed a few things.
- BigQuery’s streaming API fails (A LOT!!), which causes a queue backlog and lost events.
- When one of our Redis instances goes down, we lose the backlog events.
- When the backlog on a Redis instance is high other instances don’t help to reduce the backlog so scaling up doesn’t help.
- We are receiving a lot more traffic.
- We are actually using the data, so reliability is a major concern.
It therefore became clear to us that we would need a single queue that could scale with increased traffic and withstand multiple points of failure. We needed to be able to retain the events for a significant period of time when BigQuery goes down. Since all of our infrastructure (minus BigQuery) was in AWS, the logical solution was to use Amazon’s SQS. It provided everything we needed: a queue that offers low latency and, most importantly, fault tolerance. We could have multiple consumers reading from the same queue without any need to manage concurrency between producers or consumers.
Our new solution would consist of:
- SQS queue, to be used as a queue and temporary storage of events.
- Web servers, which generate the events from user events and user data.
- Queue processing application, which consumes the events from the queue and writes them to BigQuery.
SQS is a queue service that is fully managed by AWS. It is simple to use: all that is required is to create a queue, set up the permissions and then start pushing events to the queue. The rest is completely managed by the service. The great thing about SQS is the visibility timeout feature. Simply put, when a message is read from the queue it is hidden from other readers until the visibility timeout period is up, and if the message is deleted it will never be visible in the queue again. So for our solution we read some events from the queue, push them to BigQuery and, once that succeeds, delete them from the queue. This ensures that no other nodes processing the queue will get the same message unless a push to BigQuery fails and the visibility timeout expires. In that case another node can process the event, so it is neither duplicated nor lost.
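To make the read, push, delete cycle concrete, here is a minimal sketch in Python (our real application is not shown in this post, so treat this as an illustration only). It assumes `sqs` is a boto3 SQS client, and that `insert_rows` is a hypothetical callable that pushes a list of rows to BigQuery and returns `True` on success. The function name, the 60 second visibility timeout and the JSON message bodies are all assumptions made for the example.

```python
import json
from typing import Any, Callable, Dict, List

MAX_SQS_BATCH = 10  # SQS returns at most 10 messages per receive call


def process_batch(
    sqs: Any,
    queue_url: str,
    insert_rows: Callable[[List[Dict[str, Any]]], bool],
    visibility_timeout: int = 60,  # assumed value, tune to your insert latency
) -> int:
    """Read one batch, push it to BigQuery, delete it only on success.

    Returns the number of messages deleted from the queue.
    """
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=MAX_SQS_BATCH,
        WaitTimeSeconds=20,  # long polling, avoids hammering an empty queue
        VisibilityTimeout=visibility_timeout,
    )
    messages = response.get("Messages", [])
    if not messages:
        return 0

    # Assumption: each message body is a JSON-encoded event.
    rows = [json.loads(message["Body"]) for message in messages]

    if not insert_rows(rows):
        # Do not delete: the messages become visible again once the
        # visibility timeout expires, and any node can retry them.
        return 0

    result = sqs.delete_message_batch(
        QueueUrl=queue_url,
        Entries=[
            {"Id": str(index), "ReceiptHandle": message["ReceiptHandle"]}
            for index, message in enumerate(messages)
        ],
    )
    return len(result.get("Successful", []))
```

A consumer node would simply call `process_batch` in a loop. The important design point is that the delete happens only after the insert succeeds, so a failed push costs nothing more than a delay of one visibility timeout.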
Apache Webservers (Event Producers)
When you have a user-facing application, it needs to be fast and to scale with demand, or the experience for your users could be severely impacted. You need to consider exactly how much of an impact changing the queue will have. I wasn’t concerned with how long it took to push events to the queue as such; I was more concerned with the overall impact on response times. We need our average response time to be under 200ms, which means that under an average load we should be aiming for response times under 100ms. So we ran some tests and found the following:
- Under relatively moderate traffic we were getting under 100ms response times.
- Under high traffic we were seeing response times over 100ms.
- Under really high traffic we were seeing more than 1 second response times.
- All tests produced a higher CPU utilization while pushing to SQS compared to Redis.
Overall the test results were not quite as good as when using Redis; however, this was something that could be managed by the scaling policy we have in place on the web stack. So with a good autoscaling policy, we were able to maintain good performance across our web stack.
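For reference, the producer side is a single call per event. Our web application is written in PHP, so the snippet below is only an equivalent sketch in Python, assuming `sqs` is a boto3 SQS client. The function name `enqueue_event` and the choice of compact JSON for the message body are assumptions for the example.

```python
import json
from typing import Any, Dict


def enqueue_event(sqs: Any, queue_url: str, event: Dict[str, Any]) -> str:
    """Serialise one event and push it to the queue.

    Returns the MessageId that SQS assigns to the message.
    """
    # Compact separators keep the message body as small as possible.
    body = json.dumps(event, separators=(",", ":"))
    response = sqs.send_message(QueueUrl=queue_url, MessageBody=body)
    return response["MessageId"]
```

Because this call sits in the request path, its latency and CPU cost are what the tests above were really measuring.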
Now that we are pushing our events to an SQS queue, the next step is to consume the events and push them to BigQuery. With our previous solution, all of the queue processors ran on the same servers as the queues themselves. However, since we are now using SQS we need to find another solution. I considered using AWS Lambda functions; however, I saw 2 constraints with Lambda when it comes to this solution:
- At the time they were not available in the Sydney region.
- They can’t be triggered by events in SQS.
The first point is kind of moot now since Lambda functions are available in Sydney; however, since we completed our solution about a week before they became available there, we couldn’t see any major advantage in using them. With regard to the second issue, there is no practical way to trigger Lambda functions to consume events from SQS. Sure, you could trigger a Lambda function every x period of time, but this is not a scalable solution. You would then need another Lambda function monitoring the queue size to ensure that more are triggered when the queue size increases. It would be convenient if you could trigger Lambda invocations based on the queue size, but we might have to wait a while for that feature, if it ever appears at all.
So with this in mind, the next most obvious step is to have an application running on EC2 instances which simply reads from the queue, pushes to BigQuery and then deletes the message from the queue if there are no errors. We can then scale up the number of instances based on the backlog in the SQS queue. This can be done by setting a CloudWatch alarm on the number of events in the queue backlog, which triggers scale-up events until the backlog is reduced to an acceptable level. This would ensure that any increased load would have no impact on our data processing.
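As a rough illustration of the backlog alarm, the sketch below builds the parameters for a CloudWatch alarm on the SQS `ApproximateNumberOfMessagesVisible` metric and passes them to boto3's `put_metric_alarm`. The alarm name, the threshold of 10,000 messages, the 5 minute period and the scaling policy ARN are all placeholder assumptions, not the values we actually run with.

```python
from typing import Any, Dict


def backlog_alarm_params(
    queue_name: str,
    scale_up_policy_arn: str,
    threshold: int = 10000,  # assumed backlog size that should trigger scaling
) -> Dict[str, Any]:
    """Build put_metric_alarm arguments for a queue backlog alarm."""
    return {
        "AlarmName": queue_name + "-backlog-high",  # hypothetical naming scheme
        "Namespace": "AWS/SQS",
        "MetricName": "ApproximateNumberOfMessagesVisible",
        "Dimensions": [{"Name": "QueueName", "Value": queue_name}],
        "Statistic": "Average",
        "Period": 300,  # seconds
        "EvaluationPeriods": 1,
        "Threshold": float(threshold),
        "ComparisonOperator": "GreaterThanThreshold",
        # The ARN of an Auto Scaling policy that adds consumer instances.
        "AlarmActions": [scale_up_policy_arn],
    }


def create_backlog_alarm(cloudwatch: Any, queue_name: str, policy_arn: str) -> None:
    """Create or update the alarm using a boto3 CloudWatch client."""
    cloudwatch.put_metric_alarm(**backlog_alarm_params(queue_name, policy_arn))
```

A matching alarm with a low threshold and a scale-down policy would normally accompany this one, so the consumer group shrinks again once the backlog clears.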
We get between 5k and 25k events per minute depending on the time of day and the campaigns we have on a particular day. We are also expecting a further increase in the number of events we receive. So a queue that can scale reliably is obviously very important to us.
Let’s go back to one of the reasons for implementing this solution: the increasing number of failing inserts into BigQuery. Exactly how many inserts into BigQuery are failing? The chart below shows the number of failed streaming inserts into BigQuery in a given 6 hour period over the past 7 days. This demonstrates a very consistent failure rate in inserting records into BigQuery, thus the need for a highly available and robust queue.
In addition to this consistent failure rate, back on July 25th 2016 we got to see just how much of an advantage using SQS was. BigQuery’s streaming API started to experience a very high failure rate, which caused our SQS queue to back up to almost 1 million records. In this case, our queue consumer was able to scale up and get the queue back down once BigQuery had recovered.
BigQuery is a great product; it’s fast and scalable, but it is lacking a bit when it comes to reliability. We had an application that did the job and achieved a result that we could use. However, as traffic grew and reliability became more important, we needed a more robust solution that could guarantee, to a higher degree of certainty, that we would not lose any events. Using SQS as a queue became an obvious choice.
By having a single, distributed queue that is able to scale and orchestrate the consumption of events between multiple nodes, we were able to implement a highly scalable solution without losing events. I am now very confident that our solution will be able to handle our traffic into the future as we start to get more events, without the need for any changes to the core application.
sgtfoleyistheman Posted at 17:23h, 20 August
I am unfamiliar with BigQuery’s API. Is it more efficient to submit batches of events?
If so, I think it would be better to put events to Kinesis and use Lambda to read data off these shards to push to BigQuery in batches. No EC2 instances required.
The other advantage here is that Kinesis is strictly time-ordered within a shard (SQS has no order guarantees; I dunno if you care about this) and also has better guarantees around ‘exactly-once delivery’ (SQS has ‘at least once’ delivery semantics. From the quote “This ensures that no other nodes processing the queue will get the same message” it sounds like you care about this).
Darren Cibis Posted at 10:24h, 22 August
Hi, Thanks for your comment, you have raised some good points.
We did consider Kinesis as an option; however, there were a few reasons we decided to go with SQS. Fault tolerance is a big consideration for us, and SQS provides a higher level of fault tolerance. Also, with Kinesis you need to manage the number of shards and hash ranges yourself. This wouldn’t have been a big deal, but there is no need to manage anything like this with SQS. As for messages being delivered ‘at least once’, it is considered a rare case for a message to be delivered more than once; however, we do provide BigQuery with an insert ID, which gives a level of tolerance against duplicate inserts. The order is not particularly important to us. We were finding that most of the records were in order, which is fine for our requirements. With regard to batching BigQuery inserts, we do put them into batches of 10 records, since 10 is the maximum batch size that can be read from SQS. Performance is better with larger batch sizes; however, we are not having any performance issues.
sgtfoleyistheman Posted at 10:29h, 22 August
Understood. If BigQuery is doing deduping and order doesn’t matter, then it sounds like you guys made the right call. It would have been great to hear these in the original post though!
Eric Schmidt Posted at 06:35h, 30 August
Go BigQuery streaming! :0)
Is data completeness a concern?
How are you handling latency in your upstream systems? If you have latency in your upstream, e.g. lost connections from your web ends or failures reading SQS, your downstream data availability in BigQuery will be compromised. It’s nice that SQS provides at-least-once processing, but it has no concept of time.
You said that ordering does not matter, so how are you guaranteeing accurate results from your queries against the streaming tables?
Darren Cibis Posted at 09:53h, 30 August
Hi Eric, thanks for your question.
Our dataset being less than 100% complete is not a huge concern for us. Having said that, we do need a high degree of completeness. I have found that with this solution there is only a very small discrepancy between the number of events for a given day and the expected number of events based on our webserver access logs. This would suggest that the reliability of reading from/writing to SQS is really high.
With regard to SQS having no concept of time, there is no real way to ensure the dataset we are querying has all of the data. We can, however, reduce the impact of this by monitoring the number of events in our SQS queue when we are running queries. Since our reporting runs just after midnight, and the queue at this time usually has fewer than 10 messages in the backlog and fewer than 100 in flight, our reporting can be run with a high degree of confidence that it has almost all of the events. This is fine for our purposes, as it is unlikely that there will be any events delayed beyond a few minutes under this condition.
The worst case scenario would be a failure close to the end of the day when we run our reports. In this case, we would need to assess the impact and possibly re-run when we have all of the data in BigQuery.
Eric Schmidt Posted at 06:03h, 31 August
Awesome. Sounds like a solid solution given those bounds. It’s always nice when you have stable event flow 🙂
Although in Alpha it might be worth a look at Google Cloud Functions as it has native trigger support for Cloud Pub/Sub. Thus you could remove the I’d have to understand your total monthly QPS to rationalize if the total is worth it.
I like your ideas of scaling/throttling Lambda/Functions based on backlog or throughput metrics.
Darren Cibis Posted at 09:27h, 31 August
Thanks. Cloud Pub/Sub and Cloud Functions look great; however, with no GCP data centre in Australia, replacing SQS with Pub/Sub may not be a viable option as it would likely increase latency.