
pandas performance improvements

option 1: break the large csv file into small chunks
(pd.read_csv has a parameter for this - "chunksize")

Specify the chunksize as 1 million rows each time;
so, e.g., 6.9 million rows will be broken down into 7 chunks.

# read the large csv file with the specified chunksize
# read_chunk.py
import pandas as pd

df_chunk = pd.read_csv(r'../input/data.csv', chunksize=1000000)

df_chunk is not a DataFrame but a TextFileReader object to iterate over.

workflow:
perform an operation on each chunk and
concatenate the chunks into a DataFrame at the end.
By iterating over each chunk,
we can filter or process it (chunk_preprocessing, sketched after the code below) before appending it to a list.
Finally, concatenate the list into a final DataFrame that fits into local memory.

# chunk_operation.py
chunk_list = []  # append each processed chunk here

# each chunk is a DataFrame
for chunk in df_chunk:
    # perform data filtering
    chunk_filter = chunk_preprocessing(chunk)

    # once the filtering is done, append the chunk to the list
    chunk_list.append(chunk_filter)

# concatenate the list into a single DataFrame
df_concat = pd.concat(chunk_list)
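chunk_preprocessing is not defined above; a minimal hypothetical sketch (the year filter and the dropped column are placeholders, not from the original):

# chunk_preprocessing.py -- hypothetical example; replace with your own logic
def chunk_preprocessing(chunk):
    # keep only the rows of interest, e.g. a single year
    chunk = chunk[chunk['year'] == 2017]
    # drop columns not needed downstream to save memory
    return chunk.drop(columns=['col_to_drop'])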


option 2: filter out unimportant columns to save memory
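For example, pd.read_csv accepts a usecols parameter, so the unimportant columns are never loaded at all (the column names here are placeholders):

import pandas as pd

# load only the columns we actually need; the rest are never read into memory
df = pd.read_csv(r'../input/data.csv', usecols=['col_1', 'col_2', 'col_3'])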

option 3: change dtypes for columns

Changing dtypes is extremely helpful for saving memory, especially when we have large data for intense analysis or computation.

By reducing the bits required to store the data, we can reduce the overall memory usage of the data by up to 50%.


# Change the dtypes (int64 -> int32)
df[['col_1','col_2',
    'col_3', 'col_4', 'col_5']] = df[['col_1','col_2',
                                      'col_3', 'col_4', 'col_5']].astype('int32')

# Change the dtypes (float64 -> float32)
df[['col_6', 'col_7',
    'col_8', 'col_9', 'col_10']] = df[['col_6', 'col_7',
                                       'col_8', 'col_9', 'col_10']].astype('float32')


option 4: use the Dask library - a robust Python library for performing distributed and parallel computations
- provides lazily-loaded counterparts of NumPy and pandas structures, performing dataframe operations in chunks and in parallel
- supports grouping by performing data shuffling under the hood (see the groupby sketch after the code below)

from dask import dataframe as dd

dfd = dd.read_csv(
    'data/clickstream_data.tsv',
    delimiter='\t',
    names=['coming_from', 'article', 'referrer_type', 'n'],
    dtype={
        'referrer_type': 'category',
        'n': 'uint32'},
    blocksize=64000000  # = 64 MB chunks
)
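Since grouping is supported, a quick sketch using the columns loaded above; nothing runs until .compute() is called:

# lazily group by article and sum n; compute() triggers the parallel execution
top_articles = dfd.groupby('article')['n'].sum().nlargest(10).compute()
print(top_articles)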

To check memory usage: df.memory_usage() (pass deep=True so object columns are measured accurately).
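For example:

# per-column usage in bytes; deep=True inspects the contents of object columns
print(df.memory_usage(deep=True))

# total usage in MB
print(df.memory_usage(deep=True).sum() / 1024 ** 2, "MB")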



Techniques for ML on large datasets:

# Sampling (when we don't need to read the complete file):

import random

import pandas as pd

filename = "data.csv"
with open(filename) as f:
    n = sum(1 for line in f) - 1  # number of data rows, excluding the header
s = n // 10  # sample size of 10%
# choose the n-s data rows to skip; data rows are 1..n (row 0 is the header)
skip = sorted(random.sample(range(1, n + 1), n - s))
df = pd.read_csv(filename, skiprows=skip)

# chunks or iteration:
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

datafile = "data.csv"
chunksize = 100000
models = []
for chunk in pd.read_csv(datafile, chunksize=chunksize):
    # a function to clean the data and create the features
    chunk = pre_process_and_feature_engineer(chunk)
    model = LogisticRegression()
    model.fit(chunk[features], chunk['label'])
    models.append(model)

# score new data by averaging the predictions of the per-chunk models
df = pd.read_csv("data_to_score.csv")
df = pre_process_and_feature_engineer(df)
predictions = np.mean([model.predict(df[features]) for model in models], axis=0)


# optimize data types
Usually, when we load a file, pandas infers the datatypes.
These inferred datatypes are often not optimal and take more space than needed.
The 3 most common datatypes used are int, float, and object.

default: pandas sets the dtype of integers to int64 (8 bytes each); if the values fit in the range -32768 to 32767, change to int16 and reduce the memory for those columns by 75%.
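A minimal sketch of that downcast, on a hypothetical column col_1 whose values fit in the int16 range (assuming df is already loaded):

import pandas as pd

# int64 -> int16 cuts 8 bytes per value down to 2, a 75% reduction
df['col_1'] = df['col_1'].astype('int16')

# or let pandas pick the smallest integer type that fits the values
df['col_1'] = pd.to_numeric(df['col_1'], downcast='integer')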

Transform columns of dtype object to category: rather than keeping copies of the same string at many positions in your dataframe, pandas will store a single copy of each distinct string.
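A sketch of the object-to-category conversion with a before/after check (col_obj is a placeholder column name):

# object -> category stores one copy of each distinct string plus integer codes
before = df['col_obj'].memory_usage(deep=True)
df['col_obj'] = df['col_obj'].astype('category')
after = df['col_obj'].memory_usage(deep=True)
print(before, '->', after, 'bytes')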


# Dask dataframe
- one way of handling large datasets is to exploit the fact that our machine has more than one core
- for this purpose we use Dask, an open-source Python project which parallelizes NumPy and pandas
- under the hood, a Dask dataframe consists of many pandas dataframes that are manipulated in parallel
- how can data that does not fit in memory with pandas fit in memory with Dask?
- because Dask dataframes and their operations are lazy
- this means operations are not executed immediately, but rather build up a task graph
- upon calling the compute method, the built-in Dask task scheduler coordinates the partial loading and manipulation of data while making use of all cores
- after computing, Dask combines the partial results and returns the final one (a sketch follows below)
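A minimal sketch of that lazy workflow, reusing the clickstream columns from the earlier example (the file pattern is a placeholder):

import dask.dataframe as dd

# nothing is loaded yet: read_csv and the operations below only build a task graph
ddf = dd.read_csv('data/clickstream_*.tsv', delimiter='\t',
                  names=['coming_from', 'article', 'referrer_type', 'n'])
result = ddf.groupby('referrer_type')['n'].mean()

# compute() triggers the scheduler, which loads and processes partitions in parallel
print(result.compute())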

