# Source code for cbi_toolbox.parallel

"""
The parallel package provides tools to split parallel computations.
"""

# Copyright (c) 2020 Idiap Research Institute, http://www.idiap.ch/
# Written by François Marelli <francois.marelli@idiap.ch>
#
# This file is part of CBI Toolbox.
#
# CBI Toolbox is free software: you can redistribute it and/or modify
# it under the terms of the 3-Clause BSD License.
#
# CBI Toolbox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# 3-Clause BSD License for more details.
#
# You should have received a copy of the 3-Clause BSD License along
# with CBI Toolbox. If not, see https://opensource.org/licenses/BSD-3-Clause.
#
# SPDX-License-Identifier: BSD-3-Clause

import os
import concurrent.futures


def distribute_bin(dimension, rank, workers):
    """
    Locate the slice of array-like data assigned to a single worker.

    The data is cut into contiguous bins whose sizes differ by at most one
    element; the larger bins are handed to the lowest ranks.

    Parameters
    ----------
    dimension : int
        The size of the array to distribute.
    rank : int
        The rank of the worker whose bin is requested.
    workers : int
        The total number of workers. It is capped at ``dimension``.

    Returns
    -------
    (int, int)
        The start index of this bin, and its size. The distributed data
        should be ``array[start:start + bin_size]``. A rank that is not
        below the capped worker count receives ``(0, 0)``.
    """
    # No more bins than there are elements to hand out.
    active = min(workers, dimension)
    if rank >= active:
        return 0, 0

    # `extra` bins carry one element more than the `base` size.
    base, extra = divmod(dimension, active)
    if rank < extra:
        return rank * (base + 1), base + 1

    # Skip the one extra element held by each of the `extra` larger bins.
    return extra + rank * base, base
def distribute_bin_all(dimension, workers):
    """
    Computes the start indexes and bin sizes of all splits to distribute
    computations across multiple workers.

    The data is cut into contiguous bins whose sizes differ by at most one
    element; the larger bins come first. When there are more workers than
    elements, the surplus workers receive an empty bin ``(0, 0)``. An empty
    array (``dimension == 0``) yields an empty bin for every worker instead
    of raising ``ZeroDivisionError``.

    Parameters
    ----------
    dimension : int
        The size of the array to be distributed.
    workers : int
        The amount of workers.

    Returns
    -------
    ([int], [int])
        The list of start indexes and the list of bin sizes to distribute
        data. Both lists have one entry per worker.
    """
    # No more non-empty bins than there are elements to hand out.
    active = min(workers, dimension)

    if active > 0:
        # `extra` bins carry one element more than the `base` size.
        base, extra = divmod(dimension, active)
    else:
        # Nothing to distribute: avoid dividing by zero, every bin is empty.
        base = extra = 0

    bin_indexes = []
    bin_sizes = []
    start = 0

    for worker in range(workers):
        if worker >= active:
            # Surplus worker: no data left for it.
            bin_indexes.append(0)
            bin_sizes.append(0)
            continue

        size = base + 1 if worker < extra else base
        bin_indexes.append(start)
        bin_sizes.append(size)
        start += size

    return bin_indexes, bin_sizes
def parallelize(func, size, workers=None):
    """
    Launches a function multiple times in parallel using multithreading.

    Useful only if the GIL is released in the parallelized function (this is
    the case for many ``numpy`` and ``scipy`` routines).

    The number of threads is the smallest of: the ``OMP_NUM_THREADS``
    environment variable (first value if it is a comma-separated list), the
    CPU affinity of the process (where the platform exposes it), ``size``
    and ``workers``. It is never lower than 1, so an empty array or a
    non-positive limit results in a single direct call ``func(0, size)``.

    Parameters
    ----------
    func : function (callable)
        The function that will be run in parallel. It is called once per bin
        with 2 arguments taken from the returns of ``distribute_bin_all``:
        the start index of the bin, and the size of the bin.
    size : int
        The size of the array that will be split between workers.
    workers : int, optional
        The maximum number of workers, by default None (will be maximized
        for the system).

    Returns
    -------
    iterable
        The results of the function calls, in bin order (see
        ``concurrent.futures.Executor.map``). It is a one-element list when
        a single worker is used, and the iterator returned by
        ``Executor.map`` otherwise; all calls have completed by the time
        this function returns, but an exception raised by ``func`` in a
        worker thread is only re-raised when its result is retrieved.
    """
    # os.cpu_count() may return None when the count cannot be determined.
    cpu_count = os.cpu_count() or 1

    try:
        # OMP_NUM_THREADS may be a comma-separated list (one value per
        # nesting level); only the outermost level is relevant here.
        omp_threads = int(os.environ["OMP_NUM_THREADS"].split(",")[0])
    except (KeyError, ValueError):
        # Unset or unparsable: do not let it restrict the thread count.
        omp_threads = cpu_count

    requested = cpu_count if workers is None else workers

    try:
        affinity = len(os.sched_getaffinity(0))
    except AttributeError:
        # sched_getaffinity is not available on every platform.
        affinity = requested

    # At least one worker, so that empty inputs or non-positive limits run
    # a single call instead of failing when the thread pool is created.
    workers = max(1, min(omp_threads, affinity, size, requested))

    if workers == 1:
        # No need for a thread pool: process the whole range in this thread.
        return [func(0, size)]

    bins, bin_dims = distribute_bin_all(size, workers)

    # Leaving the ``with`` block waits for every submitted call to finish.
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        outputs = executor.map(func, bins, bin_dims)

    return outputs