Use multiple lists to collect multiprocessing results with one callback function while using python multiprocessing module pool.apply_async function

The following is the code. Because the input file is kind of big, I use the pool.apply_async function from the Python multiprocessing module. I was just wondering, does this make sense?

if __name__ == '__main__':
    pool = mp.Pool(mp.cpu_count()-1)
    print("cpu counts:" + str(mp.cpu_count()))
    with open("./test.txt") as f:
        nextLineByte = f.tell()
        for line in iter(f.readline, ''):
            pool.apply_async(processWrapper, args=(nextLineByte,), callback=logResult)
            nextLineByte = f.tell()
    pool.close()
    pool.join()

def processWrapper(lineByte):
    with open("./test.txt") as filHandle:
        filHandle.seek(lineByte)
        line = filHandle.readline()
        dict = { "ID":ID, "DT":DT, "SQ":SQ, "DE":DE }
        lineProcedure = dict.get(line[:2],0)
        if lineProcedure:
            return lineProcedure(line, filHandle)

def logResult(result):
    if result == None:
        return
    if result[0] == "ID":
        strainNameIDsLst.append(result[1])
    elif result[0] == "DT":
        samplingDatesLst.append(result[1])
    elif result[0] == "SQ":
        seqLengthsLst.append(result[1])
        aCountsLst.append(result[2])
        cCountsLst.append(result[3])
        gCountsLst.append(result[4])
        tCountsLst.append(result[5])
        seqsLst.append(result[6])
    elif result[0] == "DE":
        countriesLst.append(result[1])
        virusLst.append(result[2])

The definitions of the ID, DT, SQ and DE functions are omitted.
The data in the input file is ordered in pieces/blocks, so in the end I’d like the i-th items of the ten lists to correspond to each other, so that I can concatenate them to form one piece/block.
So, what do you think? Any comments are greatly appreciated.

The following is the code. Because the input file is kind of big, I use
pool.apply_async function from python multiprocessing module. I was
just wondering, does this make sense?

Not quite. Python is not a “static” language like C - when you import a
file it is executed. So your function definitions have to occur
before you use them or they won’t yet exist.

So your “if __name__ == '__main__':” section needs to be after the
function definitions, not before. Conventionally it is the last thing in
the file because that way you know everything has been done by then.
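A small sketch of that ordering rule, assuming a made-up greet() function that has nothing to do with your code. Run top to bottom, the first call fails because the def statement has not been executed yet, and the same call works once it has:

```python
# Executing this file top to bottom reproduces the ordering problem:
# the first call runs before the def statement below has been executed.
try:
    result = greet("world")      # greet is not bound to anything yet
except NameError as exc:
    result = None
    error_message = str(exc)

def greet(name):                 # hypothetical helper, for illustration only
    return "hello " + name

# By now the def has executed, so the same call succeeds.
late_result = greet("world")
```

The same thing happens with an “if __name__ == '__main__':” block placed above processWrapper() and logResult(): the names are looked up when the block runs, and at that point they do not exist.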

Now, I hear you saying “but then the main programme is so hard to see -
it should be up front”. True. When the “if …main” stuff is very
short, I leave it at the bottom. But as soon as it gets nontrivial I
tend to do this:

def main(argv):
    ... the main function here ...

... the rest of the module ...

if __name__ == '__main__':
    import sys
    sys.exit(main(sys.argv))

(That also handily passes the command line arguments to main(), should
you want to use them.)

Here it is ok for main() to precede the other functions because it
doesn’t get run until after those functions are defined, at the bottom
of the module.

It also has the advantage that the variables in main() are naturally
local variables like in any other function, and do not pollute the global
variable area (generally a bad thing).

Let's look at your code now, rather than the structure:

if __name__ == '__main__':
    pool = mp.Pool(mp.cpu_count()-1)
    print("cpu counts:" + str(mp.cpu_count()))
    with open("./test.txt") as f:
        nextLineByte = f.tell()
        for line in iter(f.readline, ''):
            pool.apply_async(processWrapper, args=(nextLineByte,), callback=logResult)
            nextLineByte = f.tell()

This has always seemed overly complex to me. In particular, why not just
pass each line of text to processWrapper()?

# you don't need the leading "./" here - the current directory
# is what is used if you don't use an absolute path
with open("test.txt") as f:
    for line in f:
        line = line.rstrip()
        pool.apply_async(processWrapper, args=(line,), callback=logResult)

and then processWrapper() can just receive the line directly instead of
lineByte, and not bother with separately opening the text file, seeking
to some position, and reading the line again.

pool.close()
pool.join()

That looks fine. Then:

def processWrapper(line):

and skip the whole open/seek/read bit.
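Put together, a minimal sketch of that shape might look like the following. The bodies of ID() and DT() are invented stand-ins, since the real ones were omitted from the question, and the sample lines are made up too. I have also wrapped the pool size in max(1, ...) on the assumption that you want at least one worker on a single-core machine.

```python
import multiprocessing as mp

# Hypothetical stand-ins for the ID and DT functions omitted from the
# question. Each returns a tuple whose first item is the record tag.
def ID(line):
    return ("ID", line[2:].strip())

def DT(line):
    return ("DT", line[2:].strip())

HANDLERS = {"ID": ID, "DT": DT}

def processWrapper(line):
    # Runs in a worker process: pick a handler from the two-character
    # prefix, or return None for lines that nobody handles.
    handler = HANDLERS.get(line[:2])
    if handler is None:
        return None
    return handler(line)

def main(lines):
    strainNameIDsLst = []
    samplingDatesLst = []

    def logResult(result):
        # Runs in the parent process, so it can append to these lists.
        if result is None:
            return
        if result[0] == "ID":
            strainNameIDsLst.append(result[1])
        elif result[0] == "DT":
            samplingDatesLst.append(result[1])

    pool = mp.Pool(max(1, mp.cpu_count() - 1))
    for line in lines:
        pool.apply_async(processWrapper, args=(line.rstrip(),), callback=logResult)
    pool.close()
    pool.join()
    return strainNameIDsLst, samplingDatesLst

if __name__ == '__main__':
    # Sample lines invented for illustration; a real run would iterate
    # over the open file instead.
    sample = ["ID strain-1\n", "DT 2020-01-01\n", "XX ignored\n"]
    print(main(sample))
```

Two caveats. Your original handlers also received the file handle, so if any of them read further lines from the file, passing only the one line will not be enough for those. And the callbacks fire as results complete, not in the order the tasks were submitted, so nothing here guarantees that the i-th entries of the different lists belong to the same block.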

def logResult(result):
    if result == None:
        return

Always test for “result is None”, not “result == None”. None is a
singleton, we test for that specific object, not its “value”. It has a
value, but in some contexts that is treated a lot like zero (in that it
is “falsey”) and using “==” with None just leads you into unreliable
thought processes.
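A contrived illustration of the difference, using a hypothetical AlwaysEqual class: “==” asks the object what it thinks via __eq__, while “is” only checks whether it is the None object itself.

```python
class AlwaysEqual:
    # Hypothetical class whose __eq__ claims equality with anything.
    def __eq__(self, other):
        return True

value = AlwaysEqual()

equals_none = (value == None)   # True: __eq__ decides, not identity
is_none = (value is None)       # False: value is not the None object
```

With “is None” the answer cannot be changed by whatever class the result happens to be.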

The only remaining thing that occurs to me, and I have not tested
whether this is so with multiprocessing because I very rarely use it, is
that the callback functions may run concurrently in their own threads.
That would be a natural way, to my mind, of managing subprocesses: split
off a thread to do the (fire subprocess, collect result, run callback)
steps because that way it can wait individually for each subprocess. It
may do something cleverer.

Anyway, if it uses threads then you want the callbacks not to run in
parallel but in series, because they manipulate the same lists.

You can do this with a Lock:

from threading import Lock

lock = Lock()
... issue subprocess stuff ...

def logResult(result):
    with lock:
        ... main body of logResult here ...

Someone who uses multiprocessing regularly may tell me my concerns are
unfounded.
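One way to check rather than guess is to have the callback record which thread it runs in. The multiprocessing documentation says callbacks should complete immediately because otherwise the thread which handles the results gets blocked, which suggests a single result-handling thread in the parent process. The sketch below is only a probe under that assumption; callbackThreadNames() is a name I made up, and abs() stands in for real work.

```python
import multiprocessing as mp
import threading

def callbackThreadNames(taskCount=8, processes=2):
    # Record the name of the thread that runs each callback.
    names = set()

    def remember(result):
        names.add(threading.current_thread().name)

    pool = mp.Pool(processes)
    for n in range(taskCount):
        # abs() is a builtin, so it pickles without any module-level setup.
        pool.apply_async(abs, args=(-n,), callback=remember)
    pool.close()
    pool.join()
    return names

if __name__ == '__main__':
    print("main thread:", threading.current_thread().name)
    print("callback threads:", callbackThreadNames())
```

If only one thread name ever shows up, the callbacks are already running in series and the Lock is harmless but probably unnecessary.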

Cheers,
Cameron Simpson cs@cskk.id.au