Untitled
unknown
plain_text
8 months ago
1.4 kB
5
Indexable
# Hand-rolled allreduce over point-to-point messages. The enclosing `def` is
# not visible in this fragment; presumably it supplies `src_array`,
# `dest_array` and `op` as parameters -- confirm against the full file.
#
# Rank 0 acts as root: it receives src_array from every other rank, folds each
# one into dest_array with the requested reduction, then sends the reduced
# dest_array back to each rank. Every rank adds the bytes it sent and received
# to self.total_bytes_transferred.
#
# Raises NotImplementedError when more than one rank participates and `op` is
# not MPI.SUM, MPI.MIN or MPI.MAX.
rank = self.comm.Get_rank()
size = self.comm.Get_size()
bytes_transferred = src_array.itemsize * src_array.size

# Resolve the reduction once, before any message is exchanged, and do it on
# every rank. Validating inside the root's receive loop would let root raise
# after its first Recv while the other ranks stay blocked in Send/Recv.
# With a single rank no reduction is ever applied (the result is just a copy
# of src_array), so any op is accepted in that case.
reduce_fn = None
if size > 1:
    if op == MPI.SUM:
        reduce_fn = np.add
    elif op == MPI.MIN:
        reduce_fn = np.minimum
    elif op == MPI.MAX:
        reduce_fn = np.maximum
    else:
        raise NotImplementedError("Only MPI.SUM, MPI.MIN, and MPI.MAX are supported.")

if rank == 0:
    # Root process: seed the accumulator with its own contribution.
    np.copyto(dest_array, src_array)
    temp_buffer = np.empty_like(src_array)

    # Receive from ranks 1..size-1 in order, reducing in place into dest_array.
    for i in range(1, size):
        self.comm.Recv(temp_buffer, source=i)
        reduce_fn(dest_array, temp_buffer, out=dest_array)

    # Send the final reduced result back to every non-root rank.
    for i in range(1, size):
        self.comm.Send(dest_array, dest=i)

    # Root received one array from and sent one array to each other rank.
    self.total_bytes_transferred += 2 * bytes_transferred * (size - 1)
else:
    # Non-root processes: send data to root, then receive the final result.
    self.comm.Send(src_array, dest=0)
    self.comm.Recv(dest_array, source=0)

    # One array sent plus one array received.
    self.total_bytes_transferred += 2 * bytes_transferred
Leave a Comment