In this series, we’re walking through a data science business problem from start to finish. We have introduced the business case, assessed the data available to us, and performed some exploratory analysis.
During that exploratory analysis, we needed to do some time-intensive data processing. We have a dataframe with three columns: `physician_id`, `procedure_code`, and `number_of_patients`. It contains the number of times each physician has performed each procedure. If a physician performs multiple procedures, their id will appear in the `physician_id` column once for each procedure they perform.
We’d like to convert this into a dataframe with one row for each physician and one column for each procedure, with the intersection displaying the number of times the physician has performed that procedure.
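The transformation itself isn't shown in this excerpt, but as a rough sketch of the target shape, a pandas pivot on hypothetical toy data looks like this (the data values here are made up for illustration):

```python
import pandas as pd

# Hypothetical toy data in the long format described above.
physician_procedures = pd.DataFrame({
    "physician_id": [1, 1, 2],
    "procedure_code": ["A", "B", "A"],
    "number_of_patients": [5, 3, 7],
})

# One row per physician, one column per procedure code; the
# intersection holds the number of times that physician performed
# that procedure (0 where they never performed it).
wide = physician_procedures.pivot_table(
    index="physician_id",
    columns="procedure_code",
    values="number_of_patients",
    fill_value=0,
)
```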
The first iteration of this transformation was time-intensive. This post shares how I improved the performance of that transformation.
A special thank-you goes out to my mentor and friend, Q Ethan McCallum, for recommending I try the thing I ultimately did to solve this. If you’re looking for someone to help determine your company’s data strategy, you’ve gotta give him a call.
The Invocation
Here is the part of the last notebook where we run the aforementioned conversion:
```python
from specialty_data import extract_procedure_features

combined_numerized_procedures = extract_procedure_features(physician_procedures)
combined_numerized_procedures.head(10)
```
We’ve passed our operation over to dask to perform, inside `extract_procedure_features`. Steps:

- `dd.from_pandas` converts a pandas dataframe into the dask format that allows parallelization. The first argument is the dataframe (though dask also supports other types of collections). The second argument, `npartitions`, indicates the number of partitions into which the collection should be divided. We divide across the number of cores on the machine we’re using, drawn from `cpu_count()` in the Python `multiprocessing` module.
- `map_partitions` indicates the operation to perform. Its name comes from `map`, the operation we perform on collections to do something to each element and return a collection with the transformed values. Our transformation, `numerize`, will be performed on all the partitions.
- The `compute` operation comes built into dask collections, and it runs the transformation specified in `map_partitions`. They’re separate calls because you can have `compute` run several transformations at once, if you like.
- The `get` call specifies which scheduler to use for the computation. We are using a scheduler backed by a process pool that we got from `dask.multiprocessing`. The other options are, from the documentation:
  - `dask.threaded.get`: a scheduler backed by a thread pool
  - `dask.get`: a synchronous scheduler, good for debugging
  - `distributed.Client.get`: a distributed scheduler for executing graphs
When we run this dask version, the transformation still takes some time, but even on the smallest EC2 instances it takes fewer than 10 minutes.
I found the dask documentation articulate, helpful, and up to date as I executed this change.
But what about pandas parallelization?
I want to emphasize something here, because I have had four separate data scientists tell me that pandas speeds up operations by running them in parallel. Pandas does not do this. Not for `str` methods, not for `apply`, none of it. Pandas shells some stuff out to `numpy`, which iterates quickly for reasons that are also not parallelization (we discussed them here). In order to execute parallel processing, we need something more than pandas.

As far as I can tell, the reason people think pandas does parallelization comes from the changes in version 0.18 that released the global interpreter lock (GIL) on certain operations like `groupby` and `value_counts`.
What is the GIL? Python implements a global interpreter lock, which means that it only allows access to the Python interpreter on one thread at a time. The maintainers made this decision to simplify memory management, and it means that parallelizing across multiple cores doesn’t work in pure Python.
When `pandas` released the GIL on some operations, that meant those operations use (or were changed to use) collections in a language other than Python, usually C via Cython. Cython still assumes that Python operations are not thread-safe unless the programmer explicitly releases the GIL. Thomas Nyberg explains this really well.
This change allows parallelization to work on those specific operations, but it doesn’t do the parallelization itself. Data scientists and engineers need to execute their own parallelization, either through `ThreadPoolExecutor` or via a library that does it, like `dask`.
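As a minimal sketch of do-it-yourself parallelization with the standard library, one common pattern is to split the dataframe into chunks and map a function over them with a pool. The names here (`transform`, the toy dataframe) are hypothetical, and note that a thread pool only helps when the underlying operation releases the GIL; for pure-Python work you would swap in `ProcessPoolExecutor`.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd

def transform(chunk):
    # Hypothetical per-chunk work. Threads only speed this up if the
    # operation releases the GIL; for pure-Python logic, use
    # ProcessPoolExecutor instead.
    return chunk["number_of_patients"] * 2

df = pd.DataFrame({"number_of_patients": range(8)})

# Split the dataframe into chunks and process them concurrently.
chunks = np.array_split(df, 4)
with ThreadPoolExecutor(max_workers=4) as pool:
    result = pd.concat(pool.map(transform, chunks))
```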
Conclusion
Friends don’t let friends tell people that pandas parallelizes operations. It doesn’t.
In this case, though, we needed to perform a combination of one-hot encoding and multiplying by an instance count on a dataframe. Our individual steps are small and fast, but the dataframe is huge. With millions of rows of data, the transformation still took a long time.
To speed it up, I went with a parallel processing library called `dask`. It has excellent documentation and, in my view, an intuitive API. Could more clever things be done to speed this up? Maybe. To me, this was the best bang-for-buck solution.
Now it’s time to get back to our data science case study! Are you ready to move on to prototyping models? Here we go.
If you liked this post, you might also like:
Another time I talked about performance — example in ruby
A risk-oriented testing strategy that mentions performance — example, again, in ruby
This keynote about refactoring — which discusses the other qualities of code that I think are important