In this article, we will see how to read and analyze large CSV or text files with Pandas. This is useful when your machine doesn't have enough memory to process the whole file at once.

By the end you will know how to use Pandas-style code on big data.

In short, you can try Dask, which builds on top of Pandas and offers a very similar DataFrame API:

import dask.dataframe as dd
df = dd.read_csv('huge_file.csv')

Setup

Genome data often comes in huge files, frequently larger than 30 GB. To read such files on a laptop or a machine with less than 30 GB of RAM, we need a library like Dask.

To get a sample huge file for testing, we can download the human genome FASTA file from the Ensembl FTP server:

wget ftp://ftp.ensembl.org/pub/release-77/fasta/homo_sapiens/dna/Homo_sapiens.GRCh38.dna.toplevel.fa.gz
gunzip Homo_sapiens.GRCh38.dna.toplevel.fa.gz

The file downloads as a gzip archive. After uncompressing, its size is about 37 GB.

Install Dask

First we need to install the Dask library:

pip install dask
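
If you plan to work mainly with Dask DataFrames, you can alternatively install the dataframe extra, which also pulls in the pandas and NumPy dependencies:

pip install "dask[dataframe]"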

To read more about the library, we can visit the official documentation: Get Started with Dask

Benefits of using Dask include:

  • an API similar to Pandas (see the sketch below)
  • scaling out to clusters
  • parallel computation
  • reading "big data" files that don't fit in memory
  • pure Python compatibility

You can read more here: Why Dask?
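
As a minimal sketch of the Pandas-like API (using a hypothetical sales.csv with columns city and amount), a groupby aggregation looks the same as in Pandas, except that the result is only materialized when we ask for it:

import dask.dataframe as dd

# lazily read the CSV in partitions (file name and columns are hypothetical)
df = dd.read_csv('sales.csv')

# the groupby / aggregation API mirrors Pandas
totals = df.groupby('city')['amount'].sum()

# nothing is executed until .compute() is called
print(totals.compute())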

Read a Large CSV File

To read a large CSV file with Dask in a Pandas-like way, we can do:

import dask.dataframe as dd
df = dd.read_csv('huge_file.csv')

We can also read compressed files directly without uncompressing them first, but this often causes problems, since compressed files cannot be split into partitions. When possible, uncompress the file before reading it.
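
For example, a gzip-compressed file can be read directly by passing the compression option; because gzip files are not splittable, blocksize=None is required and the whole file ends up in a single partition (a sketch, assuming a file named huge_file.csv.gz):

import dask.dataframe as dd

# read a gzip archive directly; gzip cannot be split,
# so the whole file becomes one partition
df = dd.read_csv('huge_file.csv.gz', compression='gzip', blocksize=None)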

Work with Dask DataFrame

Some operations, like:

df.head()

will work fast and without errors.

Others, like:

df.tail()

may lead to errors:

ValueError: Mismatched dtypes found in pd.read_csv/pd.read_table.

Check the final section of this article for how to solve this error.

Others, such as:

df.shape

result in delayed (lazy) operations:

(Delayed('int-af70a7d8-597c-4864-a876-2d74003f0e97'), 125)

Get shape of Dask DataFrame

To find the number of rows of a Dask DataFrame, or its df.shape, we can use:

t = df.shape
t[0].compute(), t[1]

This requires a scan of the whole dataset:

result:

(622038697, 1)
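
As an alternative that also scans all partitions, we can call len() directly on the Dask DataFrame:

# equivalent to computing df.shape[0]
n_rows = len(df)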

Initial investigation on Dask DataFrame

To start analyzing a Dask DataFrame, we can begin with:

df.head()

which works just like in Pandas.

We can also use it with other operations in order to analyze a larger number of rows:

df.head(1000000).iloc[:, 0].str.find('AAACCCAAA')

this results in:

0        -1
1        -1
2        -1
3        -1
4        -1
         ..
999995   -1
999996   -1
999997   -1
999998   -1
999999   -1
Name: >1 dna:chromosome chromosome:GRCh38:1:1:248956422:1 REF, Length: 1000000, dtype: int64
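
Since str.find() returns the position of the first match or -1 when the pattern is not found, we can, as a quick sketch, count how many of the first million rows contain the pattern (df.head() returns a regular Pandas DataFrame, so this runs in memory):

sample = df.head(1000000)
# rows where the pattern was found have a find() result >= 0
matches = (sample.iloc[:, 0].str.find('AAACCCAAA') >= 0).sum()
print(matches)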

If we try the same - searching the first column for a string pattern - on the whole DataFrame, we will get:

df.iloc[:, 0].str.find('AAACCCAAA')

result:

Dask Series Structure:
npartitions=593
    int64
      ...
    ...  
      ...
      ...
Name: >1 dna:chromosome chromosome:GRCh38:1:1:248956422:1 REF, dtype: int64
Dask Name: str-find, 1779 tasks

This gives information about the partitions and the number of tasks, but not the actual result.

Dask .compute()

To get results from Dask operations we use .compute(). Let's check how to search the whole Dask DataFrame for a given string:

t = df.iloc[:, 0].str.find('AAACCCAAA')
t.compute()

Now we get results for the whole DataFrame, showing for each row whether the string 'AAACCCAAA' occurs in the first column (the position of the match, or -1 if not found):

0        -1
1        -1
2        -1
3        -1
4        -1
         ..
923939   -1
923940   -1
923941   -1
923942   -1
923943   -1
Name: >1 dna:chromosome chromosome:GRCh38:1:1:248956422:1 REF, Length: 622038697, dtype: int64

Note that these operations can take a long time due to the data size, the memory required and the number of tasks.
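
If we only care about the rows where the pattern was actually found, we can filter the lazy result before computing it (a small sketch; values of -1 mean no match):

t = df.iloc[:, 0].str.find('AAACCCAAA')
# keep only the rows where the pattern occurs
found = t[t >= 0]
found.compute()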

Dask show progress bar

Finally, let's cover how to show a progress bar for Dask operations. To do so we can use tqdm or Dask's own diagnostic, ProgressBar, which is imported with: from dask.diagnostics import ProgressBar.

So suppose that we have:

t = df.iloc[:, 0].str.find('AAACCCAAA')

ProgressBar

To get a progress bar while working with big CSV files, we can do:

from dask.diagnostics import ProgressBar
from dask import compute

with ProgressBar():
    compute(t)

This shows a progress bar like:

[###                                     ] | 8% Completed |  1min 43.7s

[######                                  ] | 15% Completed |  2min 35.8s

[##########                              ] | 25% Completed |  3min 55.6s

tqdm

So with tqdm we can try something like:

from tqdm.dask import TqdmCallback
from dask.diagnostics import ProgressBar

# registering Dask's ProgressBar globally is optional - with it, both bars are shown
ProgressBar().register()

t = df.iloc[:, 0].str.find('AAACCCAAA')

with TqdmCallback(desc="compute"):
    t.compute()

result:

[                                        ] | 0% Completed |  0.0s
compute:   0%|          | 0/593 [00:00<?, ?it/s]
[##                                      ] | 5% Completed |  1min 16.7s

For more information you can check the tqdm and Dask documentation.

Dask best practices

In order to work effectively with Dask, I recommend reading the best practices for Dask. Some of them are listed below, with a small sketch of the first one after the list:

  • Call delayed on the function, not the result
  • Compute on lots of computation at once
  • Don’t mutate inputs
  • Avoid global state
  • Don’t rely on side effects
  • Break up computations into many pieces
  • Avoid too many tasks
  • Avoid calling delayed within delayed functions
  • Don’t call dask.delayed on other Dask collections
  • Avoid repeatedly putting large inputs into delayed calls

Refer to Dask Best Practices
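
As a small illustration of the first point - call delayed on the function, not on the result - a hypothetical function process() would be wrapped like this:

import dask

def process(x):
    return x * 2

# correct: delay the function call itself, so Dask can schedule it lazily
lazy = dask.delayed(process)(10)

# wrong: process(10) runs immediately and delayed only wraps the finished result
# lazy = dask.delayed(process(10))

print(lazy.compute())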

Dask Errors

ValueError: Mismatched dtypes found in pd.read_csv/pd.read_table.

This error happens because Dask infers the dtypes from a sample of the file, and the inferred types don't match the data in later partitions; the dtypes then need to be passed explicitly to the dd.read_csv() method.

To fix it, check the columns reported in the error message and manually add the dtype for each of them:

df = dd.read_csv('huge_file.csv', dtype={'col1': 'object'})

DtypeWarning: Columns have mixed types. Specify dtype option on import or set low_memory=False.

To solve this problem, use the option low_memory=False:

df = dd.read_csv('huge_file.csv', low_memory=False)