Untitled

 avatar
unknown
plain_text
4 months ago
3.8 kB
5
Indexable
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * Static helpers for running {@link Supplier}-based work on virtual threads.
 *
 * <p>Every task is started on the shared virtual-thread-per-task executor held by this class,
 * so each submitted task gets its own virtual thread.
 */
public class VirtualThreadCompletableFutureUtils {
    private static final ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();

    /** Static utility holder; not meant to be instantiated. */
    private VirtualThreadCompletableFutureUtils() {
    }

    /**
     * Creates a CompletableFuture that runs the given task using a virtual thread.
     *
     * @param supplier the computation to run
     * @param <T> the result type
     * @return a future completed with the supplier's result, or completed exceptionally if it throws
     */
    public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
        return CompletableFuture.supplyAsync(supplier, virtualExecutor);
    }

    /**
     * Executes multiple tasks concurrently using virtual threads and waits for all to complete.
     *
     * @param tasks the computations to run; results are returned in the collection's iteration order
     * @param <T> the result type
     * @return a mutable list holding one result per task
     * @throws ExecutionException if any task throws; the cause is the task's exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static <T> List<T> executeAll(Collection<Supplier<T>> tasks) throws ExecutionException, InterruptedException {
        List<CompletableFuture<T>> futures = new ArrayList<>(tasks.size());
        for (Supplier<T> task : tasks) {
            futures.add(supplyAsync(task));
        }

        // get() rather than join(): it honours interruption and reports failures as the
        // checked exceptions this method declares.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();

        List<T> results = new ArrayList<>(futures.size());
        for (CompletableFuture<T> future : futures) {
            // Every future is already complete here, so join() returns immediately.
            results.add(future.join());
        }
        return results;
    }

    /**
     * Executes multiple tasks concurrently with a timeout.
     *
     * <p>The timeout is a single budget shared by all tasks, measured from the moment the last
     * task has been submitted. If the call ends without a full result list (timeout, task
     * failure, or interruption of the caller), every task that has not finished yet is cancelled
     * and its virtual thread is interrupted.
     *
     * @param tasks the computations to run; results are returned in the collection's iteration order
     * @param timeout the maximum total time to wait
     * @param unit the unit of {@code timeout}
     * @param <T> the result type
     * @return a mutable list holding one result per task
     * @throws ExecutionException if a task throws; the cause is the task's exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the tasks do not all finish within the timeout
     */
    public static <T> List<T> executeAllWithTimeout(
            Collection<Supplier<T>> tasks,
            long timeout,
            TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {

        List<Future<T>> futures = new ArrayList<>(tasks.size());
        boolean finished = false;
        try {
            for (Supplier<T> task : tasks) {
                // Typed as Callable so submit() yields a Future whose cancel(true) interrupts the task.
                Callable<T> callable = task::get;
                futures.add(virtualExecutor.submit(callable));
            }

            // Track elapsed time instead of an absolute deadline so that very large timeouts
            // cannot overflow when added to System.nanoTime().
            long budgetNanos = unit.toNanos(timeout);
            long startNanos = System.nanoTime();

            List<T> results = new ArrayList<>(futures.size());
            for (Future<T> future : futures) {
                long remainingNanos = budgetNanos - (System.nanoTime() - startNanos);
                results.add(future.get(remainingNanos, TimeUnit.NANOSECONDS));
            }
            finished = true;
            return results;
        } finally {
            if (!finished) {
                // Do not leave abandoned work running; cancelling a completed future is a no-op.
                for (Future<T> future : futures) {
                    future.cancel(true);
                }
            }
        }
    }

    /**
     * Executes a task with retry capability.
     *
     * <p>All attempts run one after another on a single virtual thread, so the calling thread is
     * never blocked. An attempt counts as failed when the supplier throws a
     * {@link RuntimeException} or {@link Error}. After the final failed attempt the returned
     * future completes exceptionally with that last exception as its cause.
     *
     * @param supplier the computation to run
     * @param maxRetries how many additional attempts are allowed after the first one fails;
     *     zero or a negative value means no retry
     * @param delay how long to wait between attempts
     * @param unit the unit of {@code delay}
     * @param <T> the result type
     * @return a future holding the result of the first successful attempt
     * @throws NullPointerException if {@code supplier} is null
     */
    public static <T> CompletableFuture<T> supplyAsyncWithRetry(
            Supplier<T> supplier,
            int maxRetries,
            long delay,
            TimeUnit unit) {

        if (supplier == null) {
            throw new NullPointerException("supplier");
        }
        return supplyAsync(() -> {
            int retriesLeft = maxRetries;
            while (true) {
                try {
                    return supplier.get();
                } catch (RuntimeException | Error failure) {
                    if (retriesLeft <= 0) {
                        // Rethrow the original so the future's cause is not wrapped twice.
                        throw failure;
                    }
                    retriesLeft--;
                }
                try {
                    unit.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new CompletionException(e);
                }
            }
        });
    }

    /**
     * Processes items in batches, running each batch's items in parallel on virtual threads.
     *
     * <p>Batches run one after another: the next batch is only started once every item of the
     * current batch has finished. If an item's processor throws, the resulting
     * {@link CompletionException} propagates to the caller and later batches are not started.
     *
     * @param items the items to process; a snapshot is taken before processing starts
     * @param batchSize the maximum number of items processed concurrently; must be positive
     * @param processor the function applied to each item
     * @param <T> the item type
     * @param <R> the result type
     * @return a mutable list of results in the same order as {@code items}
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public static <T, R> List<R> processBatch(
            Collection<T> items,
            int batchSize,
            Function<T, R> processor) {

        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }

        List<T> snapshot = new ArrayList<>(items);
        List<R> results = new ArrayList<>(snapshot.size());

        int start = 0;
        while (start < snapshot.size()) {
            // Computed from the remaining count so that start + batchSize cannot overflow.
            int end = start + Math.min(batchSize, snapshot.size() - start);

            List<CompletableFuture<R>> batchFutures = new ArrayList<>(end - start);
            for (T item : snapshot.subList(start, end)) {
                batchFutures.add(supplyAsync(() -> processor.apply(item)));
            }

            // Wait for the whole batch before the next one is started.
            CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture<?>[0])).join();
            for (CompletableFuture<R> future : batchFutures) {
                results.add(future.join());
            }
            start = end;
        }
        return results;
    }
}
Editor is loading...
Leave a Comment