In this video, I will show how we can create an ETL pipeline that processes a CSV file uploaded to a GCS bucket, using a BigQuery Apache Spark stored procedure and a Cloud Function.
This is a simple data engineering project use case.
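The pipeline is kicked off simply by a CSV file landing in the GCS bucket. For reference, an upload with the google-cloud-storage client looks roughly like this (the bucket and file names below are illustrative placeholders, not the ones from the video):

from google.cloud import storage

# Uploading a CSV to the watched bucket fires the Cloud Function trigger.
# "my-landing-bucket" and "customers.csv" are example names only.
storage_client = storage.Client()
bucket = storage_client.bucket("my-landing-bucket")
blob = bucket.blob("customers.csv")
blob.upload_from_filename("customers.csv")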
Cloud function code:
import functions_framework
from google.cloud import bigquery

# Triggered by a change (object finalized) in the storage bucket.
@functions_framework.cloud_event
def hello_gcs(cloud_event):
    data = cloud_event.data
    bucket = data["bucket"]
    name = data["name"]
    print(f"Bucket: {bucket}")
    print(f"File: {name}")

    # Call the Spark stored procedure, passing the bucket and file name.
    project_id = "{PROJECT_ID}"  # replace with your project ID
    client = bigquery.Client()
    query_string = 'CALL `{}.spark_proc_dataset.spark_proc`("{}", "{}")'.format(
        project_id, bucket, name
    )
    query_job = client.query(query_string)
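To try the function locally before deploying, you can hand-build a CloudEvent and call the function directly. A minimal sketch, assuming the cloudevents package is installed; the attribute values and object names are illustrative:

from cloudevents.http import CloudEvent

# Fake GCS "object finalized" event for a quick local test.
attributes = {
    "type": "google.cloud.storage.object.v1.finalized",
    "source": "//storage.googleapis.com/projects/_/buckets/my-landing-bucket",
}
data = {"bucket": "my-landing-bucket", "name": "customers.csv"}
hello_gcs(CloudEvent(attributes, data))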
-------------------------------------
APACHE SPARK STORED PROCEDURE
CREATE OR REPLACE PROCEDURE spark_proc_dataset.spark_proc(bucket STRING, file STRING)
WITH CONNECTION `{CONNECTION_ID}`
OPTIONS (engine="SPARK", runtime_version="2.1")
LANGUAGE PYTHON AS R'''
import json
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Procedure parameters are exposed to the Spark job as JSON-encoded
# environment variables named BIGQUERY_PROC_PARAM.<parameter_name>.
bucket_name = str(json.loads(os.environ["BIGQUERY_PROC_PARAM.bucket"]))
file = str(json.loads(os.environ["BIGQUERY_PROC_PARAM.file"]))
file_uri = "gs://{}/{}".format(bucket_name, file)

# Read the uploaded CSV, drop rows for Tuvalu, and count customers per country.
customers = spark.read.csv(file_uri, inferSchema=True, header=True)
customers_filtered = customers.filter(~col("Country").isin(["Tuvalu"]))
customers_agg = customers_filtered.groupBy("Country").agg(
    count("Customer_Id").alias("Customer_count")
)

# Append the aggregate to BigQuery, staging through a temporary GCS bucket.
customers_agg.write.mode("append").format("bigquery") \
    .option("temporaryGcsBucket", "spark_bq_temp_321") \
    .save("output_dataset.customer_agg")
'''
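After the procedure runs, the aggregated counts are appended to output_dataset.customer_agg, so the easiest sanity check is to query that table. A minimal sketch using the BigQuery Python client:

from google.cloud import bigquery

# Inspect the rows the Spark procedure appended to the output table.
client = bigquery.Client()
rows = client.query(
    "SELECT Country, Customer_count FROM `output_dataset.customer_agg` ORDER BY Customer_count DESC"
).result()
for row in rows:
    print(row.Country, row.Customer_count)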