I’m new to multiprocessing pools. Here is a function that I want to run to find a match for a desired hash. I want the pool to be terminated as soon as the result is found, since the other processes may still be running and their work is no longer needed at that point.
from Crypto.Hash import keccak  # pycryptodome
import click

def test(selector, bounds):
    k = keccak.new(digest_bits=256)
    k.update(bytes(selector, 'utf-8'))
    selectorSig = k.hexdigest()[:8]
    for i in range(bounds[0], bounds[1]):
        # renamed from `str` to avoid shadowing the built-in
        candidate = selector.split('(')[0] + f'_{i}(' + selector.split('(')[1]
        k = keccak.new(digest_bits=256)
        k.update(bytes(candidate, 'utf-8'))
        if k.hexdigest()[:8] == 'ae026468':
            click.echo(f'Found collision with {candidate}')
            # => This is where I want the pool to be terminated and any
            #    other processes running in parallel to be stopped <=
            return True
# Below is the pseudocode where I'm creating the pool
import multiprocessing

def callTest():
    a_pool = multiprocessing.Pool()
    a_pool.starmap(test, iters)
    a_pool.close()
    a_pool.join()
I have also tried using starmap_async to collect the callbacks from test (returning True when the hash matches) into results, and calling a_pool.terminate() if a result is true (from results.get()). However, it seems like the remaining processes still execute their tasks.
Thanks for the suggestion, I’ll definitely try it out.
This is what I tried recently:
Using Multiprocessing.Value
def test(selector, bounds):
    k = keccak.new(digest_bits=256)
    k.update(bytes(selector, 'utf-8'))
    selectorSig = k.hexdigest()[:8]
    for i in range(bounds[0], bounds[1]):
        if cnt.value == 0:
            candidate = selector.split('(')[0] + f'_{i}(' + selector.split('(')[1]
            k = keccak.new(digest_bits=256)
            k.update(bytes(candidate, 'utf-8'))
            if k.hexdigest()[:8] == 'f1f12cf3':
                click.echo(f'Found collision with {candidate}')
                cnt.value = 1
                return
        else:
            # another worker has already found a match
            return
    return
def init_globals(counter):
    global cnt
    cnt = counter
def callTest():
    ...
    iters = [
        (selector, [
            int(lower_bound + (floor(number * i) / cpus)),
            int(lower_bound + (floor(number * (i + 1)) / cpus)),
        ])
        for i in range(cpus)
    ]
    click.echo(iters)
    cnt = multiprocessing.Value('i', 0)
    with multiprocessing.Pool(initializer=init_globals, initargs=(cnt,),
                              processes=10) as a_pool:
        a_pool.starmap(test, iters, chunksize=2)
        a_pool.close()
        a_pool.join()
cpus is the result of multiprocessing.cpu_count(). But it seems like a smaller number of processes produces results faster. Is it because a larger number of workers splits the range into many smaller, less CPU-intensive tasks?
While I’ll definitely try out your suggestion, do you think my implementation can be optimized to produce results faster? Thanks!