Applied Data Science Case Study, Part 4: Performant Processing

Reading Time: 8 minutes

In this series, we’re walking through a data science business problem from start to finish. We have introduced the business case, assessed the data available to us, and performed some exploratory analysis.

During that exploratory analysis, we needed to do some time-intensive data processing. We have a dataframe with three columns: physician_id, procedure_code, and number_of_patients. It contains the number of times each physician has performed each procedure. If a physician performs multiple procedures, their id will appear in the physician_id column once for each procedure they perform.

[Image: the physician_procedures dataframe before the transformation, with one row per physician-procedure pair]

We’d like to convert this into a dataframe with one row for each physician and one column for each procedure, with the intersection displaying the number of times the physician has performed that procedure.

[Image: the dataframe after the transformation, with one row per physician and one column per procedure]
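To make the target shape concrete, here’s a toy version with made-up physicians and procedure codes (not the real data, just an illustration of the before and after):

import pandas as pd

# Before: one row per (physician, procedure) pair.
physician_procedures = pd.DataFrame({
    'physician_id':       [1, 1, 2, 3],
    'procedure_code':     ['00100', '00103', '00100', 'Q9967'],
    'number_of_patients': [12, 3, 7, 25],
})

# After: one row per physician, one column per procedure, with the
# patient count at each intersection (zero where a physician never
# performed that procedure).
target = pd.DataFrame({
    'physician_id':    [1, 2, 3],
    'procedure_00100': [12, 7, 0],
    'procedure_00103': [3, 0, 0],
    'procedure_Q9967': [0, 0, 25],
})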

The first iteration of this transformation was time-intensive. This post shares how I improved the performance of that transformation.

A special thank-you goes out to my mentor and friend, Q Ethan McCallum, for recommending I try the thing I ultimately did to solve this. If you’re looking for someone to help determine your company’s data strategy, you’ve gotta give him a call.

The Invocation

Here is the part of the last notebook where we run the aforementioned conversion:

from specialty_data import extract_procedure_features

combined_numerized_procedures = extract_procedure_features(physician_procedures)
combined_numerized_procedures.head(10)
Out[7]:
Unnamed: 0 physician_id procedure_00100 procedure_00103 procedure_00104 procedure_00120 procedure_00140 procedure_00142 procedure_00144 procedure_00145 procedure_Q9961 procedure_Q9962 procedure_Q9963 procedure_Q9965 procedure_Q9966 procedure_Q9967 procedure_Q9969 procedure_Q9970 procedure_Q9974 procedure_S0281
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
2 2 2 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
3 3 3 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
4 4 4 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
5 5 5 0 0 0 0 0 0 0 0 0 0 0 0 0 25 0 0 0 0
6 6 6 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
7 7 7 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
8 8 8 0 0 0 0 0 17 0 0 0 0 0 0 0 0 0 0 0 0
9 9 9 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0

10 rows × 3142 columns

I wrote a function called extract_procedure_features and stored it in a file called specialty_data. That function takes in the dataframe that tells us how many of each procedure a physician has performed, and it returns a dataframe with a row for each physician and a column for each procedure, with the number of times the physician has performed that procedure at the intersection.

First Implementation: 40-Minute Run Time

This is the way that I originally wrote this function:


import numpy as np
import pandas as pd

def extract_procedure_features(physician_procedures):
    # One column per procedure code, 1 where the row's procedure matches.
    one_hot_procedures = pd.get_dummies(physician_procedures.procedure_code, prefix='procedure')
    dummied_procedures = pd.concat([physician_procedures.number_of_patients, one_hot_procedures], axis=1)

    def numerize(row):
        # Scale the row's one-hot columns by its patient count.
        return np.asarray(row.number_of_patients) * np.asarray(row)

    # Row-wise apply: this is the slow part.
    numerized_procedures = dummied_procedures.apply(numerize, axis=1)\
                                             .drop('number_of_patients', axis=1)

    # Collapse to one row per physician by summing their procedure counts.
    combined_numerized_procedures = numerized_procedures.assign(physician_id=physician_procedures.physician_id)\
                                                        .groupby('physician_id')\
                                                        .sum()\
                                                        .reset_index()
    combined_numerized_procedures.to_csv('path/to/data.csv')
    return combined_numerized_procedures

Why does this code take so long to run?

See the .apply call in the middle of the function, where we build numerized_procedures? Pandas doesn’t parallelize that; it’s an iterative operation. In our case, it’s iterating through about 6 million rows. So even though the applied function, numerize, is not computationally expensive, it has to happen a lot of times.
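If you want to feel the cost of row-wise apply for yourself, here’s a rough micro-benchmark on synthetic data (not the real frame; exact timings will vary by machine, but the cost grows roughly linearly with the number of rows):

import numpy as np
import pandas as pd
from timeit import timeit

def numerize(row):
    return np.asarray(row.number_of_patients) * np.asarray(row)

def make_frame(n_rows, n_cols=50):
    # Synthetic stand-in for dummied_procedures: dummy columns plus a patient count.
    df = pd.DataFrame(np.random.randint(0, 2, size=(n_rows, n_cols)),
                      columns=[f'procedure_{i}' for i in range(n_cols)])
    df['number_of_patients'] = np.random.randint(1, 30, size=n_rows)
    return df

small, large = make_frame(10_000), make_frame(100_000)

# apply calls numerize once per row in a Python-level loop, so ten times
# the rows costs roughly ten times the wall-clock time.
print(timeit(lambda: small.apply(numerize, axis=1), number=1))
print(timeit(lambda: large.apply(numerize, axis=1), number=1))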

I didn’t want this process to take 40 minutes, so I spent a day or two trying to make it faster. Most of the time I ran into the same obstacle: I couldn’t figure out how to make a count-vectorization approach weight each procedure by a number (the patient count) rather than by counting repeated instances of a procedure_id. Could I have dropped into C and gotten this to go a lot faster, the way scikit-learn’s CountVectorizer and TfidfVectorizer do? Maybe, but I needed to move on from this. If you’d like to implement that, I’d love to see the result!

Second Implementation: Sub-10-Minute Run Time

Instead, I parallelized the process described above across the cores of my local machine using dask, a Python library for parallel computing that works with pandas dataframes.

Here’s how that looks:


import numpy as np
import pandas as pd
from dask import dataframe as dd
from dask.multiprocessing import get
from multiprocessing import cpu_count

num_cores = cpu_count()

def extract_procedure_features(physician_procedures):
    one_hot_procedures = pd.get_dummies(physician_procedures.procedure_code, prefix='procedure')
    dummied_procedures = pd.concat([physician_procedures.number_of_patients, one_hot_procedures], axis=1)

    def numerize(partition):
        # Each partition arrives as an ordinary pandas dataframe; scale each
        # row's one-hot columns by that row's patient count.
        return partition.apply(
            lambda row: np.asarray(row.number_of_patients) * np.asarray(row),
            axis=1)

    # Split the frame across the machine's cores, run numerize on each
    # partition, and execute on the multiprocessing scheduler.
    numerized_procedures = dd.from_pandas(dummied_procedures, npartitions=num_cores)\
                             .map_partitions(numerize)\
                             .compute(get=get)\
                             .drop('number_of_patients', axis=1)

    combined_numerized_procedures = numerized_procedures.assign(physician_id=physician_procedures.physician_id)\
                                                        .groupby('physician_id')\
                                                        .sum()\
                                                        .reset_index()
    combined_numerized_procedures.to_csv(processed_data_path)
    return combined_numerized_procedures

We’ve handed our operation over to dask: the dd.from_pandas, map_partitions, and compute calls in the middle of the function. Step by step (a standalone toy sketch of these steps follows the list):

  1. dd.from_pandas converts a pandas dataframe into the dask format that allows parallelization. The first argument is the dataframe (dask also supports other kinds of collections). The second argument, npartitions, indicates the number of partitions into which the collection should be divided. We divide across the number of cores on the machine we’re using, drawn from cpu_count() in the Python multiprocessing module.
  2. map_partitions indicates the operation to perform. Its name comes from map, the operation we perform on a collection to do something to each element and return a collection of the transformed values. Our transformation, numerize, will be performed on each of the partitions.
  3. The compute operation comes built into dask collections, and it runs the transformation specified in map_partitions. They’re separate calls because you can have compute run several transformations at once, if you like.
  4. The get call specifies which scheduler to use for the computation. We are using a scheduler backed by a process pool that we got from dask.multiprocessing. The other options are, from the documentation:
    • dask.threaded.get: a scheduler backed by a thread pool
    • dask.get: a synchronous scheduler, good for debugging
    • distributed.Client.get: a distributed scheduler for executing graphs
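Here’s a minimal, standalone sketch of those four steps on a toy dataframe, mirroring the (older) dask API used above; the variable names and the toy data are mine, not from the real pipeline:

import numpy as np
import pandas as pd
from dask import dataframe as dd
from dask.multiprocessing import get
from multiprocessing import cpu_count

def numerize_partition(partition):
    # Each partition arrives as an ordinary pandas dataframe.
    return partition.apply(
        lambda row: np.asarray(row.number_of_patients) * np.asarray(row),
        axis=1)

if __name__ == '__main__':
    toy = pd.DataFrame({'number_of_patients': [3, 7, 2],
                        'procedure_a': [1, 0, 1],
                        'procedure_b': [0, 1, 0]})

    # 1. Split the pandas dataframe into partitions, one per core.
    ddf = dd.from_pandas(toy, npartitions=cpu_count())

    # 2. Declare the per-partition work; nothing executes yet.
    lazy = ddf.map_partitions(numerize_partition)

    # 3./4. Run it on the process-pool scheduler from dask.multiprocessing.
    #       (Newer dask releases spell this .compute(scheduler='processes').)
    print(lazy.compute(get=get))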

When we run this dask version, the transformation still takes some time, but even on the smallest EC2 instances it finishes in under 10 minutes.

I found the dask documentation articulate, helpful, and up to date as I executed this change.

But what about pandas parallelization?

I want to emphasize something here, because I have had four separate data scientists tell me that pandas speeds up operations by running them in parallel. Pandas does not do this. Not for str methods, not for apply, not for anything. Pandas shells some work out to numpy, which iterates quickly for reasons that are also not parallelization (we discussed them here). To get parallel processing, we need something more than pandas.

As far as I can tell, people think pandas does parallelization because of the changes in version 0.18 that released the global interpreter lock (GIL) on certain operations like groupby and value_counts.

What is the GIL? Python implements a global interpreter lock, which means that only one thread at a time can execute Python bytecode. The maintainers made this decision to simplify memory management, and it means that spreading pure-Python work across multiple threads doesn’t actually run it in parallel across cores.
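If you want to see the GIL in action, here is a small, self-contained experiment (mine, not from the original analysis): run a CPU-bound, pure-Python function twice serially and then on two threads, and compare the timings.

import time
from concurrent.futures import ThreadPoolExecutor

def count_down(n):
    # Pure-Python, CPU-bound work; nothing here releases the GIL.
    while n > 0:
        n -= 1

N = 20_000_000

start = time.perf_counter()
count_down(N)
count_down(N)
serial_seconds = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(count_down, [N, N]))
threaded_seconds = time.perf_counter() - start

# On CPython, the threaded run is no faster (and often slower) than the
# serial one, because only one thread can hold the GIL at a time.
print(serial_seconds, threaded_seconds)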

When pandas released the GIL on some operations, that meant those operations use (or were changed to use) data structures implemented in a language other than Python, usually C via Cython. Cython still assumes that operations are not thread-safe unless the programmer explicitly releases the GIL. Thomas Nyberg explains this really well.

This change allows parallelization to work on those specific operations, but it doesn’t perform the parallelization for you. Data scientists and engineers need to execute their own parallelization, either by hand with something like ThreadPoolExecutor or ProcessPoolExecutor, or via a library that does it for them, like dask.
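As an illustration of rolling your own, here’s a minimal sketch (my own, under the assumption that the per-chunk work is pure Python and therefore better served by a process pool than a thread pool): split the dataframe into chunks and map a function over them with ProcessPoolExecutor.

import numpy as np
import pandas as pd
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count

def transform_chunk(chunk):
    # Placeholder per-chunk work; stands in for whatever row-wise
    # transformation you actually need to run.
    return chunk.apply(lambda row: row * 2, axis=1)

if __name__ == '__main__':
    df = pd.DataFrame(np.random.randint(0, 30, size=(100_000, 5)),
                      columns=list('abcde'))

    # One chunk per core; each chunk is processed in its own interpreter,
    # so the GIL never becomes the bottleneck.
    chunks = np.array_split(df, cpu_count())
    with ProcessPoolExecutor(max_workers=cpu_count()) as pool:
        result = pd.concat(pool.map(transform_chunk, chunks))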

Conclusion

Friends don’t let friends tell people that pandas parallelizes operations. It doesn’t.

In this case, though, we needed to perform a combination of one-hot encoding and multiplying by an instance count on a dataframe. Our individual steps are small and fast, but the dataframe is huge. With millions of rows of data, the transformation still took a long time.

To speed it up, I went with a parallel processing library called dask. It has excellent documentation and, in my view, an intuitive API. Could more clever things be done to speed this up? Maybe. To me, this was the best bang-for-buck solution.

Now it’s time to get back to our data science case study! Are you ready to move on to prototyping models? Here we go.

If you liked this post, you might also like:

Another time I talked about performance — example in ruby

A risk-oriented testing strategy that mentions performance — example, again, in ruby

This keynote about refactoring — which discusses the other qualities of code that I think are important
