Untitled

 avatar
unknown
plain_text
4 months ago
4.2 kB
5
Indexable
const { SQSClient, SendMessageCommand, DeleteMessageCommand } = require('@aws-sdk/client-sqs');
const { S3Client, PutObjectCommand } = require('@aws-sdk/client-s3');
const { LambdaClient, InvokeCommand } = require('@aws-sdk/client-lambda');
const https = require('https');

// Initialize AWS clients
const sqs = new SQSClient();
const s3 = new S3Client();
const lambda = new LambdaClient();

// Configuration
const MAX_RETRIES = process.env.MAX_RETRIES || 3;
const RETRY_DELAY_SECONDS = 900; // 15 minutes
const QUEUE_URL = process.env.SQS_QUEUE_URL;
const POST_LAMBDA_ARN = process.env.POST_LAMBDA_ARN;
const OUTPUT_BUCKET = process.env.OUTPUT_BUCKET;

async function fetchPaginatedAPI(fromDate, toDate) {
    let allResults = [];
    let index = 0;
    
    while(true) {
        const url = new URL(process.env.API_ENDPOINT);
        url.searchParams.append('from', fromDate);
        url.searchParams.append('to', toDate);
        url.searchParams.append('index', index);
        
        const response = await new Promise((resolve, reject) => {
            https.get(url, res => {
                let data = '';
                res.on('data', chunk => data += chunk);
                res.on('end', () => resolve(JSON.parse(data)));
            }).on('error', reject);
        });
        
        allResults = allResults.concat(response.items);
        index += response.items.length;
        
        if(index >= response.total) break;
    }
    
    return allResults;
}

async function uploadToS3(data, filename) {
    await s3.send(new PutObjectCommand({
        Bucket: OUTPUT_BUCKET,
        Key: filename,
        Body: JSON.stringify(data),
        ContentType: 'application/json'
    }));
    return filename;
}

async function handleSuccess(id, receiptHandle) {
    await lambda.send(new InvokeCommand({
        FunctionName: POST_LAMBDA_ARN,
        Payload: JSON.stringify({ id, status: 'SUCCESS' })
    }));
    
    if(receiptHandle) {
        await sqs.send(new DeleteMessageCommand({
            QueueUrl: QUEUE_URL,
            ReceiptHandle: receiptHandle
        }));
    }
}

async function handleFailure(id, params, retryCount) {
    if(retryCount >= MAX_RETRIES) {
        await lambda.send(new InvokeCommand({
            FunctionName: POST_LAMBDA_ARN,
            Payload: JSON.stringify({ id, status: 'FAILED' })
        }));
        return;
    }

    await sqs.send(new SendMessageCommand({
        QueueUrl: QUEUE_URL,
        MessageBody: JSON.stringify({
            id,
            params,
            retryCount: retryCount + 1
        }),
        DelaySeconds: RETRY_DELAY_SECONDS
    }));
}

exports.handler = async (event) => {
    let params, id, receiptHandle;
    
    try {
        // Determine event source
        if(event.Records?.[0]?.eventSource === 'aws:s3') {
            // Initial execution from S3
            params = {
                fromDate: process.env.LAST_RUN_DATE,
                toDate: new Date().toISOString()
            };
            id = `exec-${Date.now()}`;
        } else if(event.Records?.[0]?.eventSource === 'aws:sqs') {
            // Retry execution from SQS
            const message = JSON.parse(event.Records[0].body);
            params = message.params;
            id = message.id;
            receiptHandle = event.Records[0].receiptHandle;
        } else {
            throw new Error('Unknown event source');
        }

        // Core processing
        const apiData = await fetchPaginatedAPI(params.fromDate, params.toDate);
        const filename = `data-${params.toDate}.json`;
        await uploadToS3(apiData, filename);
        
        // Update last run date for initial executions
        if(!receiptHandle) {
            process.env.LAST_RUN_DATE = params.toDate;
        }

        await handleSuccess(id, receiptHandle);
        return { status: 'COMPLETED' };
        
    } catch(error) {
        console.error('Processing failed:', error);
        await handleFailure(id, params, receiptHandle ? event.Records[0].body.retryCount : 0);
        return { status: 'FAILED', error: error.message };
    }
};
Editor is loading...
Leave a Comment