I have a pretty plain-vanilla use of concurrent.futures.ProcessPoolExecutor -- something like this (using Python 3.6):
files = get_files()
processor = get_processor_instance()
with concurrent.futures.ProcessPoolExecutor() as executor:
    list(executor.map(processor.process, files))
The processor is an instance of one of several available processor classes, but they all share the same process method, which looks roughly like this:
def process(self, file):
    log.debug(f"Processing source file {file.name}.")
    with DBConnection(self.db_url) as session:
        # Attach the file record to this session.
        file = session.merge(file)
        session.refresh(file)
        self._set_file(file)
        timer = perf_counter()
        try:
            # Both of these are implemented differently by each processor class.
            self.records = self._get_records()
            self._save_output()
        except Exception as ex:
            log.warning(f"Failed to process source file {file.ORIGINAL_NAME}: {ex}")
            self.error_time = time.time()
            self.records = None
        else:
            process_duration = perf_counter() - timer
            log.info(f'File {file.name} processed in {process_duration:.6f} seconds.')
            file.process_duration = process_duration
        session.commit()
Implementations of the _get_records and _save_output methods vary per class, but my problem is with the handling of errors. I'm deliberately testing with input that makes one of those two methods run out of memory, and I would expect the except block above to catch the error and move on to the next file -- which is precisely what happens when I run the code in a single process.
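To give an idea of how I'm provoking the failure, one of the per-class implementations ends up doing something roughly like this (a simplified, hypothetical sketch -- the attribute and helper names here are made up):

def _get_records(self):
    # Simplified illustration only: the whole source file is read into
    # memory at once and an object is built per row, which exhausts RAM
    # on the largest files.
    with open(self.file.name) as f:
        rows = f.readlines()
    return [self._parse(row) for row in rows]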
If I use ProcessPoolExecutor as described above, it raises the BrokenProcessPool exception and kills all execution:
Traceback (most recent call last):
  File "/vagrant/myapp/myapp.py", line 94, in _process
    list(executor.map(processor.process, files))
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
I can of course catch the BrokenProcessPool in the calling code, but I'd prefer to handle the error internally and proceed to the next file.
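For completeness, the workaround I mean is roughly this (just a sketch, using submit instead of map so each result can be inspected individually):

from concurrent.futures.process import BrokenProcessPool

with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = {executor.submit(processor.process, f): f for f in files}
    for future in concurrent.futures.as_completed(futures):
        try:
            future.result()
        except BrokenProcessPool as ex:
            log.warning(f"Worker died while processing {futures[future].name}: {ex}")

But as far as I can tell, once the pool is broken the remaining futures fail with the same exception and the executor can't be reused, so this doesn't really let me continue with the rest of the files.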
I also tried using the standard multiprocessing.Pool object, like this:
with multiprocessing.Pool() as pool:
    pool.map(processor.process, files)
In this case the behaviour is even stranger: after starting on the first two files, which trigger the out-of-memory error, it moves on to the later files, which are smaller and get processed completely. However, the except block apparently never gets triggered (no log messages, no error_time), and the application just hangs, neither finishing nor doing anything, until killed manually.
I was hoping that the try..except block would make each process self-contained, handling its own errors without affecting the main application. Any ideas how to achieve that?
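Conceptually, I'd like every error to stay contained in the worker that raised it, along the lines of this purely hypothetical wrapper -- although given that the try..except inside process apparently isn't enough once memory runs out, I suspect something more than this is needed:

def safe_process(file):
    # Hypothetical wrapper: swallow any error inside the worker so that
    # one bad file never takes down the pool or the other workers.
    try:
        processor.process(file)
    except Exception as ex:
        log.warning(f"Skipping file {file.name}: {ex}")

with concurrent.futures.ProcessPoolExecutor() as executor:
    list(executor.map(safe_process, files))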