Multiprocessing in Python for Data Science

For loops are very convenient. With a for loop and a few if-else statements you can do just about anything. There is just one problem: for loops are slow. There are many ways to speed up code, and in the data science world the best is usually a vectorized NumPy or Pandas solution. Yet if you are stuck with a for loop whose iterations do not depend on one another, you are in what’s called an ‘embarrassingly parallel’ situation. That is, the loop can easily be split across many workers, not just one, and thus be made much faster.

First off, some background. There are two related forms of parallelization: multithreading and multiprocessing. Multithreading has lower overhead and uses shared memory, while multiprocessing runs each worker in its own environment, entailing more overhead in both compute time and memory. For reasons of simplicity and robustness, Python (strictly, the standard CPython interpreter) was built with the Global Interpreter Lock (GIL). It lets only one thread execute Python code at a time, which rules out speeding up Python code with multithreading, but it does not get in the way of multiprocessing. Except Python does allow threads to run in parallel, as long as they are not actually running Python code (hint: Cython and other compiled extensions that release the lock). Confused yet? The conclusion of all this is that in Python, unless you are a highly advanced user, you can use multiprocessing (but not threading) to help make your CPU-bound code faster.
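A minimal sketch of what this means in practice, using only the standard library. The function count_down and the workload sizes are made up for illustration. The same CPU-bound, pure-Python function is handed to a thread pool and to a process pool; on a standard CPython build one would expect the thread pool to take about as long as running the tasks back to back, while the process pool can spread them across cores. Exact timings depend on the machine, so none are shown here.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def count_down(n):
    """Hypothetical CPU-bound task written in pure Python."""
    while n > 0:
        n -= 1
    return n


def timed_map(executor_cls, work, workers=4):
    """Run count_down over `work` with the given executor class and time it."""
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as executor:
        results = list(executor.map(count_down, work))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    work = [2_000_000] * 4  # four equal, independent tasks
    _, thread_time = timed_map(ThreadPoolExecutor, work)
    _, process_time = timed_map(ProcessPoolExecutor, work)
    print(f"threads:   {thread_time:.2f}s")
    print(f"processes: {process_time:.2f}s")
```

Both executors share the same interface, so switching between them is a one-word change. That makes it cheap to test which one actually helps for a given workload.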

Things get even more confusing because your code may already be using multiprocessing or multithreading. Here’s where you might expect to see it:

  1. BLAS-level multithreading, e.g. in Intel MKL
  2. Lower-level library multithreading/multiprocessing, e.g. in scikit-learn
  3. Multiprocessing in your own program

The BLAS (Basic Linear Algebra Subprograms) library varies depending on your system, and some implementations multithread automatically. The BLAS takes care of feeding linear algebra instructions to the CPU and is what actually performs your matrix calculations, regardless of whether you are in Python, Matlab, or Excel. Intel MKL (Intel’s optimized BLAS) is built into Anaconda distributions of Python and will use multithreading by default. You can check which BLAS your NumPy uses with np.show_config(). By the way, if you are not on an Intel system, avoid Intel MKL, as it is designed to throttle itself if it fails to detect an Intel CPU. At the next level up, packages like scikit-learn often have an option to use parallelization, and some packages have multiprocessing on by default.
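One common way to keep the BLAS from multithreading behind your back is to cap its thread count through environment variables before NumPy is first imported. The sketch below assumes the variable names commonly read by OpenMP, Intel MKL and OpenBLAS. The helper name limit_blas_threads is hypothetical, and whether a given variable has any effect depends on which BLAS your NumPy was built against.

```python
import os

# Environment variables commonly read by BLAS backends when they load.
# Which one matters depends on the BLAS your NumPy is linked against.
BLAS_THREAD_VARS = ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS")


def limit_blas_threads(n_threads=1):
    """Ask the BLAS to use at most n_threads (hypothetical helper).

    Call this before numpy (or anything that imports it) is loaded,
    because the backends typically read these variables at load time.
    """
    for var in BLAS_THREAD_VARS:
        os.environ[var] = str(n_threads)
    return {var: os.environ[var] for var in BLAS_THREAD_VARS}


if __name__ == "__main__":
    print(limit_blas_threads(1))
    # import numpy as np   # import only after the limits are set
    # np.show_config()     # shows which BLAS numpy was built against
```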

It’s important to run your code in an un-parallelized way first and see if CPU use is above what you would expect for a single core. Say you are on a four-core system, and some sections of your code already use 4 threads. If you in turn add 4-way parallelization, your program may well end up asking for 4 * 4 = 16 threads. Going above your actual core count can lead to the code getting slower, as processes fight among themselves for resources. It may also outright crash your computer. Crashing in weird and funky ways is all too often what happens when CPU or GPU parallelization is attempted.
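The arithmetic above can be turned into a small budgeting rule: the number of workers times the threads each worker already uses should not exceed the core count. A minimal sketch, where max_safe_workers is a hypothetical helper and threads_per_worker is whatever you measured in the un-parallelized run:

```python
def max_safe_workers(total_cores, threads_per_worker):
    """Largest worker count that keeps workers * threads within total_cores."""
    if total_cores < 1 or threads_per_worker < 1:
        raise ValueError("both arguments must be at least 1")
    return max(1, total_cores // threads_per_worker)


if __name__ == "__main__":
    # The four-core example from the text: each task already uses 4 threads,
    # so there is only room for a single worker.
    print(max_safe_workers(total_cores=4, threads_per_worker=4))
    # If each task really uses one core, all four cores can take a worker.
    print(max_safe_workers(total_cores=4, threads_per_worker=1))
```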

At this point, you are still using a for loop and you have checked that your current for loop uses just around 1 core of CPU for itself. To make it parallel, you have three basic choices: the concurrent.futures and multiprocessing libraries built into Python 3, and joblib, a relatively minimal third-party package used by (indeed built for) scikit-learn.

The most flexible and best-documented option is multiprocessing. The newer built-in option is concurrent.futures, and it is a little faster than multiprocessing. On the other hand, joblib is the smartest option: it automatically reuses its worker pool, attempts to reasonably compensate for BLAS-level multithreading, and also allows integration with Dask for distribution across a cluster.

Multiprocessing has its problems. It is a bit picky about how it runs on Windows, and it doesn’t work consistently in interactive (IPython) sessions. One thing to be very much aware of is that it takes a while to recruit a worker pool. On the two systems I have been testing on here, I saw delays vary between 1s and 10s (with no clear pattern as to why it was sometimes faster or slower). That means your for loop needs to run for more than about 20 seconds before you will notice any return. Indeed, I would not usually trouble with multiprocessing unless your task is running for minutes, hours, or better yet days.
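Because starting the pool is the expensive part, one way to soften the cost is to create the pool once and reuse it for every loop that needs it. A minimal sketch with the built-in multiprocessing library: square_plus_two mirrors the toy calculation used in the examples at the end of this post, while run_batches and the batch contents are made up for illustration.

```python
import time
from multiprocessing import Pool


def square_plus_two(x):
    """Toy per-item task: (x + 2) ** 2."""
    return (x + 2) ** 2


def run_batches(batches, processes=3):
    """Start one pool, then reuse it for every batch instead of rebuilding it."""
    start = time.perf_counter()
    with Pool(processes=processes) as pool:
        # Time spent creating the pool; the workers may still be finishing
        # their own startup, so treat this as a rough lower bound.
        create_time = time.perf_counter() - start
        results = [pool.map(square_plus_two, batch) for batch in batches]
    return results, create_time


if __name__ == "__main__":
    batches = [range(5), range(5, 10), range(10, 15)]
    results, create_time = run_batches(batches)
    print(f"pool creation took {create_time:.3f}s")
    print(results[0])  # [4, 9, 16, 25, 36]
```

The if __name__ == "__main__" guard matters here: on platforms that start workers by re-importing the script (Windows, for example), unguarded pool creation would be re-run inside every worker.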

With a relatively simple problem, the overhead of multiprocessing far outweighs any speedup it delivers to the computation.

I ran a comparison on a simple element-wise DataFrame calculation, using a DataFrame of 12 columns that each hold the same 1 to 2000 sequence. The vectorized solution is three orders of magnitude faster than multiprocessing, because the overhead of creating the pool is so great. Joblib multiprocessing was nearly identical to a regular for loop, as it benefits from automatically reusing an existing process pool. Yet on a much more computationally intensive problem, an ETS (exponential smoothing) forecast on the same dataset, multiprocessing begins to stand out: a 3-process job is almost exactly 3x faster than a conventional for loop (in this case joblib was running on an existing pool, hence minimal overhead).

Even in a relatively small dataset, multiprocessing with joblib shows a speedup of 3x for 3 processes on a more computationally intensive problem.

Side-note on hyperthreading: many CPUs use ‘hyperthreading’, which basically doubles the thread count so there are 2 threads for every 1 core. This allows the CPU overall to be faster, but it also confuses many programs into thinking you have twice as many CPUs as you actually have. For example, many systems have 4 cores but os.cpu_count() will report 8, because it counts the threads provided by hyperthreading rather than the physical cores.
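A small sketch of how one might pick a worker count with this in mind. os.cpu_count() is in the standard library; psutil is an optional third-party package whose cpu_count(logical=False) reports physical cores where the platform exposes them. The function names and the fallback of halving the logical count are assumptions for illustration, and the halving is only a guess that 2 threads per core applies.

```python
import os


def logical_and_physical_cores():
    """Return (logical, physical) core counts; physical is None if unknown."""
    logical = os.cpu_count()  # counts hardware threads, not physical cores
    try:
        import psutil  # optional third-party package

        physical = psutil.cpu_count(logical=False)
    except ImportError:
        physical = None
    return logical, physical


def default_worker_count():
    """Prefer physical cores; otherwise assume 2 threads per core (a guess)."""
    logical, physical = logical_and_physical_cores()
    if physical:
        return physical
    return max(1, (logical or 1) // 2)


if __name__ == "__main__":
    print(logical_and_physical_cores())
    print(default_worker_count())
```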

In this example, the program is saturated at around 4 processes, and even begins to slow down again once num_processes > num_cpus.

In another example with 9 time series and running a number of models, there is little performance difference between using 4 processes and using 8 processes. While using 4 processes cut more than half of the runtime off a single-CPU run, the remainder of the code in this program runs sequentially and sees no performance gain, hence the ‘leveling out’ observed.
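This leveling out is what Amdahl's law describes: if only a fraction p of the runtime is parallelized across n processes, the overall speedup is 1 / ((1 - p) + p / n), which can never exceed 1 / (1 - p). A minimal sketch follows. The 75% parallel fraction is a hypothetical figure chosen for illustration, not a measurement from the program above.

```python
def amdahl_speedup(parallel_fraction, n_processes):
    """Overall speedup when only parallel_fraction of the runtime is parallelized."""
    if not 0.0 <= parallel_fraction <= 1.0:
        raise ValueError("parallel_fraction must be between 0 and 1")
    if n_processes < 1:
        raise ValueError("n_processes must be at least 1")
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / n_processes)


if __name__ == "__main__":
    # Hypothetical program in which 75% of the single-process runtime is the loop.
    for n in (1, 2, 4, 8):
        print(n, round(amdahl_speedup(0.75, n), 2))
    # prints: 1 1.0, 2 1.6, 4 2.29, 8 2.91 (one pair per line)
```

With these assumed numbers, going from 1 to 4 processes more than halves the runtime, while doubling again to 8 adds comparatively little, and no process count can push the speedup past 4x.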

Overall, my recommendation, if multiprocessing is appropriate, is to use joblib for the best performance when running multiple parallelized loops. For a single multiprocessing loop, either of the built-in packages, multiprocessing or concurrent.futures, is more convenient, as neither requires an external dependency.

Further Readings:
https://homes.cs.washington.edu/~jmschr/lectures/Parallel_Processing_in_Python.html
https://conference.scipy.org/proceedings/scipy2018/anton_malakhov.html

Code for Examples

mpsource.py

import numpy as np
import pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing


def series_by_column(lst, col):
    out = (col + 2) ** 2
    lst.append(out)

def series_by_column_cf(col):
    return (col + 2) ** 2

def df_by_column(df, col):
    out = (df[col] + 2) ** 2
    return out


def forecast_by_column(df, col):
    current_series = df[col]
    series_name = current_series.name
    esModel = ExponentialSmoothing(
        current_series,
        damped_trend=False,  # older statsmodels releases call this "damped"
        trend="additive",
        seasonal=None,
        seasonal_periods=7,
        freq="D",
    ).fit()
    srt = current_series.shape[0]
    esPred = esModel.predict(start=srt, end=srt + 10)
    esPred = pd.Series(esPred)
    esPred.name = series_name
    return esPred


def forecast_by_series(lst, col):
    current_series = col
    series_name = current_series.name
    esModel = ExponentialSmoothing(
        current_series,
        damped_trend=False,  # older statsmodels releases call this "damped"
        trend="additive",
        seasonal=None,
        seasonal_periods=7,
        freq="D",
    ).fit()
    srt = current_series.shape[0]
    esPred = esModel.predict(start=srt, end=srt + 10)
    esPred = pd.Series(esPred)
    esPred.name = series_name
    lst.append(esPred)


def forecast_from_series(col):
    current_series = col
    series_name = current_series.name
    esModel = ExponentialSmoothing(
        current_series,
        damped_trend=False,  # older statsmodels releases call this "damped"
        trend="additive",
        seasonal=None,
        seasonal_periods=7,
        freq="D",
    ).fit()
    srt = current_series.shape[0]
    esPred = esModel.predict(start=srt, end=srt + 10)
    esPred = pd.Series(esPred)
    esPred.name = series_name
    return esPred

mptest.py

import numpy as np
import pandas as pd
import timeit
from mpsource import (
    series_by_column,
    series_by_column_cf,
    df_by_column,
    forecast_by_column,
    forecast_by_series,
    forecast_from_series
)


n_jobs = 3
len_idx = 2000
ltrs = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L"]
idx = pd.date_range(start="2020-01-01", periods=len_idx, freq="D")
df = pd.DataFrame(
    np.linspace(
        np.repeat([1], len(ltrs)), np.repeat([len_idx], len(ltrs)), num=len_idx, axis=0
    ),
    columns=ltrs,
    index=idx,
)
df = df.astype(int)
col_order = df.columns

# For loop no parallel
start_time_for = timeit.default_timer()
out_list = []
for col in df.columns:
    out_list.append((df[col] + 2) ** 2)
full_df = pd.concat(out_list, axis=1)
elapsed_for = timeit.default_timer() - start_time_for

# This is how you'd normally do a simple thing like this.
start_time_pd = timeit.default_timer()
full_df = (df + 2) ** 2
elapsed_pd = timeit.default_timer() - start_time_pd


start_time_cf = timeit.default_timer()
series_list = [df[col] for col in df.columns]
if __name__ == "__main__":
    from concurrent.futures import ProcessPoolExecutor

    with ProcessPoolExecutor(max_workers=n_jobs) as executor:
        # executor.map yields results lazily, in the same order as the inputs
        res = executor.map(series_by_column_cf, series_list)
        dfs = pd.concat(list(res), axis=1)
    dfs = dfs[col_order]
else:
    print("not __main__!")
elapsed_cf = timeit.default_timer() - start_time_cf


start_time_mp = timeit.default_timer()
series_list = [df[col] for col in df.columns]
if __name__ == "__main__":
    from multiprocessing import Pool, Manager
    from functools import partial
    with Pool(processes=n_jobs) as pool, Manager() as manager:
        dfs_list = manager.list()
        res = pool.map_async(partial(series_by_column, dfs_list), series_list)
        res.wait()
        dfs = pd.concat(dfs_list, axis=1)
    dfs = dfs[col_order]
else:
    print("not __main__!")
elapsed_mp = timeit.default_timer() - start_time_mp


# JOBLIB Parallelization
start_time_jl = timeit.default_timer()
if __name__ == "__main__":
    cols = df.columns.tolist()
    from joblib import Parallel, delayed

    df_list = Parallel(n_jobs=n_jobs)(
        # check_pickle is not accepted by current joblib releases, so it is omitted
        delayed(df_by_column)(df, col) for col in cols
    )
    dfs_joblib = pd.concat(df_list, axis=1)
else:
    print("not __main__!")
elapsed_jl = timeit.default_timer() - start_time_jl


runs = 3
# JOBLIB reuse of pool by default
start_time_jl2 = timeit.default_timer()
for i in range(runs):
    if __name__ == "__main__":
        cols = df.columns.tolist()
        from joblib import Parallel, delayed

        df_list = Parallel(n_jobs=n_jobs)(
            # check_pickle is not accepted by current joblib releases, so it is omitted
            delayed(df_by_column)(df=df, col=col) for col in cols
        )
        dfs_joblib = pd.concat(df_list, axis=1)
elapsed_jl2 = (timeit.default_timer() - start_time_jl2) / runs

# MP without any reuse of pool
start_time_mp2 = timeit.default_timer()
series_list = [df[col] for col in df.columns]
for i in range(runs):
    if __name__ == "__main__":
        from multiprocessing import Pool, Manager
        from functools import partial

        with Pool(processes=n_jobs) as pool, Manager() as manager:
            dfs_list = manager.list()
            res = pool.map_async(partial(series_by_column, dfs_list), series_list)
            res.wait()
            dfs = pd.concat(dfs_list, axis=1)
        dfs = dfs[col_order]
elapsed_mp2 = (timeit.default_timer() - start_time_mp2) / runs

# CF without any reuse of pool
start_time_cf2 = timeit.default_timer()
series_list = [df[col] for col in df.columns]
for i in range(runs):
    if __name__ == "__main__":
        from concurrent.futures import ProcessPoolExecutor

        # the with-block shuts the pool down after each run, so nothing is reused
        with ProcessPoolExecutor(max_workers=n_jobs) as executor:
            res = executor.map(series_by_column_cf, series_list)
            dfs = pd.concat(list(res), axis=1)
        dfs = dfs[col_order]
elapsed_cf2 = (timeit.default_timer() - start_time_cf2) / runs

print(
    f"Vectorized is {elapsed_pd}\nFor loop is {elapsed_for}\n\nMultiprocessing lib is {elapsed_mp}\nConcurrent.futures is {elapsed_cf}\nJoblib is {elapsed_jl}\n\nMP multiple runs {elapsed_mp2}\nCF multiple runs {elapsed_cf2}\nJoblib multiple runs {elapsed_jl2}\n "
)


# Yes, a linear regression would be fine on this data, but ETS is more
# compute-heavy and so has more to gain from parallelization
# For Loop Forecasting
start_time_for_f = timeit.default_timer()
out_list = []
for col in df.columns:
    # forecast_by_series appends its result to out_list itself and returns None
    forecast_by_series(out_list, df[col])
forecast_df = pd.concat(out_list, axis=1)
elapsed_for_f = timeit.default_timer() - start_time_for_f

# Joblib Forecasting
# arguments are passed through joblib here as explicit keyword=value pairs
start_time_jl_f = timeit.default_timer()
if __name__ == "__main__":
    cols = df.columns.tolist()
    from joblib import Parallel, delayed

    df_list = Parallel(n_jobs=n_jobs)(
        # check_pickle is not accepted by current joblib releases, so it is omitted
        delayed(forecast_by_column)(df=df, col=col) for col in cols
    )
    forecast_df_joblib = pd.concat(df_list, axis=1)
elapsed_jl_f = timeit.default_timer() - start_time_jl_f

# MP Forecasting
start_time_mp_f = timeit.default_timer()
series_list = [df[col] for col in df.columns]
if __name__ == "__main__":
    from multiprocessing import Pool, Manager
    from functools import partial

    with Pool(processes=n_jobs) as pool, Manager() as manager:
        dfs_list = manager.list()
        res = pool.map_async(partial(forecast_by_series, dfs_list), series_list)
        res.wait()
        dfs = pd.concat(dfs_list, axis=1)
    dfs = dfs[col_order]
elapsed_mp_f = timeit.default_timer() - start_time_mp_f

# Apply Forecast
start_time_apply_f = timeit.default_timer()
forecast_apply = df.apply(forecast_from_series, raw=False, axis=0)
elapsed_apply_f = timeit.default_timer() - start_time_apply_f

# give it time to get the rest of the printing out
import time
time.sleep(5)
print(
    f"Vectorized is {elapsed_pd}\nFor loop is {elapsed_for}\n\nMultiprocessing lib is {elapsed_mp}\nConcurrent.futures is {elapsed_cf}\nJoblib is {elapsed_jl}\n\nMP multiple runs {elapsed_mp2}\nCF multiple runs {elapsed_cf2}\nJoblib multiple runs {elapsed_jl2}\n "
)
print(
    f"\nFor loop forecasting {elapsed_for_f}\nApply forecasting {elapsed_apply_f}\nJoblib forecasting {elapsed_jl_f}\nMP forecasting {elapsed_mp_f}"
)
