Untitled
unknown
plain_text
a year ago
3.8 kB
9
Indexable
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class VirtualThreadCompletableFutureUtils {
private static final ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();
/**
* Creates a CompletableFuture that runs the given task using a virtual thread
*/
public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
return CompletableFuture.supplyAsync(supplier, virtualExecutor);
}
/**
* Executes multiple tasks concurrently using virtual threads and waits for all to complete
*/
public static <T> List<T> executeAll(Collection<Supplier<T>> tasks) throws ExecutionException, InterruptedException {
List<CompletableFuture<T>> futures = tasks.stream()
.map(VirtualThreadCompletableFutureUtils::supplyAsync)
.collect(Collectors.toList());
CompletableFuture<Void> allOf = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
allOf.join();
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
/**
* Executes multiple tasks concurrently with a timeout
*/
public static <T> List<T> executeAllWithTimeout(
Collection<Supplier<T>> tasks,
long timeout,
TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
List<CompletableFuture<T>> futures = tasks.stream()
.map(VirtualThreadCompletableFutureUtils::supplyAsync)
.collect(Collectors.toList());
CompletableFuture<Void> allOf = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
allOf.get(timeout, unit);
return futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
/**
* Executes tasks with retry capability
*/
public static <T> CompletableFuture<T> supplyAsyncWithRetry(
Supplier<T> supplier,
int maxRetries,
long delay,
TimeUnit unit) {
return supplyAsync(supplier).handle((result, throwable) -> {
if (throwable != null && maxRetries > 0) {
try {
unit.sleep(delay);
return supplyAsyncWithRetry(supplier, maxRetries - 1, delay, unit).join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CompletionException(e);
}
}
if (throwable != null) {
throw new CompletionException(throwable);
}
return result;
});
}
/**
* Creates a batch processor that processes items in parallel using virtual threads
*/
public static <T, R> List<R> processBatch(
Collection<T> items,
int batchSize,
Function<T, R> processor) {
return Lists.partition(new ArrayList<>(items), batchSize)
.stream()
.map(batch -> batch.stream()
.map(item -> supplyAsync(() -> processor.apply(item)))
.collect(Collectors.toList()))
.flatMap(batchFutures -> {
CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0])).join();
return batchFutures.stream().map(CompletableFuture::join);
})
.collect(Collectors.toList());
}
}Editor is loading...
Leave a Comment