In this article, we will see how to read and analyze large CSV or text files with Pandas. This is useful when your machine doesn't have enough memory to process the whole file at once.
By the end you will know how to use Pandas for big data.
In short, you can try Dask, which is built on top of Pandas and offers a very similar API:
import dask.dataframe as dd
df = dd.read_csv('huge_file.csv')
Setup
Genome data often comes in huge files, frequently more than 30 GB. To read such files on a laptop or machine with less than 30 GB of RAM we will need a library like Dask.
Sample huge files for testing can be found at:
- Genome data - https://www.sanger.ac.uk/resources/downloads/human/
- Kaggle filters - https://www.kaggle.com/datasets?fileType=csv&sizeStart=30%2CGB
To download such a file we can use:
wget ftp://ftp.ensembl.org/pub/release-77/fasta/homo_sapiens/dna/Homo_sapiens.GRCh38.dna.toplevel.fa.gz
gunzip Homo_sapiens.GRCh38.dna.toplevel.fa.gz
The file downloads as a gzip archive. After uncompressing, the file size is about 37 GB.
Install Dask
First we need to install the Dask library:
pip install dask
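If you plan to work with Dask DataFrames, installing the dataframe extra pulls in the optional DataFrame dependencies as well:
pip install "dask[dataframe]"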
To read more about the library in the official documentation, visit: Get Started with Dask
Benefits of using Dask are:
- API similar to Pandas (see the sketch after this list)
- cluster scaling
- parallel computation
- reading "big data" files
- written in Python and compatible with the existing Python ecosystem
You can read more here: Why Dask?
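A tiny sketch of the Pandas-like API (the file and the column names col1 and col2 are hypothetical, for illustration only):
import dask.dataframe as dd
df = dd.read_csv('huge_file.csv')
# same syntax as Pandas, but lazy until .compute()
result = df.groupby('col1')['col2'].mean().compute()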
Read a Large CSV File
To read a large CSV file with Dask in a Pandas-like way we can do:
import dask.dataframe as dd
df = dd.read_csv('huge_file.csv')
We can also read compressed files directly without uncompressing them, but this often causes problems. So when possible, uncompress the file before reading it.
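As a sketch, assuming a gzipped copy named huge_file.csv.gz, Dask can read it directly by specifying the compression and disabling block splitting (gzip files cannot be split into chunks):
import dask.dataframe as dd
# gzip is not splittable, so the whole file becomes a single partition
df = dd.read_csv('huge_file.csv.gz', compression='gzip', blocksize=None)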
Work with Dask DataFrame
Some operations like:
df.head()
will work fast and without errors.
Others, like:
df.tail()
may lead to errors:
ValueError: Mismatched dtypes found in pd.read_csv/pd.read_table.
Check the final section in this article to solve them.
Others, like:
df.shape
will result in delayed operations:
(Delayed('int-af70a7d8-597c-4864-a876-2d74003f0e97'), 125)
Get shape of Dask DataFrame
To find the number of rows of a Dask DataFrame, or its df.shape, we can use:
t = df.shape
t[0].compute(),t[1]
This requires a scan of the whole dataset. Result:
(622038697, 1)
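If only the row count is needed, len(df) also works; note that it triggers a full pass over the data as well:
# len() on a Dask DataFrame computes the total number of rows
n_rows = len(df)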
Initial investigation on Dask DataFrame
To start analyzing a Dask DataFrame we can begin with:
df.head()
which will work normally as in Pandas.
We can also use it for other operations, in order to analyze a larger number of rows:
df.head(1000000).iloc[:, 0].str.find('AAACCCAAA')
This results in:
0 -1
1 -1
2 -1
3 -1
4 -1
..
999995 -1
999996 -1
999997 -1
999998 -1
999999 -1
Name: >1 dna:chromosome chromosome:GRCh38:1:1:248956422:1 REF, Length: 1000000, dtype: int64
If we try the same operation - searching the first column of the DataFrame for a string pattern - on the whole DataFrame we will get:
df.iloc[:, 0].str.find('AAACCCAAA')
result:
Dask Series Structure:
npartitions=593
int64
...
...
...
...
Name: >1 dna:chromosome chromosome:GRCh38:1:1:248956422:1 REF, dtype: int64
Dask Name: str-find, 1779 tasks
We get information about the partitions and the number of tasks, but not the actual result.
Dask .compute()
To get results from Dask operations we use .compute(). Let's check how to search the whole Dask DataFrame for a given string:
t = df.iloc[:, 0].str.find('AAACCCAAA')
t.compute()
Now we will get results for the whole DataFrame, showing whether the string 'AAACCCAAA' is part of the first column or not:
0 -1
1 -1
2 -1
3 -1
4 -1
..
923939 -1
923940 -1
923941 -1
923942 -1
923943 -1
Name: >1 dna:chromosome chromosome:GRCh38:1:1:248956422:1 REF, Length: 622038697, dtype: int64
Note that these operations can take a long time due to the data size, memory requirements, and number of tasks.
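If only the number of matching rows is needed, a reduction keeps the result small. A minimal sketch, assuming t is the lazy result from above (str.find() returns -1 when the pattern is absent):
# count the rows where the pattern was found
n_matches = (t >= 0).sum().compute()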
Dask show progress bar
Finally, let's cover how to show a progress bar for Dask operations. To do so we can use tqdm or the Dask diagnostics ProgressBar. The latter is imported with: from dask.diagnostics import ProgressBar.
So suppose that we have:
t = df.iloc[:, 0].str.find('AAACCCAAA')
ProgressBar
So to get a progress bar while working with big CSV files we can do:
from dask.diagnostics import ProgressBar
from dask import delayed, compute
with ProgressBar():
    compute(t)
This would show progress bar like:
[### ] | 8% Completed | 1min 43.7s
[###### ] | 15% Completed | 2min 35.8s
[########## ] | 25% Completed | 3min 55.6s
tqdm
So with tqdm we can try something like:
from tqdm.dask import TqdmCallback
from dask.diagnostics import ProgressBar
ProgressBar().register()
t = df.iloc[:, 0].str.find('AAACCCAAA')
with TqdmCallback(desc="compute"):
t.compute()
result:
[ ] | 0% Completed | 0.0s
compute: 0%| | 0/593 [00:00<?, ?it/s]
[## ] | 5% Completed | 1min 16.7s
For more information you can check the documentation of tqdm and the Dask diagnostics.
Dask best practices
In order to work with Dask effectively, I recommend reading the best practices for Dask. Some of them are listed below:
- Call delayed on the function, not the result (see the sketch after this list)
- Compute on lots of computation at once
- Don’t mutate inputs
- Avoid global state
- Don’t rely on side effects
- Break up computations into many pieces
- Avoid too many tasks
- Avoid calling delayed within delayed functions
- Don’t call dask.delayed on other Dask collections
- Avoid repeatedly putting large inputs into delayed calls
Refer to Dask Best Practices
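Below is a minimal sketch of the first two practices, assuming df is the Dask DataFrame created above: wrap the function itself with delayed and pass all the delayed results to a single compute call:
import dask
from dask import delayed

def count_matches(part, pattern):
    # plain Pandas code on one partition; Dask sees it only once it is wrapped with delayed
    return (part.iloc[:, 0].str.find(pattern) >= 0).sum()

# call delayed on the function, not on its result
parts = [delayed(count_matches)(p, 'AAACCCAAA') for p in df.to_delayed()]

# compute everything at once instead of calling .compute() per partition
totals = dask.compute(*parts)
print(sum(totals))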
Dask Errors
ValueError: Mismatched dtypes found in pd.read_csv/pd.read_table.
This error happens when the dtypes need to be passed explicitly to the dd.read_csv() method.
To fix it, check the error message and manually add the dtype for each reported column:
df = dd.read_csv('huge_file.csv', dtype={'col1': 'object'})
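Alternatively, if the mismatch comes from integer columns that contain missing values in later partitions, the assume_missing option of dd.read_csv() tells Dask to infer columns without an explicit dtype as floats (a sketch; whether this applies depends on your data):
# infer unspecified columns as floats, so missing values in later partitions don't break the dtypes
df = dd.read_csv('huge_file.csv', assume_missing=True)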
DtypeWarning: Columns have mixed types. Specify dtype option on import or set low_memory=False.
To solve this problem use the option low_memory=False:
df = dd.read_csv('huge_file.csv', low_memory=False)
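Another option, assuming the warning is caused by Dask inferring dtypes from a small sample at the start of the file, is to increase the number of bytes used for dtype inference via the sample parameter:
# use roughly 10 MB of the file (instead of the default sample) when inferring column dtypes
df = dd.read_csv('huge_file.csv', sample=10_000_000)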