How to create a simple and powerful data pipeline in Google Cloud using Big Query, Cloud Functions, and Cloud Storage.

Extract, Load, and Transform data in Google Cloud with few lines of code.

Rodolfo Marcos
7 min readApr 23, 2023

In this post, we’ll learn how to create a simple and powerful data pipeline using only Google Cloud managed services. You'll only need Python and SQL basics to make it!

Currently, there are many tools that can be used to build data pipelines. Some of them are full-customized, others are fully-managed and expensive, and another requires specific language expertise.

Besides all the options on the market if you choose a Cloud solution you are on the way to success. Data and AI are very tied to cloud solutions nowadays because it's scalable, fast, and if well-developed, cost-effective.

Simplicity, carried to an extreme, becomes elegance.

The solution described in this article can be used successfully as a starting point for complex data pipelines or to solve specific ELTs jobs. No dataframes or collections here. You can start with basic Python and SQL languages in Google Cloud.

Problem to be solved

Suppose you have to ingest employee information in a Big Query table. You want that every time a .csv file is placed in a Cloud Storage bucket it automatically uploads the information to the final table and processes it accordingly based on rules on employee ID.

Sample employee.csv file

If, for instance, we upload a file with a line changed (employee id 20)

  • name: Luana
  • birth_date: 07/15/1976
  • occupation: Teacher
  • gender: F

Look that just the occupation changed and now we need to update the specific employee row to reflect this. This is our pipeline goal; when a file is loaded we update existing employee's information, and, if an employee doesn't exist create an entry for it.

Pipeline Overview

Pipeline Overview
  1. When a file is placed in Google Cloud Storage (GCS) it triggers a cloud function passing the GCS bucket and file name as parameters.
  2. The raw information from the file is loaded in the raw_employee Big Query table.
  3. A deduplication SQL is used on top of raw_employee to merge data from the new file with existing rows on the final employee table.
  4. The final employee table is saved with up-to-date information.

Benefits of the solution

  • Since both Cloud Function and Big Query are fully managed you don't need to worry about configuring any infrastructure. Also, the solution is Big Data ready.
  • Compared to other pipeline options in Google Cloud: Composer + DataFlow or DataProc this simple pipeline is more cost-effective and simpler as a starting point.
  • You can visualize Big Query data in DataStudio, or feed machine learning models hosted in GCP.

Implementation

  1. Create a cloud storage bucket for input files where we are going to place the files we want to ingest. In Google Cloud, go to Cloud StorageBucketCreate. You can name as you want and keep the default options. Keep note of the region you choose, you'll need it later. If you are not sure which region to use I recommend set (US multiple regions in the United States).
    Create another Cloud Storage bucket that will be used as a temporary resource for Big Query jobs, using the same region. You can name it temp_[GCP project ID].
  2. Now, in the same Google Cloud project go to Cloud Functions (You can find it in the search bar). Enable any required API if prompted to. Click on Create Function.
  3. In ConfigurationBasics you should set up the Cloud Function as in the image below. Just be aware that the cloud function region should be the same as the cloud storage bucket created in the first step.

4. Hit the "Add Eventarc Trigger". Set the event provider as "Cloud Storage" and the event as "google.cloud.storage.object.v1.finalized", then choose the input file bucket and finally "Save trigger". Now, every time a file is created inside the bucket the cloud function runs. It may popup approval for some APIs, hit "enable". Finally, hit the "next" button.

5. Under runtime select "Python 3.11" and fill entry point as "main". Then on the left panel let's create each file listed, you can find the code for copy/paste below. Replace where necessary with your Google Cloud Project ID and Big Query dataset.

  • main.py (Already exists by default we are going to replace its content)
  • requirements.txt (Already exists by default we are going to replace its content)
  • merge.sql (hit the cross sign to create the file)

When you finish it hit "Deploy". It will take some minutes after the conclusion.

main.py

main.py. Replace lines 10 and 11

merge.sql

merge.sql

requirements.txt

requirements.txt

Replace the variables at lines 10, 11, and 46 in main.py. Before testing let's discuss some points in the code.

  • main.py — line 34: The cloud function is triggered every time a file is placed in the bucket (This is the default behavior for Google Cloud) but we want to filter the files we are going to process. We can use a regex to filter the ones we want to process or stop otherwise. In our scenario, the files should match the format sample_[date].csv (line 13)
  • main.py — line 40: We use the skip_leading_rows=1 option to not load into Big Query the header row of the .csv file. If your file doesn't have a header set to 0.
  • main.py — line 46: Big Query jobs need a temporary cloud storage bucket for internal operations. Once the job is terminated the contents are deleted. Set the one you created in Step 1.
  • main.py — line 52: Wait for the load job to complete.
  • main.py — lines 58 to 61: Once the load job is completed the table [gcp-project].[dataset].employee_stage is created with all employee records from the file, the next step is to merge them into the final table. Here we read the merge SQL template (merge.sql) and replace the variables with the actual tables we want to use.
  • main.py — line 73: Wait for the merge job to complete. Results are saved in the final table.

Testing

Open your Cloud Function in the Google Cloud console. Go to the "LOGS" section. Keep it there.

Now, open another tab in your navigator, open your cloud storage bucket, and drop a .csv file there (You can download the example .csv from this article here. Remember to name it in the right format sample_20230101.csv for example before loading into Cloud Storage).

Once you place the file the Cloud Function execution begins, switch back to the logs tab and follow until it finishes, then, open Big Query and check your final table. If it's the first time you run the script the table will be created brand new with the initial records.

Now, if you change your employee's CVS file by switching a random column and upload it again you should see that just that column is updated for a given employee ID. If you include a new employee in your file with a non-existing employee ID a new row is created in the table.

Congratulations!

In this article, you learned how to create a simple pipeline that can import, process, and deduplicate data in Google Cloud. This automated solution is very cost-effective, flexible and can take advantage of Big Query SQL directly in your pipeline. You can try to change your merge.sql file with new features and explore possibilities!

Post-notes

  • Cloud Functions have a maximum execution time (at the moment of this article) of 540 seconds. So, if the entire pipeline execution takes more than the limit consider using Cloud Functions + DataFlow for your jobs. You can focus the data process and load steps to DataFlow and use Cloud Function as a trigger for the DataFlow job.
  • If your data is date partitioned, consider creating partitioned Big Query tables. It will enhance performance and save costs. Make sure to make the necessary adjustments in the code, specifically in the load job (line 38).
  • If your pipeline grows and you have several files that go through several tables and need more complex processing requirements consider using Cloud Composer as the Orchestrator. It is the fully managed Apache AirFlow for Google Cloud. It has many GCP connectors by default including Big Query, Cloud Storage, DataFlow, and more.

You can connect me on Linkedin at: https://www.linkedin.com/in/rodolfo-marcos-41ab3198/

Thank you so much! I hope it will be useful for you!

Photo by Mimi Thian on Unsplash

--

--

Rodolfo Marcos
Rodolfo Marcos

Written by Rodolfo Marcos

Cloud specialist - Sr. Data Engineer AWS/GCP Certified.

No responses yet