Messages in the sky
19 Oct 2015
One of the projects that I’m currently working on is developing a solution whereby millions of rows per hour are streamed in real time into Google BigQuery. This data is then available for immediate analysis by the business. The business likes this. It’s an extremely interesting, yet challenging project, and we are always looking for ways of improving our streaming infrastructure.
As I explained in a previous blog post, the data/rows that we stream to BigQuery are ad-impressions, which are generated by an ad-server (Google DFP). This was a great accomplishment in its own right, especially after optimising our architecture and adding Redis into the mix. Using Redis added robustness and stability to our infrastructure. But – there is always a but – we still need to denormalise the data before analysing it.
In this blog post I’ll talk about how you can use Google Cloud Pub/Sub to denormalise your data in real time before performing analysis on it.
The elephant in the room
So what is ‘denormalisation’? To understand that, let’s first look at ‘normalisation’.
The data we receive from an ad-impression is normalised. This means that all the meaningful information related to the row is stored in other tables. Thus, we need to do several joins with these other tables (think mapping or reference tables) in order to get a meaningful piece of information that can be used by business intelligence (BI) tools such as Tableau.
This process is called denormalisation.
That’s easy to do, right? Just do your joins when interrogating the data for the BI tools and you are done! Well, that’s what you would think, but in reality it’s not that straightforward. The elephant in the room is the tremendous amount of data we need to handle. We are talking about 3.5 million rows per day being appended to a table that already holds billions of rows and weighs in at more than a few TBs. Furthermore, this data will continue to accumulate and grow in size, so whatever we build must be scalable too.
Denormalising in the cloud
Doing several joins over massive datasets is not one of BigQuery’s greatest strengths – even with all its power and scalability, large joins slow queries down considerably. And don’t even get me started on how inefficiently some of these BI tools compose their auto-generated queries! That’s even worse!
To overcome this problem we turned to Google Cloud Dataflow (CDF). This tool can ingest the normalised data and, through the use of different ‘transforms’ (functions that take data as input, manipulate that data and output the result), denormalise each row prior to analysis in BigQuery. It’s basically a massively scalable ETL tool. This greatly simplifies the queries, and it also improves performance for the BI tools because the SQL doesn’t need to be so complex.
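To make the idea of a ‘transform’ concrete, here is a toy example written against the Dataflow Java SDK of the time – it is not our actual denormalisation logic, and the field names are made up purely for illustration:

PCollection<TableRow> output = input.apply(ParDo.of(new DoFn<TableRow, TableRow>() {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        // Data in, manipulate, data out.
        TableRow in = c.element();
        TableRow out = new TableRow();
        out.set("adUnitId", in.get("adUnitId"));
        // Upper-case the (hypothetical) country field along the way.
        out.set("country", String.valueOf(in.get("country")).toUpperCase());
        c.output(out);
    }
}));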
CDF has two execution modes:
- Batch mode – the data pipeline will execute and go through all its transformations, and then when it’s finished the pipeline will be terminated.
- Streaming mode – the data pipeline will start executing and will sleep and wait until information arrives. It never terminates.
A ‘data pipeline’ is a set of transforms that are applied to the input data. We created a batch job to execute our CDF data pipeline each morning to do this denormalisation process for us. This pipeline produces a new table with all the denormalised rows, and then we work on that table i.e. perform our analysis.
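For illustration, here is a minimal sketch of roughly what such a batch pipeline looks like. The table names and ‘DenormaliseFn’ (a DoFn<TableRow, TableRow> holding the join/mapping logic) are hypothetical placeholders rather than our production code, and ‘createSchema()’ is assumed to be a helper that builds the target BigQuery schema:

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("projectID");
options.setStagingLocation("gs://binaries/batchDenormaliser");
Pipeline pipeline = Pipeline.create(options);

pipeline.apply(BigQueryIO.Read.named("ReadRawImpressions")
                .from("projectID:ads.raw_impressions"))
        // DenormaliseFn is where the 'joins' against the reference tables happen.
        .apply(ParDo.of(new DenormaliseFn()).named("Denormalise"))
        .apply(BigQueryIO.Write.named("WriteDenormalised")
                .to("projectID:ads.denormalised_impressions")
                .withSchema(createSchema())
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

pipeline.run();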
However, because of the volume of data we’re handling, and the fact that CDF has to spin up the environment from scratch each time, this batch job takes some time to complete. But, what if we could denormalise our data directly in real-time, and harness the powerful streaming mode that CDF also possesses?!
Well, it turns out there is a way of doing this – by hooking CDF up to Google Pub/Sub.
What is Google Cloud Pub/Sub?
Google Cloud Pub/Sub is an enterprise-grade, cloud-based, message-oriented middleware service. That means it is highly scalable and flexible. It also provides secure and highly available communication between independently written applications.
Using Pub/Sub, you can easily implement a message-oriented, asynchronous architecture. This allows you to integrate two or more components without them needing to be aware of each other’s existence or state.
In a nutshell, a publisher (the Pub in Pub/Sub) application sends a message to a ‘topic’, and the subscribing (the Sub in Pub/Sub) applications will receive this message and be able to process it accordingly.
Communication
There are 3 different types of communication models:
- One-to-many: one publisher to a topic with many subscriptions.
- Many-to-one: many publishers sending messages to different topics, and one subscriber getting messages from these topics.
- Many-to-many: a combination of the above.
Subscription
There are 2 different types of subscription models:
- Pull subscriptions: the messages are persisted until a consumer application retrieves and acknowledges them. The client reading from this subscription checks for messages periodically; if there are none, it simply sleeps and waits for the next one.
- Push subscriptions: the messages get pushed to an endpoint (i.e. a URL) that you define. This means that as soon as a message arrives, you are notified and can consume it immediately.
Pub/Sub APIs & SDKs
As you would expect, Pub/Sub has an easy to use REST/JSON API. There are also numerous SDKs to choose from, so pick your weapon of choice:
- Java
- Python
- Go
- JavaScript
- .NET
- Node.js
- PHP
- Ruby
- Objective-C
What can we use Pub/Sub for?
Pub/Sub is very powerful, and is ideal in cases where you want to integrate two or more applications without the hassle of managing the communication and the underlying infrastructure. In addition to this, and more importantly, it brings the power of asynchronous communication, which greatly simplifies the implementation on each side of the pipe.
Other example use cases:
- Balancing workload in network clusters
- Distributing event notifications
- Refreshing distributed caches
- Logging to multiple systems
- Collecting logs from multiple systems
- Reliability improvement
- Data streaming from various processes or devices.
The last example use case immediately caught my attention (‘data streaming from various processes or devices’). Could this mean that CDF could be plugged into Pub/Sub, and then denormalise on-the-fly!? If so, it would mean that CDF would need to be running in some kind of streaming mode. “CDF has 2 modes…” – BINGO! Check here and search for “Streaming Execution” for more details on how the streaming mode works.
Excited after discovering this feature, I decided to dive deep and see how we could solve this interesting problem using both CDF + Pub/Sub i.e. denormalise our data in real-time.
My spike looked like this:
As shown in the diagram above, the events (ad-impressions) are published directly to a Pub/Sub topic which has a pull subscription. CDF reads from this subscription, and every time a new message arrives it consumes it and transforms it into a denormalised row. Finally, the denormalised row is streamed to BigQuery.
Getting Started
I remembered my experience with JMS queues, and a chill ran down my spine! I gathered my courage and decided to march forward nevertheless. Fortunately, Google has made the whole Pub/Sub integration pretty easy and straightforward.
- Enable Pub/Sub API
On the Google Developer Console (GDC), go to ‘APIs’ and enable ‘Google Cloud Pub/Sub’:
- Create your topic
Go to ‘Big Data -> Pub/Sub’ and click on new ‘Topic’, set the name and click create:
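Alternatively, the topic can be created programmatically. Roughly, using the same Java client library as the samples further down (the ‘PortableConfiguration’ helper and topic name are the same assumptions used there):

Pubsub pubsub = PortableConfiguration.createPubsubClient();
// Topics are addressed by their fully qualified name.
String topicName = "projects/projectID/topics/testTopic";
Topic topic = pubsub.projects().topics()
        .create(topicName, new Topic())
        .execute();
System.out.println("Created topic: " + topic.getName());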
- Add your subscriptions
Expand the topic you just created and click on ‘new subscription’. You will have two options here – ‘Push’ and ‘Pull’. Pushing to an endpoint will require you to prove you own that domain (which is a bit of a hassle), and CDF only uses pull anyway. So let’s add the name and click create:
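As with the topic, the pull subscription can also be created in code rather than in the console. A rough sketch with the same client library (the 60-second ack deadline is just an example value):

Pubsub pubsub = PortableConfiguration.createPubsubClient();
String subscriptionName = "projects/projectID/subscriptions/testSubscription";
// A subscription without a push endpoint defaults to pull delivery.
Subscription subscription = new Subscription()
        .setTopic("projects/projectID/topics/testTopic")
        .setAckDeadlineSeconds(60);
pubsub.projects().subscriptions()
        .create(subscriptionName, subscription)
        .execute();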
- Send a message
Here is an example, written in Java, that shows how to publish a message to the topic:
Pubsub pubsub = PortableConfiguration.createPubsubClient();
String message = "Hello Cloud Pub/Sub!";
PubsubMessage pubsubMessage = new PubsubMessage();
// You need to base64-encode your message with the
// PubsubMessage#encodeData() method.
pubsubMessage.encodeData(message.getBytes("UTF-8"));
List<PubsubMessage> messages = ImmutableList.of(pubsubMessage);
PublishRequest publishRequest = new PublishRequest().setMessages(messages);
PublishResponse publishResponse = pubsub.projects().topics()
        .publish("projects/projectID/topics/testTopic", publishRequest)
        .execute();
List<String> messageIds = publishResponse.getMessageIds();
The message can be anything, but JSON of course is a great choice.
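For example, instead of the ‘Hello Cloud Pub/Sub!’ string above, an ad-impression could be published as a small JSON document. The field names here are invented for illustration, and Gson is used simply as one convenient way of serialising a map:

Map<String, Object> impression = new HashMap<>();
impression.put("impressionId", "abc-123");
impression.put("adUnitId", 42);
impression.put("timestamp", System.currentTimeMillis());

String json = new Gson().toJson(impression);
PubsubMessage pubsubMessage = new PubsubMessage();
pubsubMessage.encodeData(json.getBytes("UTF-8"));
// Then publish exactly as in the snippet above.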
- Read the message from the subscription
Here is an example of reading messages from a subscription:
Pubsub pubsub = PortableConfiguration.createPubsubClient();
String subscriptionName = "projects/projectID/subscriptions/testSubscription";
// You can fetch multiple messages with a single API call.
int batchSize = 10;
// Setting returnImmediately to false instructs the API to
// wait to collect messages up to the size of
// maxMessages, or until the timeout.
PullRequest pullRequest = new PullRequest()
        .setReturnImmediately(false)
        .setMaxMessages(batchSize);
PullResponse pullResponse = pubsub.projects().subscriptions()
        .pull(subscriptionName, pullRequest).execute();
List<String> ackIds = new ArrayList<>(batchSize);
List<ReceivedMessage> receivedMessages = pullResponse.getReceivedMessages();
if (receivedMessages == null || receivedMessages.isEmpty()) {
    // The result was empty.
    System.out.println("There were no messages.");
    return;
}
for (ReceivedMessage receivedMessage : receivedMessages) {
    PubsubMessage pubsubMessage = receivedMessage.getMessage();
    if (pubsubMessage != null) {
        System.out.print("Message: ");
        System.out.println(new String(pubsubMessage.decodeData(), "UTF-8"));
    }
    ackIds.add(receivedMessage.getAckId());
}
// Ack can be done asynchronously if you care about throughput.
AcknowledgeRequest ackRequest = new AcknowledgeRequest().setAckIds(ackIds);
pubsub.projects().subscriptions().acknowledge(subscriptionName, ackRequest).execute();
Plugging in CDF
The last piece of the puzzle is the denormalisation of the row with CDF. As fellow Shiner Graham Polley explained in this blog, CDF is a service that allows you to load, process, transform, and analyze your data. Again, think ETL. In this case, we are interested in the transform part of the ETL, which is essentially our denormalisation.
So, how could we leverage this with Pub/Sub? Well, we can create our pipeline, set it to run in streaming mode and instead of reading from a BigQuery table, we read from the Pub/Sub subscription. This will allow us to reuse the denormalisation code (which was already implemented in the pipeline executing in batch mode).
Here is an example of how to create a CDF pipeline, and configure streaming mode.
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject("projectID");
options.setWorkerMachineType("n1-standard-4");
options.setZone("us-central1-a");
options.setStagingLocation("gs://binaries/pubsubConsumer");
options.setNumWorkers(3);
options.setAutoscalingAlgorithm(DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.NONE);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
A simple boolean is all it takes to run the pipeline in streaming mode. It couldn’t be easier, could it?!
Here is an example that shows how we can read from the topic’s subscription we defined earlier:
PCollection<String> subscriptionStream = pipeline.apply(
        PubsubIO.Read.named("MySubscriptionReader")
                .subscription("/subscriptions/projectID/messages"));

subscriptionStream
        .apply(ParDo.of(new DoFn<String, TableRow>() {
            // This gets executed every time a new message is published to the topic.
            @Override
            public void processElement(ProcessContext c) throws Exception {
                String message = c.element();
                String tag = String.valueOf(Math.random());
                TableRow tableRow = new TableRow();
                tableRow.set("tag", tag);
                tableRow.set("message", message);
                c.output(tableRow);
            }
        }).named("MessagesProcessor"))
        // Finally we write to a BigQuery table.
        .apply(BigQueryIO.Write.named("MessagesDump")
                .to("projectID:pubsub.pubsubTest")
                .withSchema(createSchema())
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

pipeline.run();
The code above shows how we can define a transformation that is capable of reading from the Pub/Sub subscription (note the use of ‘PubsubIO.Read.named’). As soon as the ‘run’ method is executed, CDF will sleep and wait for messages to arrive in the pipeline.
Every time a new message is published the pipeline will run, causing a new BigQuery table row to be created. This new ‘TableRow’ object is then written to the table, identified by its fully qualified name ‘projectID:pubsub.pubsubTest’, in BigQuery.
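In our real pipeline, the toy DoFn above is replaced by the denormalisation transform we already had from batch mode. Here is a hedged sketch of the idea – the message fields and the in-memory ‘adUnitNames’ reference map are made up (it uses org.json purely for illustration), and a production version would load its reference data from BigQuery or a side input rather than hard-coding it:

static class DenormaliseFn extends DoFn<String, TableRow> {
    // A stand-in for a reference (mapping) table, keyed by ad unit id.
    private static final Map<String, String> adUnitNames =
            ImmutableMap.of("42", "Homepage Banner", "43", "Sidebar MPU");

    @Override
    public void processElement(ProcessContext c) throws Exception {
        // The incoming element is the JSON ad-impression published to the topic.
        JSONObject impression = new JSONObject(c.element());
        TableRow row = new TableRow();
        row.set("impressionId", impression.getString("impressionId"));
        // The 'join' happens here: the id is swapped for its human readable name.
        row.set("adUnitName", adUnitNames.get(String.valueOf(impression.get("adUnitId"))));
        row.set("timestamp", impression.getLong("timestamp"));
        c.output(row);
    }
}

Swapping it in is then just a matter of replacing the anonymous DoFn in the streaming example with ParDo.of(new DenormaliseFn()).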
Conclusion
Through its seamless integration with CDF, Pub/Sub can greatly help you have your information ready for analysis as soon as it arrives in your infrastructure. That is the key here – all this data is useless unless we can extract useful insights from it.
Pub/Sub + CDF allow you to overcome the difficulties of multiple joins and complex transformations of data. And anyway, it’s always good practice to denormalise and cleanse your data prior to analysis!
All these factors combined will help you have all your big data ready to be analysed and mined for those juicy insights we are all looking for. And that is the real goal of all this – the insights!