python struct.error: 'i' format requires -2147483648 <= number <= 2147483647
Solution 1
The communication protocol between processes uses pickling, and the pickled data is prefixed with the size of the pickled data. For your method, all arguments together are pickled as one object.
You produced an object that when pickled is larger than fits in a i
struct formatter (a four-byte signed integer), which breaks the assumptions the code has made.
You could delegate reading of your dataframes to the child process instead, only sending across the metadata needed to load the dataframe. Their combined size is nearing 1GB, way too much data to share over a pipe between your processes.
Quoting from the Programming guidelines section:
Better to inherit than pickle/unpickle
When using the
spawn
orforkserver
start methods many types frommultiprocessing
need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.
If you are not running on Windows and use either the spawn
or forkserver
methods, you could load your dataframes as globals before starting your subprocesses, at which point the child processes will 'inherit' the data via the normal OS copy-on-write memory page sharing mechanisms.
Note that this limit was raised for non-Windows systems in Python 3.8, to an unsigned long long (8 bytes), and so you can now send and receive 4 EiB of data. See this commit, and Python issues #35152 and #17560.
If you can't upgrade and you can't make use of resource inheriting, and are not running on Windows, then use this patch:
import functools
import logging
import struct
import sys
logger = logging.getLogger()
def patch_mp_connection_bpo_17560():
"""Apply PR-10305 / bpo-17560 connection send/receive max size update
See the original issue at https://bugs.python.org/issue17560 and
https://github.com/python/cpython/pull/10305 for the pull request.
This only supports Python versions 3.3 - 3.7, this function
does nothing for Python versions outside of that range.
"""
patchname = "Multiprocessing connection patch for bpo-17560"
if not (3, 3) < sys.version_info < (3, 8):
logger.info(
patchname + " not applied, not an applicable Python version: %s",
sys.version
)
return
from multiprocessing.connection import Connection
orig_send_bytes = Connection._send_bytes
orig_recv_bytes = Connection._recv_bytes
if (
orig_send_bytes.__code__.co_filename == __file__
and orig_recv_bytes.__code__.co_filename == __file__
):
logger.info(patchname + " already applied, skipping")
return
@functools.wraps(orig_send_bytes)
def send_bytes(self, buf):
n = len(buf)
if n > 0x7fffffff:
pre_header = struct.pack("!i", -1)
header = struct.pack("!Q", n)
self._send(pre_header)
self._send(header)
self._send(buf)
else:
orig_send_bytes(self, buf)
@functools.wraps(orig_recv_bytes)
def recv_bytes(self, maxsize=None):
buf = self._recv(4)
size, = struct.unpack("!i", buf.getvalue())
if size == -1:
buf = self._recv(8)
size, = struct.unpack("!Q", buf.getvalue())
if maxsize is not None and size > maxsize:
return None
return self._recv(size)
Connection._send_bytes = send_bytes
Connection._recv_bytes = recv_bytes
logger.info(patchname + " applied")
Solution 2
this problem was fixed in a recent PR to python https://github.com/python/cpython/pull/10305
if you want, you can make this change locally to make it work for you right away, without waiting for a python and anaconda release.
SUNDONG
Updated on May 10, 2020Comments
-
SUNDONG about 4 years
Problem
I'm willing to do a feature engineering using multiprocessing module
(multiprocessing.Pool.starmap()
. However, it gives an error message as follows. I guess this error message is about the size of inputs (2147483647 = 2^31 − 1?), since the same code worked smoothly for a fraction(frac=0.05)
of input dataframes(train_scala, test, ts). I convert types of data frame as smallest as possible, however it does not get better.The anaconda version is 4.3.30 and the Python version is 3.6 (64 bit). And the memory size of the system is over 128GB with more than 20 cores. Would you like to suggest any pointer or solution to overcome this problem? If this problem is caused by a large data for a multiprocessing module, How much smaller data should I use to utilize the multiprocessing module on Python3?
Code:
from multiprocessing import Pool, cpu_count from itertools import repeat p = Pool(8) is_train_seq = [True]*len(historyCutoffs)+[False] config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq) p.starmap(multiprocess_FE, config_zip)
Error Message:
Traceback (most recent call last): File "main_1210_FE_scala_multiprocessing.py", line 705, in <module> print('----Pool starmap start----') File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap return self._map_async(func, iterable, starmapstar, chunksize).get() File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get raise self._value File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks put(task) File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes header = struct.pack("!i", n) struct.error: 'i' format requires -2147483648 <= number <= 2147483647
Extra infos
- historyCutoffs is a list of integers
- train_scala is a pandas DataFrame (377MB)
- test is a pandas DataFrame (15MB)
- ts is a pandas DataFrame (547MB)
- ul_parts_path is a list of directories (string)
- is_train_seq is a list of booleans
Extra Code: Method multiprocess_FE
def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train): train_dict = {} ts_dict = {} msno_dict = {} ul_dict = {} if is_train == True: train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff] else: train_dict[historyCutoff] = test msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno) print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff)) ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))] print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff)) ul_part = pd.read_csv(gzip.open(ul_part_path, mode="rt")) ##.sample(frac=0.01, replace=False) ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])] train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)
-
SUNDONG over 6 yearsWhat exactly is the sys.maxsize in this case? 2147483647 = 2.147GB? Am I able to control the size threshold?
-
Martijn Pieters over 6 years@SUNDONG: Sorry, it's not
sys.maxsize
, it's thei
struct formatter, so a 4-byte integer, signed. You can't control that size threshold. You are passing around objects that are really way, way too large for such sharing. -
SUNDONG over 6 yearsOKAY, I will try loading dataframes in the child method
multiprocess_FE
instead. However, I could pass smaller dataframes (about the size of row = 1,000-10,000) without any problems. -
Emmanuel-Lin over 6 yearsOkay, so one should share inputs. but what about results? (I have the same problem, but on the results of my computation that seems to be too big...)
-
Martijn Pieters over 6 years@Emmanuel-lin: if your results are that large, write them to some kind of shared storage. A file or a database.
-
dpb about 6 years@MartijnPieters Great answers, thank you! Just a comment, though - is this not massively frustrating? Very old mindset. If passing data to subprocesses over a network, for example, I understand the issue; but to do so between processes with local upwards of 50GB RAM, shared buses etc.. - who cares. Should be scalable. Issue a warning for Pete's sake. Don't hard break on a struct.error.
-
andrew over 4 yearswhy is my pickle so big
-
Martijn Pieters over 4 years@andrew: that's impossible to answer without knowing much more about how you are using
multiprocessing
. -
Julien Marrec over 4 years
-
TheTank about 4 years@JulienMarrec So to clarify, the struct.error() has been fixed in python 3.8.0? Correct?
-
Julien Marrec about 4 yearsYes that's what the links I gave say
-
Crispy13 about 4 yearsSo is it the solution to use 'fork' start method?
-
Martijn Pieters about 4 years@Crispy13: if you can load the data you want to share between the processes before starting the child processes, then that's preferable because that's just way, way faster than sending the data to those processes over a pipe. Otherwise, upgrade to Python 3.8 or backport the 3.8 fix into a monkeypatch.
-
information_interchange almost 4 yearsYes, can confirm the error disappears if you just upgrade your python