Creating a serverless ETL nirvana using Google BigQuery


Quite a while back, Google released two new features in BigQuery. One was federated sources, which allow you to query external data, like files in Google Cloud Storage (GCS), directly using SQL. The other was user-defined functions (UDFs). Essentially, a UDF allows you to ram JavaScript right into your SQL to help you perform the map phase of your query. Sweet!

In this blog post, I’ll go step-by-step through how I combined BigQuery’s federated sources and UDFs to create a scalable, totally serverless, and cost-effective ETL pipeline in BigQuery.

Better late than never

I had been meaning to investigate both features in more detail for a while, and to see if either of them could be helpful on our projects that run BigQuery. But I just never got around to doing it. I’ll put all that down to enjoying the summer months in Australia, and preparing to become a father for the first time. That’s my excuse anyway.

Both features looked intriguing, especially UDFs, and it got me thinking about how we currently do our ETL for some clients that use Google Cloud Platform for their big data analytics. By combining both features, would it be possible to perform a completely serverless ETL in BigQuery – in just one pass? And if so, would it be able to scale to billions of records and terabytes of data!?

There was only one way to find out.

What is a federated source?

Getting data into BigQuery is normally achieved in one of three ways:

  1. Direct load jobs
  2. Batch ETL from Cloud Dataflow/Hadoop
  3. Streaming

There is a fourth method, however: federated sources. They allow you to query your data from locations external to BigQuery. That gives you the ability to both load and clean your data in one pass, by querying the data directly from GCS and writing the cleaned result into BigQuery. Currently, only reading from GCS is supported, and only three data formats are supported:

  1. CSV
  2. JSON
  3. Google Cloud Datastore backup files

Federated sources do have limits and limitations. They are fully documented here.
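
To give you a feel for what a federated source actually involves, here's a rough sketch of the kind of external table definition you'd hand to the BigQuery API (the table resource's externalDataConfiguration). The bucket path, schema and CSV options below are placeholders I've made up purely for illustration, not settings from our project:

// Illustrative only: a CSV federated source backed by files in GCS.
// Field names follow the BigQuery REST API's externalDataConfiguration;
// the bucket, schema and options are made-up placeholders.
var externalTableDefinition = {
  sourceFormat: 'CSV',
  sourceUris: ['gs://my-bucket/raw/2016-03/*.csv'],  // wildcard over the month's files
  schema: {
    fields: [
      {name: 'SomeStringColumn', type: 'STRING'},
      {name: 'AnotherColumn', type: 'STRING'}
    ]
  },
  csvOptions: {
    skipLeadingRows: 1,   // skip the header row
    fieldDelimiter: ','
  },
  maxBadRecords: 0        // fail fast on malformed rows
};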

What is a UDF?


Well, in a nutshell, and according to the Google folk who actually built it:

BigQuery supports user-defined functions (UDFs) written in JavaScript. A UDF is similar to the “Map” function in a MapReduce: it takes a single row as input and produces zero or more rows as output. The output can potentially have a different schema than the input.

What this means is that you can now mix JavaScript and SQL in BigQuery. This is particularly useful when you find yourself frustrated trying to express complex computations in SQL, e.g. string parsing, loops etc. SQL is restrictive by nature, and not expressive enough for more complex tasks. According to Thomas Park, a Google engineer who worked on building UDFs:

JavaScript UDFs are executed on instances of Google V8 running on Google servers. Your code runs close to your data in order to minimize added latency.

Although it's not explicitly stated there, what I suspect is happening is that the UDF runs on the same nodes that make up the BigQuery serving tree, i.e. the same nodes that are processing the data. That implies the Google V8 JavaScript engine has been deployed on each node in the BigQuery cluster – and that should make it super fast. Outstanding!

As with federated sources, UDFs also have their own limits and limitations. They too are documented on the Google site here.
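
To make the "Map" analogy concrete, here's a tiny illustrative UDF (not from our pipeline; the column names are hypothetical) that emits zero rows for some inputs and several rows for others:

// Illustrative only: a UDF can filter rows out (emit nothing) or fan them
// out (emit several rows). Assumes a hypothetical input column called "tags"
// holding a comma-separated string, e.g. "a,b,c".
function explodeTags(row, emit) {
  if (!row.tags) {
    return;                          // emit zero rows for empty input
  }
  var tags = row.tags.split(',');
  for (var i = 0; i < tags.length; i++) {
    emit({tag: tags[i].trim()});     // one output row per tag
  }
}

bigquery.defineFunction(
  'explodeTags',                     // name used to call it from SQL
  ['tags'],                          // input column
  [{name: 'tag', type: 'string'}],   // output schema
  explodeTags
);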

Our current ETL solutions

The use case I had in mind was for a particular client project that relies on BigQuery as the core of the solution. Our client has lots of data. Lots and lots of data. Think tens of billions of rows and terabytes of data generated per month. This data is then stored in GCS. Our current workflow to ETL those CSV files into BigQuery for analysis is:

  1. Perform a load job from GCS into BigQuery
  2. Run lots of SQL to map/clean/transform the data using the REGEXP_EXTRACT function
  3. Aggregate the data using even more SQL

This works fine, but it’s a rather clunky solution. It requires lots of boilerplate code to load the CSV files into BigQuery, and the SQL used to chop up the data quickly becomes complex and hard to maintain – primarily because of all the gnarly string parsing and joins.

Another technique we sometimes use is to spin up a Cloud Dataflow pipeline, and do all our complex work there before writing the results to BigQuery. Again, it works fine, but it's much slower than using BigQuery's native SQL, and we have to pay more money (albeit not a lot) for the Dataflow cluster while it's processing the data. That said, Dataflow is still a fine option for complex ETL.

A serverless nirvana


So, instead of using one of the above techniques to ETL the data, the idea floating around in my head was to use federated sources and UDFs together, so that the ETL could all be achieved in one SQL/JavaScript pass.

It would be serverless, quicker, and much easier to implement all the string transformations in JavaScript. The workflow would be:

  1. Using a federated source, read the data (CSV files) directly from GCS
  2. While reading the data, use a UDF to map/clean/transform it
  3. In parallel, aggregate the data using SQL
  4. Finally, write the results to a native BigQuery table

No servers. No gnarly SQL for complex string parsing. Massively scalable. Fast. Simple. It sounded like heaven, and in theory, it ticked all the boxes of a good software solution.

Step 1 – create a federated source

As I already mentioned, our client’s data sits as big fat CSV files in GCS. So the first thing I needed to do was to create a federated source in BigQuery to read that data directly from GCS. It was easy to create the federated source via the web UI. Simply follow these instructions on the Google documentation page. The trick is to select the “Google Cloud Storage” & “External Table” options:

[Screenshot: the BigQuery web UI table creation dialog, with the "Google Cloud Storage" and "External Table" options selected]

The volume I used to test was roughly a month's worth of data. It weighed in at about 1.5 billion records, and was just shy of a TB. Let me self-quote that for impact:

“1.5 billion rows and just shy of a terabyte”

Here’s what the federated source looked like after I created it. As you can see, it’s pointing to a bunch of CSV files in GCS:

[Screenshot: the federated source in the BigQuery web UI, pointing at a set of CSV files in GCS]

It's worth noting that I used the web UI to run this test; however, all this functionality is of course available via BigQuery's API.
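
For instance, and this is just a sketch of how I understand the REST API hangs together, a query job can carry the external definition along with it as a temporary table via tableDefinitions, so you don't even need to create a permanent federated table first. The names below reuse the made-up definition from earlier:

// Sketch of a query job configuration (BigQuery REST API, jobs.insert) that
// attaches the external definition as a temporary table. "my_federated_source"
// and "externalTableDefinition" are the illustrative names from above.
var jobConfiguration = {
  query: {
    query: 'SELECT COUNT(*) FROM my_federated_source',
    tableDefinitions: {
      my_federated_source: externalTableDefinition
    }
  }
};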

Step 2 – sprinkle in some JavaScript

Creating the UDF simply required clicking on the "UDF Editor" button in the web UI. Once you do this, you enter the wonderful world of JavaScript and BigQuery. Ta da!

[Screenshot: the UDF Editor in the BigQuery web UI]

The first thing I did was write the mapping function that would be called from my SQL. BigQuery UDFs are functions with two formal parameters. The first parameter is a variable to which each input row will be bound. The second parameter is an “emitter” function. Each time the emitter is invoked with a JavaScript object, that object will be returned as a row to the query.

For my example, I wanted to test one of the largest string fields we had in the data. It is the field which requires the most parsing and munging to make any sense of, and I aptly named it MyBigFatStringColumn for this test. The emit function returns two parsed string fragments named foo & bar, which will come back to me as columns in BigQuery.

// UDF definition
function mapFooBar(row, emit) {
    emit({
          foo: mapFoo(row.MyBigFatStringColumn),
          bar: mapBar(row.MyBigFatStringColumn)
    });
}

It's probably important to note at this point that you should absolutely not store or access mutable state across UDF calls. Why not? Well, think about it. BigQuery is highly distributed/parallel by design. That means when you run a query it will actually be running on multiple nodes in a cluster. This applies to UDFs too. Each node has a standalone JavaScript processing environment, so doing something like this is a very bad idea indeed:

// Don't do this: global mutable state shared across rows.
var numRows = 0;

function dontDoThis(r, emit) {
  emit({rowCount: ++numRows});
}

// The query:
// SELECT MAX(rowCount) FROM dontDoThis(t);

Next up, I needed some basic mappings for parsing the string in order to mimic our current ETL workflow. Being in JavaScript wonderland, creating some basic KV mapping objects was an absolute breeze – just like in any other JavaScript environment! I called them fooMappings & barMappings. Very creative, I know.

Then I created two more functions: mapFoo(s) and mapBar(s). Each one looks for a pattern in the string, and then extracts the digits after the "=" using a simple regex. This is the same requirement that we currently meet using the REGEXP_EXTRACT function in BigQuery's SQL.

var fooMappings = [
  {key: 0, value: 'foo-mapping-0'},
  {key: 1, value: 'foo-mapping-1'}
];
var barMappings = [
  {key: 0, value: 'bar-mapping-0'},
  {key: 1, value: 'bar-mapping-1'},
  {key: 2, value: 'bar-mapping-2'},
  {key: 3, value: 'bar-mapping-3'},
  {key: 4, value: 'bar-mapping-4'}
];

// Map Foo: pull the digits after "abc=" and look up the mapping
function mapFoo(s) {
  if (s) {
    var match = s.match(/abc=(\d+)/);
    var mapping = match && fooMappings[match[1]];
    if (mapping) return mapping.value;
  }
}

// Map Bar: pull the digits after "xyz=" and look up the mapping
function mapBar(s) {
  if (s) {
    var match = s.match(/xyz=(\d+)/);
    var mapping = match && barMappings[match[1]];
    if (mapping) return mapping.value;
  }
}

Note: all this code was knocked up pretty quick and with brevity in mind! Yes, I know I could have easily created one mapping/regex function for both! So don’t call it out with stuff like “oh, that’s terrible JavaScript practice…bla bla..should have used a different function to do that..bla bla…could have done it this way…bla bla..” 🙂

The final piece of the JavaScript puzzle was registering the function with BigQuery so it’s visible from the SQL that’s calling it:

// UDF registration
bigquery.defineFunction(
    'mapFooBar',                    // Name used to call the function from SQL
    ['MyBigFatStringColumn'],       // Input column names
    [{name: 'foo', type: 'string'}, // JSON representation of the output schema
     {name: 'bar', type: 'string'}],
    mapFooBar                       // The function reference
);

It’s that easy.

Step 3 – SQL joins the party

This was the easiest part. I wrote the following SQL:

SELECT
  foo,
  bar,
  COUNT(*) AS total
FROM
  mapFooBar(
  SELECT
    MyBigFatStringColumn
  FROM
    [CPT_7414_PLAYGROUND.Foobar3] )
GROUP BY foo, bar
HAVING foo IS NOT NULL AND bar IS NOT NULL
ORDER BY total DESC


Let's break that SQL down, shall we folks?

  • LN 7-10 pass a sub query as a parameter to the UDF. That sub query is reading from the federated source I created in step 1 (Foobar3).
  • LN 2-4 select the two columns emitted from the UDF, plus a COUNT(*) that I threw in there for good measure.
  • LN 6 calls the actual UDF – in this case mapFooBar().
  • LN 11-13 are just some basic SQL aggregation, filtering out of nulls, and ordering of the result set.

I also configured my query to write the results back to a native table in BigQuery. Doing this was trivial. Simply use the following options:

[Screenshot: the query options in the web UI, with the results being written to a native destination table]
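
If you were driving this through the API instead of the web UI, my understanding is that those same options map onto the query job configuration. Here's a hedged sketch; the project, dataset and table names are placeholders, and etlSql/udfJavaScript are just variables standing in for the SQL and UDF code from the previous steps:

// Sketch only: a query job configuration (BigQuery REST API) that runs the
// UDF over the federated table and writes the result to a native table.
// All names and variables here are made up for illustration.
var etlJobConfiguration = {
  query: {
    query: etlSql,                        // the SQL from this step
    userDefinedFunctionResources: [
      {inlineCode: udfJavaScript}         // the UDF + registration from step 2
    ],
    destinationTable: {
      projectId: 'my-project',
      datasetId: 'CPT_7414_PLAYGROUND',
      tableId: 'FoobarCleaned'
    },
    writeDisposition: 'WRITE_TRUNCATE'    // overwrite the table on each run
  }
};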

 

Let's take a step back and admire this for a minute. Sure, it doesn't look like much now, nor is the code/SQL doing anything overly complex. But the fact that you can mix JavaScript & SQL so easily to query massive datasets is, at least for me, game changing. Most of my time working with big data is spent doing janitorial work, i.e. cleaning the data. This is a pain, and I'd much rather be focusing on writing code to get insights from the data.

Finally, I’m not spinning up a single server. Nope. Not even one. Of course, under the hood, this is running on multiple nodes inside Google’s infrastructure, but I don’t need to care about any of that. It boils down to this – I don’t care about infrastructure anymore, I only care about the results.

Step 4 – Click “Run”


Once I had all my ducks in a row, it was time to pull the trigger on the query! Now, normally when you run a query over this amount of data in a traditional database it would take a while. In fact, it probably wouldn't even run. But this is BigQuery folks. Its middle name is fast. Or is it scalable? I can never remember. Anyway..

Running the query over around 1.5 billion records and almost a terabyte of data was fast. It was actually a hell of a lot faster than I thought it would be. BigQuery now had to read the data from GCS rather than native BigQuery storage, and it had to execute the UDF too. So it should be slower, right? At least a few minutes, no!? How wrong I was.

It took just 37.7 seconds to complete.

Yes folks. Just 37.7 seconds. 37.7 seconds to read almost 1.5 billion records from dozens of files stored in GCS, do some string parsing using JavaScript, and then finally aggregate the results too. That's only slightly slower than what we experience running normal SQL over native tables in BigQuery.
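
Back-of-the-envelope, that works out to roughly 40 million rows, and somewhere in the region of 25 GB, chewed through every second. Not bad for zero servers.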

*jaw drop*

[Screenshot: the completed query in the BigQuery web UI, finishing in 37.7 seconds]

After I had digested just how quickly this ETL had run, I was curious to know where most of the time was spent. So I clicked on "Explanation", and this is what I saw:

[Screenshot: the query plan "Explanation" in the web UI, showing the read and compute stages taking roughly equal time]

I was expecting to see a lot more time spent in the read stage. But in fact, it was pretty much the same as the compute! Wow. I mean, seriously, how many cores must that query have been running on to achieve those kinds of times!?

It’s mind boggling.

Wrapping up

I've hopefully just shown you how you can create a fully serverless, scalable, and blazingly fast ETL pipeline using only BigQuery and GCS. The trick is to use federated sources and UDFs in combination so you only need to make one pass over the data to ETL it.

Wiring all this up was a breeze via the web UI, and I'd imagine doing it via the API wouldn't be much harder either. The team is now scoping this out as a new ETL solution for our client because, simply put, it just makes sense! It will save costs, be quicker, and be much easier for future developers to maintain. Oh, and we can use JavaScript error handling (try/catch) to deal with dirty data now. Neat!
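
For example, here's a purely illustrative sketch of what that might look like (the parseError column is something I've made up, and the UDF registration would need to include it too):

// Illustrative only: wrap the parsing in try/catch so one dirty row doesn't
// kill the whole ETL. The "parseError" output column is hypothetical.
function mapFooBarSafely(row, emit) {
  try {
    emit({
      foo: mapFoo(row.MyBigFatStringColumn),
      bar: mapBar(row.MyBigFatStringColumn),
      parseError: null
    });
  } catch (e) {
    // Keep the row, but flag it so the dirty data can be inspected later.
    emit({foo: null, bar: null, parseError: String(e)});
  }
}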

Some things to take into consideration:

  • Unlike native BigQuery tables, federated sources don't support query caching. They also process all the data, not just the columns you reference as they would with native tables. So keep an eye on your query size/cost!
  • Testing the JavaScript was a little tricky. I hadn't noticed at the time, but there is an online tool available to help you do this. However, it only supports testing JSON.
  • BigQuery's web UI does not support pointing federated sources at compressed files. It is possible via the API though (see the sketch after this list). See my post here on Stack Overflow about this.
  • If you can, you should pre-filter your data before running it through the UDF. This will save processing time. Do this by passing a sub-query as the parameter to the UDF (like in my example), instead of just the table/federated source.
  • I'd love to see more languages supported for UDFs, e.g. Python, Go, and Java.
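
On that compressed files point, my understanding (worth checking against the current docs) is that the external definition just needs to carry a compression hint, something along these lines:

// Hypothetical: the same illustrative external definition as before, but
// pointing at gzipped CSVs and saying so via the compression field.
var compressedTableDefinition = {
  sourceFormat: 'CSV',
  compression: 'GZIP',
  sourceUris: ['gs://my-bucket/raw/2016-03/*.csv.gz'],
  schema: externalTableDefinition.schema   // reuse the earlier made-up schema
};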

I may be a little late to the party with testing out these BigQuery features given the fact that they were released last year! But I felt it was still worthy of a blog post because it ties both features together to perform massive scale ETL. I wasn’t able to find any other material that did the same. Please correct me if I’m wrong.

10 Comments
  • Chris Harrington
    Posted at 11:47h, 13 June Reply

    I don’t see that you showed writing the results back to BigQuery. How is that accomplished in this serverless model?

    • Graham Polley
      Posted at 14:09h, 13 June Reply

      You’re right. I didn’t make this clear. I’ve updated the post (“Step 3 – SQL joins the party”). Thanks!

      • Chris Harrington
        Posted at 22:57h, 13 June

    Thanks. That's a start. But for someone like myself without any BigQuery experience, I still have no context for that screenshot. I assume it is some web interface to BigQuery used for testing? I know it takes more space, but not clipping the screenshot would leave useful context.
        Next question. What about doing this for real (ie, not in a test harness)? How might I configure this to run every day at 2am? Or do I have to write code? Consider updating your article with that detail – then you’d be showing a complete serverless ETL.
        Thanks again for the great article.

      • Graham Polley
        Posted at 23:14h, 13 June

        Correct Chris, there is a web UI for BigQuery. If you wanted to do something on a schedule, you've a few options, all of which would require you to code something using the BigQuery API. For example, you could use a Lambda or cloud function. Or you could use a bog-standard cron on a Linux box. Hope this helps.

  • Kieran Clulow
    Posted at 15:46h, 11 September Reply

    Fantastic post Graham. Question – can you show the API call syntax for writing the table result back to BigQuery?

  • Pingback:Will Athena slay BigQuery? – Shine Solutions Group
    Posted at 11:47h, 09 December Reply

    […] is quite powerful functionality that allows you to mix SQL and code, you can read more about it here. With Athena, however, UDFs are still not supported. They are on the roadmap which is great to […]

  • Pingback:A top-10 list of Google BigQuery user experiences in 2016 | Google Cloud Big Data and Machine Learning Blog  |  Google Cloud Platform – Cloud Data Architect
    Posted at 20:40h, 17 December Reply

    […] Creating a serverless ETL nirvana using Google BigQuery, by Graham PolleyIn this post, the author explains how he combined the BigQuery federated sources feature, which supports queries across Google Cloud Storage and Google Drive, with UDFs to build a serverless ETL pipeline. […]

  • Pingback:TEL highlights for 2016 – Shine Solutions Group
    Posted at 15:09h, 21 December Reply

    […] Google BigQuery user experiences for 2016. In it, two blogs from Graham Polley made the cut (number 2 & 7). Yes we know, Graham’s such a Google fan […]

  • sanjeebspakrml
    Posted at 06:51h, 06 October Reply

    bq load --source_format=CSV 'market.cust' 2017-01-02_mkt_gbl_cust.csv
    bq load --source_format=CSV 'market.cust' gs://sp2040/raw/cards/cust/20170101/20170101_cust.csv
    In this example, is the second one using a federated source? Can you elaborate on whether loading data from Google Cloud Storage is the same as building the external table?

  • Ankit Bansal
    Posted at 17:01h, 24 October Reply

    Thanks for writing this blog. My question is: if your client's data sits in some RDBMS, then we need some other ETL anyway to bring that data into Google Cloud. In that case we could do our transformations in that ETL, so why would I need BigQuery? Would you suggest using BigQuery for ETL only when your data already sits in GCS?
