Untitled
unknown
plain_text
24 days ago
1.4 kB
4
Indexable
# Hand-rolled allreduce over point-to-point messages: every non-root rank sends
# its `src_array` to rank 0, rank 0 folds the contributions together
# element-wise with `op`, and then sends the combined array back to each rank,
# so every rank ends up with the same values in `dest_array`.
#
# `op` must be MPI.SUM, MPI.MIN, or MPI.MAX; anything else raises
# NotImplementedError.
# NOTE(review): assumes `self.comm` is an mpi4py communicator and that
# `src_array` / `dest_array` are NumPy arrays of identical shape and dtype on
# every rank -- confirm against the enclosing method's signature.
rank = self.comm.Get_rank()
size = self.comm.Get_size()
bytes_transferred = src_array.itemsize * src_array.size

# Resolve the reduction on EVERY rank before any message is exchanged. If the
# check only ran on the root inside the receive loop, the root would raise
# while the other ranks were still waiting in Recv for a result that never
# arrives, hanging the job instead of failing it. Doing it here also rejects an
# unsupported op when size == 1 (where the receive loop never executes).
if op == MPI.SUM:
    combine = np.add
elif op == MPI.MIN:
    combine = np.minimum
elif op == MPI.MAX:
    combine = np.maximum
else:
    raise NotImplementedError("Only MPI.SUM, MPI.MIN, and MPI.MAX are supported.")

if rank == 0:
    # Root process: seed the accumulator with its own contribution, then fold
    # in one peer at a time through a single reusable receive buffer.
    np.copyto(dest_array, src_array)
    incoming = np.empty_like(src_array)
    for peer in range(1, size):
        self.comm.Recv(incoming, source=peer)
        combine(dest_array, incoming, out=dest_array)

    # Hand the final reduced result back to every other rank.
    for peer in range(1, size):
        self.comm.Send(dest_array, dest=peer)

    # One receive plus one send of the full array per peer.
    self.total_bytes_transferred += 2 * bytes_transferred * (size - 1)
else:
    # Non-root processes: contribute local data, then wait for the result.
    self.comm.Send(src_array, dest=0)
    self.comm.Recv(dest_array, source=0)

    # One send plus one receive of the full array.
    self.total_bytes_transferred += 2 * bytes_transferred
Editor is loading...
Leave a Comment