Untitled
// src/inbox/services/message-processing.service.ts
import { Injectable, Logger } from '@nestjs/common';
import * as Imap from 'imap';
import { simpleParser } from 'mailparser';
import { InboxMessage } from './entities/inbox-message.entity';
import { Repository } from 'typeorm';
import { InjectRepository } from '@nestjs/typeorm';
import { EmailParserUtil } from 'src/util/email-parser';
import { EmailMetrics } from 'src/email/entities/email-metrics.entity';
import { InboxMetricsService } from './inbox-metrics.service';

/**
 * Fields extracted from one fetched IMAP message: the header values used for
 * reply matching plus the parsed text/HTML body, when available.
 */
interface MessageInfo {
  from: string;
  subject: string;
  date: Date;
  inReplyTo?: string;
  references?: string[];
  textBody?: string;
  htmlBody?: string;
}

/**
 * Searches an IMAP mailbox for recent messages, parses them, and records
 * replies to previously tracked outbound emails.
 */
@Injectable()
export class MessageProcessingService {
  private readonly logger = new Logger(MessageProcessingService.name);

  constructor(
    @InjectRepository(InboxMessage)
    private readonly messageRepository: Repository<InboxMessage>,
    @InjectRepository(EmailMetrics)
    private readonly emailMetricsRepository: Repository<EmailMetrics>,
    private readonly inboxMetricsService: InboxMetricsService,
  ) {}

  /**
   * Poll for messages SINCE lastPollDate, then process any that reference
   * previous outbound emails (via inReplyTo).
   *
   * Note: IMAP `SINCE` compares dates only (the time of day is disregarded),
   * so messages from earlier on the same day can be returned again on later
   * polls. `handleReply` skips emails already marked as replied.
   *
   * @param imap The connected IMAP instance
   * @param lastPollDate Lower bound for the search
   * @throws Rethrows any search/fetch error after logging it
   */
  async searchAndProcessMessages(imap: Imap, lastPollDate: Date): Promise<void> {
    try {
      const searchCriteria = this.buildDateSearchQuery(lastPollDate);
      this.logger.debug(`Searching for messages since: ${lastPollDate.toISOString()}`);
      this.logger.debug('Final IMAP Search Criteria:', JSON.stringify(searchCriteria));
      await this.executeSearch(imap, searchCriteria);
    } catch (error) {
      this.logger.error('Failed to search and process messages:', error);
      throw error;
    }
  }

  /**
   * Returns an IMAP search query that finds only messages since the given date.
   * node-imap requires that 'SINCE' plus its argument be in a sub-array, so
   * we return [ ['SINCE', <date> ] ] rather than ['SINCE', <date> ].
   *
   * @param lastPollDate Passed as a native Date; node-imap converts it
   * @returns The criteria array to hand to `imap.search`
   */
  private buildDateSearchQuery(lastPollDate: Date): any[] {
    // The sub-array ensures "SINCE" sees exactly one argument => no "Incorrect number" errors.
    return [['SINCE', lastPollDate]];
  }

  /**
   * Perform the actual IMAP search and, if there are results, fetch and
   * process them.
   *
   * @param imap The connected IMAP instance
   * @param criteria Search criteria in node-imap format
   * @returns Resolves once the search (and any resulting processing) is done;
   *          rejects with the search or fetch error
   */
  private executeSearch(imap: Imap, criteria: any[]): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      this.logger.debug('Executing IMAP search with criteria:', JSON.stringify(criteria));

      imap.search(criteria, (err, results) => {
        if (err) {
          this.logger.error('IMAP search failed:', err);
          reject(err);
          return;
        }

        this.logger.debug(`Found ${results?.length ?? 0} matching messages`);
        if (!results?.length) {
          resolve(); // No new messages found
          return;
        }

        // Forward the fetch outcome to the outer promise.
        this.fetchAndProcessResults(imap, results).then(resolve, reject);
      });
    });
  }

  /**
   * Given a list of UIDs from the IMAP search, fetch the header/body
   * and process each message (e.g. handle replies).
   *
   * The returned promise resolves only after the fetch has ended AND every
   * per-message handler has finished, so callers can safely continue (or
   * close the connection) without cutting off in-flight reply handling.
   * A failure in one message is logged and does not affect the others.
   *
   * @param imap The connected IMAP instance
   * @param uids Message identifiers returned by `imap.search`
   * @returns Rejects only when the fetch itself emits an error
   */
  private fetchAndProcessResults(imap: Imap, uids: number[]): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const fetch = imap.fetch(uids, {
        bodies: ['HEADER.FIELDS (FROM TO SUBJECT DATE REFERENCES IN-REPLY-TO)', 'TEXT'],
        struct: true,
      });

      const totalCount = uids.length;
      let processedCount = 0;
      const inFlight: Promise<void>[] = [];

      // Never rejects: errors are logged per message so one bad message
      // cannot abort the whole batch.
      const handleMessage = async (msg: Imap.ImapMessage): Promise<void> => {
        try {
          const messageInfo = await this.processMessage(msg);
          if (messageInfo?.inReplyTo) {
            // If there's an inReplyTo, see if it matches an outbound message
            await this.handleReply(messageInfo);
          }
          processedCount++;
          this.logger.debug(`Processed ${processedCount}/${totalCount} messages`);
        } catch (error) {
          this.logger.error('Error processing message:', error);
        }
      };

      fetch.on('message', (msg) => {
        // handleMessage attaches its listeners to msg synchronously, so no
        // 'body' event is missed.
        inFlight.push(handleMessage(msg));
      });

      fetch.once('error', (err: Error) => {
        this.logger.error('Fetch error:', err);
        reject(err);
      });

      fetch.once('end', () => {
        this.logger.debug('All fetch operations complete');
        // 'end' only means all data was received; wait for the async
        // handlers that are still parsing or writing to the database.
        void Promise.all(inFlight).then(() => resolve());
      });
    });
  }

  /**
   * Parse each message using mailparser, extracting:
   *  - from, subject, date, inReplyTo, references
   *  - textBody, htmlBody
   *
   * The promise always settles: it resolves once the message emits 'end' and
   * all of its body parts have finished parsing, or immediately with null on
   * a message error.
   *
   * @param msg The message emitted by the IMAP fetch
   * @returns The parsed message, or null if any part failed to parse or the
   *          header part was never received
   */
  private processMessage(msg: Imap.ImapMessage): Promise<MessageInfo | null> {
    return new Promise<MessageInfo | null>((resolve) => {
      let header:
        | Pick<MessageInfo, 'from' | 'subject' | 'date' | 'inReplyTo' | 'references'>
        | undefined;
      let body: Pick<MessageInfo, 'textBody' | 'htmlBody'> = {};
      let failed = false;
      let settled = false;
      const pendingParts: Promise<void>[] = [];

      // Guard so that a late 'end' after an 'error' (or vice versa) is a no-op.
      const finish = (result: MessageInfo | null): void => {
        if (!settled) {
          settled = true;
          resolve(result);
        }
      };

      msg.on('body', (stream, info) => {
        const parsePart = async (): Promise<void> => {
          try {
            const parsed = await simpleParser(stream);
            if (info.which.startsWith('HEADER')) {
              const references = parsed.references;
              header = {
                from: parsed.from?.text || '',
                subject: parsed.subject || '',
                date: parsed.date ?? new Date(),
                inReplyTo: parsed.inReplyTo,
                // mailparser may report a single reference as a bare string
                references: Array.isArray(references)
                  ? references
                  : references
                    ? [references]
                    : [],
              };
            } else {
              // It's the text (or HTML) part
              body = {
                textBody: parsed.text,
                // mailparser uses `false` for "no HTML"; normalize to undefined
                htmlBody: parsed.html || undefined,
              };
            }
          } catch (error) {
            this.logger.error('Error parsing message part:', error);
            failed = true;
          }
        };
        // parsePart() starts consuming the stream synchronously.
        pendingParts.push(parsePart());
      });

      msg.once('error', (err: unknown) => {
        this.logger.error('Message processing error:', err);
        finish(null);
      });

      msg.once('end', () => {
        // All parts have been emitted; wait for their parsers to finish.
        void Promise.all(pendingParts).then(() => {
          finish(header && !failed ? { ...header, ...body } : null);
        });
      });
    });
  }

  /**
   * If the message has an inReplyTo that matches a tracked email, mark it
   * as replied in the DB (or update any relevant metrics).
   *
   * Does nothing when inReplyTo is missing, when no tracked email matches,
   * when the match has no campaign, or when it is already marked as replied.
   *
   * @param messageInfo The parsed inbound message
   */
  private async handleReply(messageInfo: MessageInfo): Promise<void> {
    const inReplyTo = messageInfo.inReplyTo;
    if (!inReplyTo) {
      // NOTE(review): an undefined value in a TypeORM `where` clause may be
      // treated as "no filter" — never query without a concrete id.
      return;
    }

    // Try to find an existing EmailMetrics row by the inReplyTo value
    const originalMetrics = await this.emailMetricsRepository.findOne({
      where: [{ messageId: inReplyTo }, { providerMessageId: inReplyTo }],
      relations: ['campaign'],
    });

    // Only a matching campaign email that hasn't been replied to yet is updated.
    if (!originalMetrics?.campaign || originalMetrics.replied) {
      return;
    }

    // Parse the body only once we know the reply will be recorded.
    const parsedContent = EmailParserUtil.parseEmailBody({
      textBody: messageInfo.textBody,
      htmlBody: messageInfo.htmlBody,
    });

    await this.inboxMetricsService.updateCampaignMetrics(
      originalMetrics.campaign.id,
      originalMetrics.messageId,
      {
        content: parsedContent.reply,
        replierEmail: messageInfo.from,
      },
    );

    this.logger.debug('Recorded reply:', {
      originalMessageId: originalMetrics.messageId,
      replierEmail: messageInfo.from,
    });
  }
}
Leave a Comment