Untitled

 avatar
unknown
plain_text
24 days ago
1.4 kB
4
Indexable
rank = self.comm.Get_rank()
        size = self.comm.Get_size()

        # Hand-rolled all-reduce routed through rank 0: rank 0 collects every
        # other rank's src_array, combines them element-wise according to
        # `op`, then sends the combined result back so each rank ends up with
        # it in dest_array.
        # NOTE(review): the enclosing `def` is not visible in this fragment;
        # src_array, dest_array and op are presumably its parameters and
        # self.comm is presumably an mpi4py communicator -- confirm.

        # Byte size of one copy of src_array (bytes per element * element
        # count). Note that `src_array.size` is the element count, unrelated
        # to the local `size` (number of ranks) above.
        bytes_transferred = src_array.itemsize * src_array.size
        
        if rank == 0:
            # Root process: accumulate the reduction result.
            # Seed the accumulator with the root's own contribution.
            np.copyto(dest_array, src_array)
            # Scratch buffer reused for every incoming message.
            # NOTE(review): assumes every rank sends an array with the same
            # shape and dtype as the root's src_array -- confirm with callers.
            temp_buffer = np.empty_like(src_array)
            # Contributions are received one at a time, in rank order 1..size-1.
            for i in range(1, size):
                self.comm.Recv(temp_buffer, source=i)
                # Apply reduction in place on dest_array.
                if op == MPI.SUM:
                    np.add(dest_array, temp_buffer, out=dest_array)
                elif op == MPI.MIN:
                    np.minimum(dest_array, temp_buffer, out=dest_array)
                elif op == MPI.MAX:
                    np.maximum(dest_array, temp_buffer, out=dest_array)
                else:
                    # NOTE(review): this is only reached on rank 0, and only
                    # after the first Recv has completed. The other ranks never
                    # inspect `op`, so they appear to proceed to their Recv
                    # below and wait for a result that rank 0 will not send.
                    # When size == 1 the loop body never runs, so an
                    # unsupported op is not rejected at all. Consider
                    # validating `op` on every rank before any communication.
                    raise NotImplementedError("Only MPI.SUM, MPI.MIN, and MPI.MAX are supported.")

            # Broadcast the final reduced result: one separate Send per
            # non-root rank, in rank order.
            for i in range(1, size):
                self.comm.Send(dest_array, dest=i)
            # Update total bytes transferred: for each of the (size - 1) other
            # ranks, one array received and one array sent by the root.
            self.total_bytes_transferred += 2 * bytes_transferred * (size - 1)
        else:
            # Non-root processes: send data to root, then receive the final result
            self.comm.Send(src_array, dest=0)
            self.comm.Recv(dest_array, source=0)
            # Update total bytes transferred: one array sent plus one array
            # received by this rank. The counter is per-process state on
            # `self`; each rank only accounts for its own traffic here.
            self.total_bytes_transferred += 2 * bytes_transferred
Editor is loading...
Leave a Comment