Untitled
unknown
plain_text
4 months ago
3.8 kB
5
Indexable
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * Static helpers for running {@link Supplier} tasks as {@link CompletableFuture}s on a shared
 * virtual-thread-per-task executor.
 */
public class VirtualThreadCompletableFutureUtils {

    /** Shared executor; every submitted task gets its own virtual thread. */
    private static final ExecutorService virtualExecutor =
            Executors.newVirtualThreadPerTaskExecutor();

    /**
     * Creates a CompletableFuture that runs the given task using a virtual thread.
     *
     * @param supplier the computation to run
     * @param <T> the result type
     * @return a future completed with the supplier's result, or completed exceptionally if the
     *     supplier throws
     */
    public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
        return CompletableFuture.supplyAsync(supplier, virtualExecutor);
    }

    /**
     * Executes multiple tasks concurrently using virtual threads and waits for all to complete.
     *
     * <p>The checked exceptions in the signature are kept for source compatibility with existing
     * callers; this method waits with {@link CompletableFuture#join()}, so a failing task
     * surfaces as an unchecked {@link CompletionException} whose cause is the task's exception.
     *
     * @param tasks the computations to run; results are returned in the collection's iteration
     *     order
     * @param <T> the result type
     * @return a mutable list with one result per task
     * @throws CompletionException if any task fails
     */
    public static <T> List<T> executeAll(Collection<Supplier<T>> tasks)
            throws ExecutionException, InterruptedException {
        List<CompletableFuture<T>> futures = submitAll(tasks);
        whenAllDone(futures).join();
        return collectResults(futures);
    }

    /**
     * Executes multiple tasks concurrently with a timeout.
     *
     * @param tasks the computations to run; results are returned in the collection's iteration
     *     order
     * @param timeout the maximum time to wait for all tasks together
     * @param unit the unit of {@code timeout}
     * @param <T> the result type
     * @return a mutable list with one result per task
     * @throws ExecutionException if any task fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the tasks have not all completed within the timeout
     */
    public static <T> List<T> executeAllWithTimeout(
            Collection<Supplier<T>> tasks, long timeout, TimeUnit unit)
            throws ExecutionException, InterruptedException, TimeoutException {
        List<CompletableFuture<T>> futures = submitAll(tasks);
        try {
            whenAllDone(futures).get(timeout, unit);
        } catch (TimeoutException e) {
            // Nobody will read these results any more. Note that CompletableFuture.cancel does
            // not interrupt a supplier that is already running.
            futures.forEach(future -> future.cancel(true));
            throw e;
        }
        return collectResults(futures);
    }

    /**
     * Executes a task with retry capability.
     *
     * <p>All attempts run sequentially inside a single virtual-thread task, so neither the
     * waiting between attempts nor the retries themselves ever block the calling thread.
     *
     * @param supplier the computation to run
     * @param maxRetries the number of additional attempts after the first failure; values
     *     {@code <= 0} mean the supplier is tried exactly once
     * @param delay the pause between a failed attempt and the next one
     * @param unit the unit of {@code delay}
     * @param <T> the result type
     * @return a future completed with the first successful result, or completed exceptionally
     *     with a {@link CompletionException} whose cause is the last failure
     * @throws NullPointerException if {@code supplier} is null
     */
    public static <T> CompletableFuture<T> supplyAsyncWithRetry(
            Supplier<T> supplier, int maxRetries, long delay, TimeUnit unit) {
        Objects.requireNonNull(supplier, "supplier");
        return supplyAsync(() -> {
            int retriesLeft = maxRetries;
            while (true) {
                try {
                    return supplier.get();
                } catch (Throwable failure) {
                    if (retriesLeft <= 0) {
                        throw asCompletionException(failure);
                    }
                    retriesLeft--;
                    try {
                        unit.sleep(delay);
                    } catch (InterruptedException e) {
                        // Restore the flag and give up: the task was asked to stop.
                        Thread.currentThread().interrupt();
                        throw new CompletionException(e);
                    }
                }
            }
        });
    }

    /**
     * Processes items in parallel using virtual threads, one batch at a time: every item of a
     * batch is submitted, and the next batch starts only after the whole batch has completed.
     *
     * @param items the items to process; a snapshot is taken up front
     * @param batchSize the maximum number of items processed concurrently; must be positive
     * @param processor the function applied to each item
     * @param <T> the item type
     * @param <R> the result type
     * @return a mutable list of results in the iteration order of {@code items}
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     * @throws CompletionException if the processor fails for any item
     */
    public static <T, R> List<R> processBatch(
            Collection<T> items, int batchSize, Function<T, R> processor) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        List<T> snapshot = new ArrayList<>(items);
        int total = snapshot.size();
        List<R> results = new ArrayList<>(total);
        int start = 0;
        while (start < total) {
            // Widen to long so a very large batchSize cannot overflow the end index.
            int end = (int) Math.min((long) start + batchSize, total);
            List<CompletableFuture<R>> batchFutures = new ArrayList<>(end - start);
            for (T item : snapshot.subList(start, end)) {
                batchFutures.add(supplyAsync(() -> processor.apply(item)));
            }
            whenAllDone(batchFutures).join();
            results.addAll(collectResults(batchFutures));
            start = end;
        }
        return results;
    }

    /** Starts every task on the virtual executor, preserving iteration order. */
    private static <T> List<CompletableFuture<T>> submitAll(Collection<Supplier<T>> tasks) {
        List<CompletableFuture<T>> futures = new ArrayList<>(tasks.size());
        for (Supplier<T> task : tasks) {
            futures.add(supplyAsync(task));
        }
        return futures;
    }

    /** Returns a future that completes once every given future has completed. */
    private static <T> CompletableFuture<Void> whenAllDone(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
    }

    /** Joins each future in order and gathers the results into a mutable list. */
    private static <T> List<T> collectResults(List<CompletableFuture<T>> futures) {
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    /** Wraps a failure in a CompletionException unless it already is one. */
    private static CompletionException asCompletionException(Throwable failure) {
        return (failure instanceof CompletionException alreadyWrapped)
                ? alreadyWrapped
                : new CompletionException(failure);
    }
}
Editor is loading...
Leave a Comment