
Posts

Showing posts from 2020

Databricks: Job aborted due to stage failure. Total size of serialized results is bigger than spark driver memory.

  While running a Databricks job, especially one with large datasets and long-running queries that create a lot of temp space, we might hit this issue if the cluster has only a minimal configuration. The simple fix is to change the Spark driver config in the Databricks cluster's Spark config tab: spark.driver.maxResultSize = 100G (adjust the value based on your cluster size).
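A minimal sketch of the same setting applied outside the cluster UI, assuming you build the SparkSession yourself (e.g. via spark-submit); the app name and the 8g value are only illustrative and the limit still has to fit within the driver's memory:

from pyspark.sql import SparkSession

# Sketch only: the same key/value pair that goes into the Databricks cluster's
# Spark config tab, set here at session-build time. "8g" is an example value.
spark = (
    SparkSession.builder
    .appName("large-result-job")
    .config("spark.driver.maxResultSize", "8g")
    .getOrCreate()
)

print(spark.conf.get("spark.driver.maxResultSize"))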

How to become a data engineer?

 To everyone out there who wants to become a Data Engineer: keep following this blog, as I am on the same path as you are, interested in solving any data challenges (big/small). Having exposure to many tools and technologies is nice to have, but what's a must is understanding the underlying concepts, technical architectures, and the internals of a tool. We become better data engineers only if we try things out, learn something new, and gain new tech experience. Only if we know what each tool does, and the pros and cons of using it, can we select the right tools to solve the right problems. So I want to catalog all the learnings, as it may help someone out there who is on the same path as me. Just sharing :)  Primary skills to become a data engineer: 1. Programming skills (Java/Python/Scala) 2. Querying skills (SQL/Hive QL/Spark SQL) 3. ETL architectures (Batch/Streaming) 4. Data warehousing concepts / database design 5. Cloud computing (AWS/GCP/Azure) 6. Big Data (Hadoop/Spar...

Connecting to Salesforce using Python [aiosfstream]

Connect to the Salesforce Streaming API using Python to consume Salesforce objects.  Library used: aiosfstream. Ref link: https://aiosfstream.readthedocs.io/en/latest/quickstart.html#connecting   Quick start:   Authentication: To connect to the Salesforce Streaming API, all clients must authenticate themselves. The library supports several ways. Username-password authentication (using SalesforceStreamingClient): client = SalesforceStreamingClient(consumer_key="<consumer key>", consumer_secret="<consumer secret>", username="<username>", password="<password>")   Refresh token authentication: auth = RefreshTokenAuthenticator(consumer_key="<consumer key>", consumer_secret="<consumer secret>", refresh_token="<refresh_token>") client = Client(auth)   Authentication on sand...
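A minimal end-to-end sketch based on the aiosfstream quickstart, assuming the username-password flow; the credentials and the PushTopic channel "/topic/AccountUpdates" are placeholders:

import asyncio

from aiosfstream import SalesforceStreamingClient

async def stream_events():
    # Placeholder credentials; a sandbox org would need the sandbox settings.
    async with SalesforceStreamingClient(
            consumer_key="<consumer key>",
            consumer_secret="<consumer secret>",
            username="<username>",
            password="<password>") as client:

        # Subscribe to a PushTopic channel (hypothetical topic name).
        await client.subscribe("/topic/AccountUpdates")

        # Consume messages as they arrive.
        async for message in client:
            topic = message["channel"]
            data = message["data"]
            print(f"{topic}: {data}")

if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(stream_events())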

Copy data from S3 to Aurora Postgres

Scenario 1: To copy data from S3 to Aurora Postgres (newer than v9, or the latest). How?: We can use the aws_s3.table_import_from_s3 function to migrate the data from S3 to Aurora Postgres.  Steps: A sample file with columns id, prefix, mstr_id is copied to S3.  Create the schema on Aurora Postgres (with the required columns): DROP TABLE core.mstr; CREATE TABLE core.mstr (id varchar(300) NULL, prefix varchar(300) NULL, mstr_id float8 NULL); Copy command to transfer the data from S3 to Aurora Postgres: SELECT aws_s3.table_import_from_s3('core.mstr', 'id,prefix,mstr_id', '(format csv, header true)', '<bucket-name>', 'MSTR_DATA/part_file_00.csv', 'us-east-2', '<access key>', '<secret key>'); Note: If an IAM role is attached, we need not specify access keys.  SELECT aws_s3.table_import_from_s3('core.mstr', 'id,prefix,mst...
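A short sketch of running the same import from Python with psycopg2, assuming the aws_s3 and aws_commons extensions are installed and an IAM role on the cluster grants S3 read access (so no keys are passed); the connection details, bucket and file path are placeholders:

import psycopg2

# Connect to the Aurora Postgres cluster (placeholder connection details).
conn = psycopg2.connect(
    host="<aurora-cluster-endpoint>",
    dbname="<database>",
    user="<user>",
    password="<password>",
)

with conn, conn.cursor() as cur:
    # Same import as above, but using aws_commons.create_s3_uri and relying on
    # the attached IAM role instead of explicit access/secret keys.
    cur.execute(
        """
        SELECT aws_s3.table_import_from_s3(
            'core.mstr',
            'id,prefix,mstr_id',
            '(format csv, header true)',
            aws_commons.create_s3_uri('<bucket-name>', 'MSTR_DATA/part_file_00.csv', 'us-east-2')
        );
        """
    )
    print(cur.fetchone()[0])  # summary string returned by the function

conn.close()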

Getting started with apache-airflow (Part1)

# Apache Airflow quick start link: https://airflow.apache.org/docs/stable/start.html # export AIRFLOW_HOME vi ~/.bash_profile # setting AIRFLOW_HOME export AIRFLOW_HOME=/User/Desktop/airflow/ cd $AIRFLOW_HOME # create and activate the virtual environment python3 -m venv ./venv source ./venv/bin/activate # show the list of installed dependencies pip3 list # install apache airflow pip3 install apache-airflow # initialize the airflow database $ airflow initdb # start the webserver on port 8080 $ airflow webserver -p 8080 Now we should be able to see the Airflow DAGs at the local URL: http://localhost:8080/admin/ # start the scheduler $ airflow scheduler # Review the airflow config file found under the AIRFLOW_HOME dir, or go to the UI and follow the Admin -> Configuration menu. $ cat airflow.cfg We can learn more about Airflow's features from the configuration file, for example: it can store logs remotely in AWS S3, Google Cloud Storage or Elasticsearch ( remote_logs , j...
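As a next step after the quick start, here is a minimal sketch of a DAG file for Airflow 1.10.x (which the initdb command above implies) that could be dropped into $AIRFLOW_HOME/dags/; the dag_id, schedule and bash command are illustrative:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # 1.10.x import path

# Illustrative defaults; owner and start_date are placeholders.
default_args = {
    "owner": "airflow",
    "start_date": datetime(2020, 1, 1),
}

# A single-task DAG: once this file sits in $AIRFLOW_HOME/dags/, it should
# appear in the web UI at http://localhost:8080/admin/ and run daily.
with DAG(
    dag_id="hello_airflow",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
) as dag:
    say_hello = BashOperator(
        task_id="say_hello",
        bash_command="echo 'hello from airflow'",
    )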

Getting started with terraform

Install terraform: brew install terraform Terraform - infrastructure as code - to build, manage and modify infrastructure in a safe and repeatable way. Why terraform? - to manage environments using a configuration language - here it uses HCL, the HashiCorp Configuration Language. Infrastructure as code? - Instead of using the UI to create resources, we use a file/files to manage infrastructure - Resource: any piece of infrastructure (e.g. virtual machine, security group, network interface) - Provider: AWS, GCP, GitHub, Docker - automates the creation of resources at apply time. Advantages of IaC: - easily repeatable - easily readable - operational certainty with "terraform plan" - standardized environment builds - quickly provisioned development environments - disaster recovery provider "aws" {       access_key = "ACCESS_KEY"       secret_key = "SECRET_KEY"       region = "us-east-1" } # specific...

Explode function using PySpark

Sometimes the data frame we get by reading/parsing JSON cannot be used as-is for our processing or analysis. The explode function to the rescue. When df.printSchema() shows an array of structs, using the explode function is a little tricky compared to an array of plain elements. Sample script which worked for me to explode an array of structs: from pyspark.sql import SparkSession from pyspark.sql.functions import explode spark = SparkSession.builder.appName('test-explode').getOrCreate() df = spark.read.json("<json file name>") exploded_df = df.select("id", explode("names")).select("id", "col.first_name", "col.middle_name", "col.last_name") exploded_df.show() To filter out based on a condition: male_names_list = exploded_df.filter(exploded_df.GENDER=='M')...
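For a quick, self-contained way to try the same pattern without a JSON file, here is a sketch that builds a tiny DataFrame with an array-of-structs column in memory (the schema and sample rows are made up) and explodes it:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

spark = SparkSession.builder.appName('explode-demo').getOrCreate()

# Hypothetical data: one id with an array of name structs.
data = [
    (1, [("Ann", None, "Lee"), ("Bob", "J", "Ray")]),
]
schema = "id INT, names ARRAY<STRUCT<first_name: STRING, middle_name: STRING, last_name: STRING>>"
df = spark.createDataFrame(data, schema)

# explode() turns each struct in the array into its own row; aliasing the
# exploded column as "name" keeps the later selects readable.
exploded_df = (
    df.select("id", explode("names").alias("name"))
      .select("id", col("name.first_name"), col("name.middle_name"), col("name.last_name"))
)
exploded_df.show()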