Bootstrap Action:
-----------------------------
A bootstrap action is a script that EMR runs while the cluster is being provisioned, typically to install extra packages before any job starts.
Its main advantage is that the script runs on every node in the cluster, so each node ends up with the same packages.
In this video, I explain bootstrap actions and also cover how to use AWS Secrets Manager with PySpark on EMR.
Prerequisites:
----------------------
How to Use AWS Glue with Snowflake | PySpark-Snowflake Connectivity
AWS Secrets Manager - Create Store and Retrieve a Secret
Bootstrap Code:
------------------------------
#!/bin/bash
sudo pip3 install -U boto3
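One way to attach the script above at launch time is the boto3 EMR client's run_job_flow call, which accepts a BootstrapActions list. The sketch below is only an illustration: the S3 path, cluster name, release label, instance types and role names are assumed placeholders, not values from the video.

```python
def build_emr_request(bootstrap_script_s3_path, cluster_name="demo-cluster"):
    """Build keyword arguments for boto3's EMR run_job_flow call.

    Every value here is an assumed placeholder; adjust to your account.
    """
    return {
        "Name": cluster_name,
        "ReleaseLabel": "emr-5.30.0",  # assumed release (Spark 2.4 line)
        "Applications": [{"Name": "Spark"}],
        "Instances": {
            "InstanceGroups": [
                {"Name": "Master", "InstanceRole": "MASTER",
                 "InstanceType": "m5.xlarge", "InstanceCount": 1},
                {"Name": "Core", "InstanceRole": "CORE",
                 "InstanceType": "m5.xlarge", "InstanceCount": 2},
            ],
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        # The bootstrap script is executed on every node of the cluster.
        "BootstrapActions": [
            {
                "Name": "install-boto3",
                "ScriptBootstrapAction": {
                    "Path": bootstrap_script_s3_path,
                    "Args": [],
                },
            }
        ],
        "JobFlowRole": "EMR_EC2_DefaultRole",  # assumed default role names
        "ServiceRole": "EMR_DefaultRole",
    }


def launch_cluster(request, region_name="us-east-2"):
    """Send the request to EMR and return the new cluster id."""
    import boto3  # imported lazily so the builder works without boto3

    emr = boto3.client("emr", region_name=region_name)
    return emr.run_job_flow(**request)["JobFlowId"]
```

Calling launch_cluster(build_emr_request("s3://my-bucket/bootstrap.sh")) would start a cluster, assuming that hypothetical bucket holds the script. The EC2 instance profile would also need permission to read the secret used by the PySpark job.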
PySpark Code:
----------------------------
pyspark --packages net.snowflake:snowflake-jdbc:3.11.1,net.snowflake:spark-snowflake_2.11:2.5.7-spark_2.4
import boto3
import base64
from botocore.exceptions import ClientError
def get_secret():
    secret_name = "{}"
    region_name = "us-east-2"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    get_secret_value_response = ""
    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'DecryptionFailureException':
            # Secrets Manager can't decrypt the protected secret text using the provided KMS key.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'InternalServiceErrorException':
            # An error occurred on the server side.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'InvalidParameterException':
            # You provided an invalid value for a parameter.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'InvalidRequestException':
            # You provided a parameter value that is not valid for the current state of the resource.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        elif e.response['Error']['Code'] == 'ResourceNotFoundException':
            # We can't find the resource that you asked for.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise e
        else:
            # Any other error code: rethrow rather than return the empty placeholder.
            raise
    else:
        # Decrypts secret using the associated KMS key.
        # Depending on whether the secret is a string or binary, one of these fields will be populated.
        if 'SecretString' in get_secret_value_response:
            secret = get_secret_value_response['SecretString']
        else:
            decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])

    # Your code goes here.
    return get_secret_value_response
import json
secret=json.loads(get_secret()['SecretString'])
from pyspark.sql import SparkSession
from pyspark import SparkContext
def main():
    SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
    snowflake_database = "*******"
    snowflake_schema = "*************"
    source_table_name = "**************"

    # Connection details come from the secret; the rest are set here.
    snowflake_options = {
        "sfUrl": secret['sfUrl'],
        "sfUser": secret['sfUser'],
        "sfPassword": secret['sfPassword'],
        "sfDatabase": snowflake_database,
        "sfSchema": snowflake_schema,
        "sfWarehouse": "COMPUTE_WH",
        "sfRole": "demo_role_yt"
    }

    # `spark` is the session that the pyspark shell creates automatically.
    df = spark.read \
        .format(SNOWFLAKE_SOURCE_NAME) \
        .options(**snowflake_options) \
        .option("dbtable", snowflake_database + "." + snowflake_schema + "." + source_table_name) \
        .load()

    # Total quantity per city.
    df1 = df.groupBy("CITY").sum("quantity")

    # Write the aggregate back to Snowflake, replacing the target table.
    df1.write.format("snowflake") \
        .options(**snowflake_options) \
        .option("dbtable", "**********").mode("overwrite") \
        .save()

main()
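The job above reads sfUrl, sfUser and sfPassword straight from the secret, so a missing key only shows up when the dictionary is built. As a rough sketch, the same step could be wrapped in two small helpers that fail early with a clearer message. The helper names parse_secret_string and build_snowflake_options are hypothetical and not from the video, and the secret is assumed to be stored as a JSON string.

```python
import json

# Keys the job expects inside the secret (taken from the code above).
REQUIRED_KEYS = ("sfUrl", "sfUser", "sfPassword")


def parse_secret_string(response):
    """Turn a get_secret_value response into a dict.

    Assumes the secret was stored as a JSON string (SecretString).
    """
    if "SecretString" not in response:
        raise ValueError("expected a string secret containing JSON")
    return json.loads(response["SecretString"])


def build_snowflake_options(secret, database, schema, warehouse, role):
    """Combine credentials from the secret with job-level settings."""
    missing = [key for key in REQUIRED_KEYS if key not in secret]
    if missing:
        raise KeyError("secret is missing keys: " + ", ".join(missing))
    return {
        "sfUrl": secret["sfUrl"],
        "sfUser": secret["sfUser"],
        "sfPassword": secret["sfPassword"],
        "sfDatabase": database,
        "sfSchema": schema,
        "sfWarehouse": warehouse,
        "sfRole": role,
    }
```

With these in place, the options could be built as build_snowflake_options(parse_secret_string(get_secret()), snowflake_database, snowflake_schema, "COMPUTE_WH", "demo_role_yt") and passed to .options(**...) as before.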
Check this playlist for more AWS Projects in Big Data domain:
• Demystifying Data Engi...