Untitled
unknown
plain_text
4 months ago
4.2 kB
5
Indexable
const { SQSClient, SendMessageCommand, DeleteMessageCommand } = require('@aws-sdk/client-sqs'); const { S3Client, PutObjectCommand } = require('@aws-sdk/client-s3'); const { LambdaClient, InvokeCommand } = require('@aws-sdk/client-lambda'); const https = require('https'); // Initialize AWS clients const sqs = new SQSClient(); const s3 = new S3Client(); const lambda = new LambdaClient(); // Configuration const MAX_RETRIES = process.env.MAX_RETRIES || 3; const RETRY_DELAY_SECONDS = 900; // 15 minutes const QUEUE_URL = process.env.SQS_QUEUE_URL; const POST_LAMBDA_ARN = process.env.POST_LAMBDA_ARN; const OUTPUT_BUCKET = process.env.OUTPUT_BUCKET; async function fetchPaginatedAPI(fromDate, toDate) { let allResults = []; let index = 0; while(true) { const url = new URL(process.env.API_ENDPOINT); url.searchParams.append('from', fromDate); url.searchParams.append('to', toDate); url.searchParams.append('index', index); const response = await new Promise((resolve, reject) => { https.get(url, res => { let data = ''; res.on('data', chunk => data += chunk); res.on('end', () => resolve(JSON.parse(data))); }).on('error', reject); }); allResults = allResults.concat(response.items); index += response.items.length; if(index >= response.total) break; } return allResults; } async function uploadToS3(data, filename) { await s3.send(new PutObjectCommand({ Bucket: OUTPUT_BUCKET, Key: filename, Body: JSON.stringify(data), ContentType: 'application/json' })); return filename; } async function handleSuccess(id, receiptHandle) { await lambda.send(new InvokeCommand({ FunctionName: POST_LAMBDA_ARN, Payload: JSON.stringify({ id, status: 'SUCCESS' }) })); if(receiptHandle) { await sqs.send(new DeleteMessageCommand({ QueueUrl: QUEUE_URL, ReceiptHandle: receiptHandle })); } } async function handleFailure(id, params, retryCount) { if(retryCount >= MAX_RETRIES) { await lambda.send(new InvokeCommand({ FunctionName: POST_LAMBDA_ARN, Payload: JSON.stringify({ id, status: 'FAILED' }) })); return; } await sqs.send(new SendMessageCommand({ QueueUrl: QUEUE_URL, MessageBody: JSON.stringify({ id, params, retryCount: retryCount + 1 }), DelaySeconds: RETRY_DELAY_SECONDS })); } exports.handler = async (event) => { let params, id, receiptHandle; try { // Determine event source if(event.Records?.[0]?.eventSource === 'aws:s3') { // Initial execution from S3 params = { fromDate: process.env.LAST_RUN_DATE, toDate: new Date().toISOString() }; id = `exec-${Date.now()}`; } else if(event.Records?.[0]?.eventSource === 'aws:sqs') { // Retry execution from SQS const message = JSON.parse(event.Records[0].body); params = message.params; id = message.id; receiptHandle = event.Records[0].receiptHandle; } else { throw new Error('Unknown event source'); } // Core processing const apiData = await fetchPaginatedAPI(params.fromDate, params.toDate); const filename = `data-${params.toDate}.json`; await uploadToS3(apiData, filename); // Update last run date for initial executions if(!receiptHandle) { process.env.LAST_RUN_DATE = params.toDate; } await handleSuccess(id, receiptHandle); return { status: 'COMPLETED' }; } catch(error) { console.error('Processing failed:', error); await handleFailure(id, params, receiptHandle ? event.Records[0].body.retryCount : 0); return { status: 'FAILED', error: error.message }; } };
Editor is loading...
Leave a Comment