Untitled
unknown
plain_text
10 months ago
7.2 kB
1
Indexable
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.document.Item; import com.amazonaws.services.dynamodbv2.document.Table; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.opencsv.CSVReader; import com.opencsv.CSVWriter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.*; import java.util.*; import java.util.function.Predicate; import java.util.stream.Collectors; public class S3CSVReader { private static final Logger LOGGER = LogManager.getLogger(S3CSVReader.class); private static final String BUCKET_NAME = "your-bucket-name"; private static final String PREFIX = "folder/"; private static final String OUTPUT_BUCKET_NAME = "output-bucket-name"; private static final String OUTPUT_KEY = "output/valid_records.csv"; private static final String DYNAMO_TABLE_NAME = "invalid_records"; private static final Predicate<String[]> ROW_VALIDATOR = row -> { if (!row[0].endsWith(".csv")) { LOGGER.error("The file is not a CSV file."); return false; } if (row.length < 15) { LOGGER.error("Row does not have enough columns: {}", Arrays.toString(row)); return false; } for (String value : row) { if (value == null) { LOGGER.error("Null value found in row: {}", Arrays.toString(row)); return false; } } String column3 = row[2]; String column12 = row[11]; String column13 = row[12]; String column14 = row[13]; if (column3.length() != 2 || !column3.matches("[a-zA-Z]+")) { LOGGER.error("Invalid value in column 3 of row: {}", Arrays.toString(row)); return false; } if (!column12.matches("\\d+(\\.\\d+)?") || !column13.matches("\\d+(\\.\\d+)?")) { LOGGER.error("Invalid value in columns 12 or 13 of row: {}", Arrays.toString(row)); return false; } double column12Value = Double.parseDouble(column12); double column13Value = Double.parseDouble(column13); double column14Value = Double.parseDouble(column14); if (column13Value <= column12Value) { LOGGER.error("Column 13 value ({}) is not greater than column 12 value ({}) in row: {}", column13Value, column12Value, Arrays.toString(row)); return false; } if (column14Value >= 50.0) { LOGGER.error("Column 14 value ({}) is not less than 50% in row: {}", column14Value, Arrays.toString(row)); return false; } return true; }; public static void main(String[] args) { AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient(); Optional<S3ObjectSummary> latestFile = findLatestCSVFile(s3Client); if (latestFile.isPresent()) { S3Object s3Object = s3Client.getObject(BUCKET_NAME, latestFile.get().getKey()); try (BufferedReader reader = new BufferedReader(new InputStreamReader(s3Object.getObjectContent()))) { CSVReader csvReader = new CSVReader(reader); List<String[]> records = csvReader.readAll(); Map<Boolean, List<String[]>> partitionedRecords = records.stream() .collect(Collectors.partitioningBy(ROW_VALIDATOR)); List<String[]> validRecords = partitionedRecords.get(true); List<String[]> invalidRecords = partitionedRecords.get(false); LOGGER.info("Found {} valid records and {} invalid records in the CSV file.", validRecords.size(), invalidRecords.size()); // Write valid records to a CSV file in an S3 location ByteArrayOutputStream validOutputStream = new ByteArrayOutputStream(); try (CSVWriter csvWriter = new CSVWriter(new OutputStreamWriter(validOutputStream))) { csvWriter.writeAll(validRecords); } byte[] csvBytes = validOutputStream.toByteArray(); s3Client.putObject(OUTPUT_BUCKET_NAME, OUTPUT_KEY, new ByteArrayInputStream(csvBytes), new Object()); LOGGER.info("Valid records written to S3 location: {}/{}", OUTPUT_BUCKET_NAME, OUTPUT_KEY); // Write invalid records to a DynamoDB table AmazonDynamoDB dynamoDBClient = AmazonDynamoDBClientBuilder.defaultClient(); DynamoDB dynamoDB = new DynamoDB(dynamoDBClient); Table table = dynamoDB.getTable(DYNAMO_TABLE_NAME); for (String[] invalidRecord : invalidRecords) { Item item = new Item() .withPrimaryKey("id", UUID.randomUUID().toString()) .withList("record", Arrays.asList(invalidRecord)); table.putItem(item); } LOGGER.info("Invalid records written to DynamoDB table: {}", DYNAMO_TABLE_NAME); Map<String, Long> column14ValueCounts = validRecords.stream() .map(row -> row[13]) .collect(Collectors.groupingBy(value -> value, Collectors.counting())); long totalValues = validRecords.size(); long repeatedValues = column14ValueCounts.values().stream() .filter(count -> count > 1) .map(count -> count - 1) .mapToLong(value -> value) .sum(); double percentageRepeated = (double) repeatedValues / totalValues * 100; LOGGER.info("Percentage of values repeated in column 14: {:.2f}%", percentageRepeated); } catch (IOException e) { LOGGER.error("Error reading, parsing, or writing CSV file.", e); } } else { LOGGER.warn("No CSV files found in the bucket."); } } private static Optional<S3ObjectSummary> findLatestCSVFile(AmazonS3 s3Client) { ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(BUCKET_NAME).withPrefix(PREFIX); ListObjectsV2Result result; List<S3ObjectSummary> csvFiles; do { result = s3Client.listObjectsV2(req); csvFiles = result.getObjectSummaries().stream() .filter(obj -> obj.getKey().endsWith(".csv")) .collect(Collectors.toList()); String token = result.getNextContinuationToken(); req.setContinuationToken(token); } while (result.isTruncated()); return csvFiles.stream() .max(Comparator.comparing(S3ObjectSummary::getLastModified)); } }
Editor is loading...
Leave a Comment