Test Driving Google Cloud Dataflow


Back in June 2014, at the annual Google IO in San Francisco, Google unveiled their newest, and much hyped cloud product, Cloud Dataflow. The demo they did that day, using a live twitter feed to analyze supporter sentiment during the 2014 world cup, got my mouth watering at the prospect of working with it. It looked downright freaking awesome, and I just couldn’t wait to get my hands on it to take it for a spin.

On the road to simplicity

Dataflow is a complement to Google’s existing cloud services. It’s not in any way designed to be a replacement for offerings like BigQuery (BQ), Cloud Compute or Cloud SQL. Rather, it’s designed to be used in conjunction with those services to help load, process, transform, and analyze your data. Dataflow is aimed squarely at making it easy to write complex data pipelines, and letting Google worry about things like scalability and parallelization.


This simple approach allows software engineers to crack on with writing code and solving problems, without having to worry about the gnarly complexities of spinning up worker pools and managing the resources that power them. Dataflow handles all of that for you, making the whole process far less cumbersome and ultimately much more efficient. That’s a very good thing in software engineering.

Getting the keys

When I saw Dataflow in action, I immediately started to visualize the numerous ways it could be applied to a variety of our clients’ projects, some of whom are struggling to manage their complex data pipelines and massive datasets. So I set to work on getting access to the alpha version, which is currently available.

Some of you may recall my previous posts on BQ, and the cool work we had done with it. If not, then cast your mind back. Using our previous experience with BQ and Google’s cloud, we put the feelers out and asked them for access to Dataflow. We were surprised that in just a couple weeks, we were given the keys to take the alpha version for a spin. Score! It was now time to have some fun, and to really see what this puppy was capable of.

Note: Although Dataflow will be available in both streaming and batch mode when it’s production ready, the streaming functionality wasn’t quite ready for testing, so we only had access to the batch side of things. Nevertheless, we had the perfect test case to throw at it, and it was very well suited to batch processing anyway.

The test track

The test we wanted to throw at Dataflow was quite simple, but it also packed enough punch to make a worthy adversary and put Dataflow through its paces. On a previous project, we had successfully used BQ to grind through massive datasets of ad-serving logs – billions of rows and TBs of data – in about 30 seconds. Don’t believe me!? Here. The ad-serving logs were pumped directly into Google Cloud Storage (GCS) from DoubleClick for Publishers (DFP), and were made up of 3 distinct sources:

  1. Impressions
  2. Clicks
  3. ActiveViews

From GCS we loaded the files directly into BQ using its bulk loading functionality. However, as we later discovered, some of the fields needed to be transformed before the business could start interrogating the data, and making sense of it. Did someone just order a classic ETL use case on the rocks?!

Once loaded into BQ, we then ran numerous SQL statements over the datasets, performing subsequent joins onto reference tables to denormalize the data. And this is when we discovered the Achilles heel of BQ. Although it’s phenomenally quick at analyzing gigantic datasets, it’s really not geared for large multiple joins, and as such, its performance significantly degrades the more joins you throw into the mix. Thus, we had to apply some cumbersome workarounds to map the data, such as sharding tables, and splitting the joins into individual steps.

In the end it all felt a bit dirty even though it worked fine. Here’s how it looked architecturally:

 

[Diagram: the original BQ-based ETL architecture]

But what if we could use an ETL service that had all the plumbing in place to integrate seamlessly with existing Google cloud services, could handle massive datasets, scaled effortlessly, was easy to use, and was cheap at the same time!? Enough said. We now had our perfect test for Dataflow, and it was time to rev the engine to see what it could do.

All too easy

To kick off proceedings, we decided to start our testing with just 2 days’ worth of logs, which equated to approximately 200M rows, or around 100GB. Aside from a few minor hurdles, like getting acquainted with the Java SDK classes and the Dataflow programming model, getting our first pipeline up and running was laughably easy. So much so, in fact, that one of our engineers remarked “That can’t be all there is to it!?”. He was wrong. The whole process boils down to five steps:

  1. Configure & create the pipeline
  2. Define the input
  3. Define the transformation
  4. Define the output
  5. Execute the pipeline

Seriously, that’s all there is to it.


No need to worry about spinning up instances. Forget about the headaches of managing big clusters, IO, disk capacity, and scalability. Close the book on tearing down resources when the work is done, and when they are no longer needed. Yup, press the reset button folks. Dataflow requires a subtle shift in mindset. Repeat after me:

  • “I shall no longer worry about any infrastructure.”
  • “I shall no longer worry about any infrastructure.”
  • ..

Got it?! Fantastic. Let’s move on.

1. Configure & create the pipeline

The first step in the process is configuring the pipeline. A few lines of code, and Bob’s your uncle:

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);

options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject("<project_name>");
options.setWorkerMachineType("n1-standard-1");
options.setZone("asia-east1-a");
options.setStagingLocation("gs://dataflow/binaries");
options.setNumWorkers(100);
options.setMaxNumWorkers(150);
options.setAutoscalingAlgorithm(DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.BASIC);

Pipeline pipeline = Pipeline.create(options);
  • Line 3: Specify the runner type. Currently there are 3 available:
  1. DirectPipelineRunner: The pipeline will run locally on your machine. Use this for development and testing.
  2. BlockingDataflowPipelineRunner: The pipeline will be executed synchronously in Google’s cloud. All dependencies, JARs etc. will be uploaded for cloud execution.
  3. DataflowPipelineRunner: The same as the BlockingDataflowPipelineRunner except it’s executed asynchronously.
  • Line 5: Choose the type of instance that will be spun up in the worker pool. See here for a list that you can use depending on your requirements e.g. if you need to do some computationally expensive operations then pick a higher spec CPU type.
  • Line 7: Set the ‘staging location’. This is where all the application binaries, JARs etc. will be uploaded to for execution in the cloud.
  • Lines 8-10: Set the number of workers (i.e. instances) that you want spun up in your worker pool. You can set floor and ceiling values, and set the autoscaling algorithm to configure the pool to scale automatically. Awesome!!

That’s all there is to configuring the worker pool and having X number of instances spun up in Google’s cloud, which will execute your pipeline. Write 10 lines of code and you’ve got a fully operational cluster with autoscaling in place. That’s just game-changing.

Oh yeah, and did I mention that you don’t need to worry about tearing everything down after the pipeline has finished executing!? Yup, you can forget about all that nonsense too from now on.
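Incidentally, switching to local execution for development and testing is just a matter of swapping the runner. Here’s a minimal sketch, assuming the alpha Java SDK’s DirectPipelineRunner (package and class names may shift in later releases):

// Run the pipeline in-process on your own machine - no cloud resources involved.
// Assumes the alpha Dataflow Java SDK (com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner).
PipelineOptions localOptions = PipelineOptionsFactory.create();
localOptions.setRunner(DirectPipelineRunner.class);

Pipeline localPipeline = Pipeline.create(localOptions);
// ...apply the same Read/ParDo/Write steps as the cloud version...
localPipeline.run();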

2. Define the input

All the files we needed to transform were already sitting in GCS, so this part was trivial. Because we wanted to test using 2 days’ worth for each of the sources (Impressions, Clicks, and ActiveViews), we simply used a glob pattern to instruct Dataflow where to pull our files from:

pipeline.apply(TextIO.Read.from("gs://<bucket>/Impressions_201502[12]*"));

Using the ‘*’ pattern made it possible to scoop up all the files for those 2 days (1 per hour, so 48 in total for each source). That resulted in a total of 144 files that needed to be read in order to process 2 days’ worth of Impressions, Clicks and ActiveViews. Easy.

Note: It’s possible to specify multiple sources instead of using a glob pattern. This requires using a flatten function to merge them for the pipeline to consume. We did try this, but we ran into some issues with it. See our question on stackoverflow.
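For the curious, the multi-source version we experimented with looked roughly like the sketch below. It assumes the SDK’s PCollectionList and Flatten transforms, and the Clicks/ActiveViews file prefixes are illustrative:

// Read each source separately, then merge them into a single PCollection with Flatten.
// Sketch only - the Clicks_/ActiveViews_ prefixes are illustrative.
PCollection<String> impressions = pipeline.apply(TextIO.Read.from("gs://<bucket>/Impressions_201502[12]*"));
PCollection<String> clicks = pipeline.apply(TextIO.Read.from("gs://<bucket>/Clicks_201502[12]*"));
PCollection<String> activeViews = pipeline.apply(TextIO.Read.from("gs://<bucket>/ActiveViews_201502[12]*"));

PCollection<String> merged = PCollectionList.of(impressions).and(clicks).and(activeViews)
    .apply(Flatten.<String>pCollections());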

3. Define the transformation

Dataflow currently supports 4 core transformations, which make up the processing step of the pipeline. Your requirements will ultimately determine which type of transformation you need. For us, we simply needed to regex a few String fields and then map them to other Strings, so core parallel processing with a ParDo transformation was the obvious choice.

To implement a ParDo, simply extend the abstract class DoFn<I,O> and override its processElement(ProcessContext c) method:

public class MyDoFn extends DoFn<String, String> {

  @Override
  public void processElement(ProcessContext c) throws Exception {
    String[] event = c.element().split(",");
    String mapped = performMappings(event);
    c.output(mapped);
  }
}

When you implement your ParDo, you must define the type of the input and output. The input in our case was simply String (a line in the log file), and the output was again a String (the same line transformed with the newly added mappings). Simples!

Note: One thing that did catch us out when writing our ParDo was that any object(s) it references must implement Serializable. Google are obviously serializing and deserializing your code when it’s running in the cloud to distribute it across the worker pool. It is mentioned in the javadoc, but we had missed it. Double Doh!
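To make that concrete, here’s a purely hypothetical sketch. Suppose performMappings delegates to a lookup object that the DoFn holds as a field; because Dataflow serializes the DoFn (and everything it references) to ship it across the worker pool, that object must implement Serializable too:

// Hypothetical helper held as a field on the DoFn. It must implement Serializable,
// otherwise the job fails when Dataflow distributes the DoFn to the workers.
// (Requires java.io.Serializable, java.util.HashMap, java.util.Map.)
public class EventMapper implements Serializable {

  private final Map<String, String> siteMappings = new HashMap<String, String>();

  public String performMappings(String[] event) {
    // e.g. regex-clean a field, then map it to its denormalized reference value
    // (the field index and mapping are illustrative only)
    String site = event[3].replaceAll("[^a-zA-Z0-9]", "");
    event[3] = siteMappings.containsKey(site) ? siteMappings.get(site) : "unknown";

    StringBuilder line = new StringBuilder();
    for (int i = 0; i < event.length; i++) {
      if (i > 0) line.append(",");
      line.append(event[i]);
    }
    return line.toString();
  }
}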


4. Define the output

Now that the input source and transformation were done and dusted, we needed to define where the results should be written to. There are currently 3 options available in the alpha release:

  1. Text
  2. BQ
  3. Avro

For this test, we settled on writing the results straight back to GCS instead of directly to BQ. This would allow us to leverage our existing BQ bulk file loader application, which had some smarts and automation baked into it. We also wanted to see how quickly Dataflow was able to write to disk, because we had some other cool ideas we were keen to test in the future that would require writing to GCS.

Specifying the output as GCS was a breeze:

TextIO.Write.to("gs://<bucket>/dataflow_test_2_days");

Note: It’s currently not possible to write the results to GCS compressed. But it looks like write compression will be supported in a later release.
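Had we wanted to skip GCS and pump the results straight into BQ instead, the sink would have looked something like the sketch below. It assumes the SDK’s BigQueryIO, with a hypothetical destination table and schema, and the rows would first need converting to TableRow objects rather than plain Strings:

// Sketch: writing directly to a BigQuery table instead of GCS.
// Table name and schema are hypothetical. Uses com.google.cloud.dataflow.sdk.io.BigQueryIO
// and com.google.api.services.bigquery.model.TableSchema / TableFieldSchema.
TableSchema schema = new TableSchema().setFields(Arrays.asList(
    new TableFieldSchema().setName("event").setType("STRING")));

BigQueryIO.Write
    .to("<project>:<dataset>.impressions_transformed")
    .withSchema(schema)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);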

5. Execute the pipeline

Finally, glue the input, transformation and output together and pull the trigger:

pipeline
    .apply(TextIO.Read.from("gs://<bucket>/Impressions_201502[12]*"))
    .apply(ParDo.of(new MyDoFn()))
    .apply(TextIO.Write.to("gs://<bucket>/dataflow_test_2_days"));
pipeline.run();

It’s ridiculously easy, isn’t it!? Honestly, I’ve written far more complex code in Java trying to do something much simpler than this.

If you’re feeling a little bewildered, and are not quite sure what just happened, fear not my friends! We also had that feeling when we kicked off our first pipeline. So here’s a brief summary of what happens when the pipeline is executed to keep you from falling over in disbelief:

  1. The application code and its dependencies are uploaded to the cloud
  2. The execution graph is constructed from the code
  3. The graph is optimized and fused by the Dataflow service
  4. The worker pool is spun up (in a freakishly quick time), and the graph begins executing
  5. When the pipeline is finished, the worker pool and all resources are torn down.

The Google documentation on executing the pipeline goes into much more detail on what is actually going on under the hood, and explains what happens with the code much better than I can:

“When run, your Java program only builds a pipeline. The Pipeline object that you create in your Java program essentially generates a specification for the data processing job for the service to run.

When the Cloud Dataflow service builds your actual pipeline on cloud resources, it may be faster to run certain transformations together, or in a different order, for the fastest and most efficient execution. The Cloud Dataflow service fully manages this aspect of your pipeline’s execution.”

Keeping the engine warm

Once the pipeline is executed, it’s possible to monitor its progress by opening the developers console and clicking on the job. When you have access to Dataflow, a new link appears under “Big Data”, which brings you to the Dataflow dashboard. From there, you get a cool graphical overview of the pipeline as it’s running, can view the logs, and see vital statistics like writes p/s or the number of records processed.

[Screenshot: the Dataflow dashboard showing the running pipeline]

Looking at the screenshot above, you can see all 3 data sources (Impressions, Clicks and ActiveViews) being processed, and written to GCS. Of course, it’s also possible to track the progress programmatically via the SDK from the returned Job ID, but I didn’t want to bore you with console output! Look at these nice shiny dashboards instead!
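If you do want to poll the job from code rather than the console, a rough sketch (assuming the asynchronous DataflowPipelineRunner hands back a DataflowPipelineJob, as the SDK does) would be:

// Run asynchronously and check on the job programmatically.
// Assumes DataflowPipelineRunner / DataflowPipelineJob from the Dataflow Java SDK.
options.setRunner(DataflowPipelineRunner.class);  // non-blocking runner
DataflowPipelineJob job = (DataflowPipelineJob) pipeline.run();

System.out.println("Job ID: " + job.getJobId());
System.out.println("State : " + job.getState());  // e.g. RUNNING, DONE, FAILED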

You can also keep track of all the instances that the Dataflow service has spun up and is managing on your behalf:

[Screenshot: the worker instances spun up and managed by Dataflow]

You can even SSH into an instance directly from the browser if you so wish. Outstanding!

Returning the keys

For our two days’ worth of data, across the 3 data sources, we were certainly moving a hefty enough chunk of data around. Weighing in at about 100GB and 200M rows, we felt it was a good initial test for Dataflow to see what it was all about. Using 50 and 75 as our floor and ceiling worker values respectively, we saw consistent results around the 10-minute mark.

That time also included spinning up the worker pool and tearing it all down again once the pipeline was finished. Given that it can take 1-2 minutes to spin up the worker pool alone, it’s nevertheless an impressive result. We also witnessed write speeds of over 1.45M records p/s. No..seriously. Look!

[Screenshot: pipeline write throughput in the Dataflow dashboard]

However, we think it’s still got a lot more to give hidden up its sleeve. Because it’s still in alpha, no doubt its performance is still being fine-tuned and tweaked. But, performance aside, it was something else that impressed us the most. Something that all engineers strive for in their work. Something that is often unattainable by so many of us.

The art of simplicity.

It was this that impressed us the most about Dataflow. Sure, there are still some bugs to be ironed out. But we were simply blown away by how easy it is to use and to get up and running with it. In a matter of just a few hours we had a basic pipeline executing in the cloud with 75 instances spinning away. Sweet.

That’s what we love about software products like Dataflow. None of the usual “oh, change this config file there, that one over here, install that package dependency next…” annoying setup steps. Nope. Dataflow just works. We’re really looking forward to it coming out of alpha, when we can start building production systems on top of it and testing the streaming functionality. We’d also like to throw even more data at it – like a year’s worth with 50 billion rows – and see how it performs with that!

Bruce Lee once said, “simplicity is the key to brilliance”. Google might have just nailed it with their latest cloud service, Cloud Dataflow.

 
