Untitled

 avatar
unknown
plain_text
a month ago
7.4 kB
2
Indexable
// src/inbox/services/message-processing.service.ts
import { Injectable, Logger } from '@nestjs/common';
import * as Imap from 'imap';
import { simpleParser } from 'mailparser';
import { InboxMessage } from './entities/inbox-message.entity';
import { Repository } from 'typeorm';
import { InjectRepository } from '@nestjs/typeorm';
import { EmailParserUtil } from 'src/util/email-parser';
import { EmailMetrics } from 'src/email/entities/email-metrics.entity';
import { InboxMetricsService } from './inbox-metrics.service';

/**
 * Flattened view of a single fetched inbox message, combining the parsed
 * header fields with the parsed body content.
 */
interface MessageInfo {
  // --- Header-derived fields -------------------------------------------

  /** Sender, as the display text of the parsed From header. */
  from: string;

  /** Subject line of the message. */
  subject: string;

  /** Date associated with the message. */
  date: Date;

  /** In-Reply-To header value; its presence marks the message as a reply. */
  inReplyTo?: string;

  /** Message ids listed in the References header, when available. */
  references?: string[];

  // --- Body-derived fields ---------------------------------------------

  /** Plain-text body, when one was parsed. */
  textBody?: string;

  /** HTML body, when one was parsed. */
  htmlBody?: string;
}

/**
 * Polls an already-connected IMAP mailbox for recent messages and records
 * replies to previously sent campaign emails.
 *
 * Flow: search (SINCE date) -> fetch header + text -> parse with mailparser
 * -> for messages carrying an In-Reply-To header, look up the matching
 * EmailMetrics row and update the campaign metrics.
 */
@Injectable()
export class MessageProcessingService {
  private readonly logger = new Logger(MessageProcessingService.name);

  constructor(
    @InjectRepository(InboxMessage)
    private messageRepository: Repository<InboxMessage>,
    @InjectRepository(EmailMetrics)
    private emailMetricsRepository: Repository<EmailMetrics>,
    private readonly inboxMetricsService: InboxMetricsService
  ) {}

  /**
   * Search the mailbox for messages SINCE `lastPollDate` and process any that
   * reference previous outbound emails (via inReplyTo).
   *
   * The returned promise settles only after every fetched message has been
   * fully processed.
   *
   * NOTE: IMAP `SINCE` compares dates only (time and timezone are
   * disregarded), so messages from earlier on the same day as `lastPollDate`
   * are returned again. Duplicate handling relies on the `replied` check in
   * `handleReply`.
   *
   * @param imap         The connected IMAP instance (a mailbox must be open)
   * @param lastPollDate Lower bound (day granularity) for the search
   * @throws Re-throws any search/fetch error after logging it
   */
  async searchAndProcessMessages(imap: Imap, lastPollDate: Date): Promise<void> {
    try {
      const searchCriteria = this.buildDateSearchQuery(lastPollDate);
      this.logger.debug(`Searching for messages since: ${lastPollDate.toISOString()}`);
      // Interpolate the value: a trailing string argument would be taken by
      // the Nest Logger as the log context instead of message content.
      this.logger.debug(`Final IMAP Search Criteria: ${JSON.stringify(searchCriteria)}`);

      await this.executeSearch(imap, searchCriteria);
    } catch (error) {
      this.logger.error('Failed to search and process messages:', error);
      throw error;
    }
  }

  /**
   * Returns an IMAP search query that finds only messages since the given date.
   * node-imap requires that 'SINCE' plus its argument be in a sub-array, so
   * we return [ ['SINCE', <date> ] ] rather than ['SINCE', <date> ].
   *
   * @param lastPollDate Date handed to node-imap as a native Date object
   * @returns Criteria array suitable for `imap.search`
   */
  private buildDateSearchQuery(lastPollDate: Date): Array<[string, Date]> {
    // The sub-array ensures "SINCE" sees exactly one argument => no "Incorrect number" errors.
    return [['SINCE', lastPollDate]];
  }

  /**
   * Perform the actual IMAP search and, if there are results, fetch and
   * process them.
   *
   * @param imap     The connected IMAP instance
   * @param criteria Search criteria as accepted by `imap.search`
   * @throws Rejects with the search error, or with any fetch error
   */
  private async executeSearch(imap: Imap, criteria: unknown[]): Promise<void> {
    this.logger.debug(`Executing IMAP search with criteria: ${JSON.stringify(criteria)}`);

    // Wrap only the callback-style search; everything after it is plain
    // async/await so errors propagate without manual resolve/reject plumbing.
    const uids = await new Promise<number[]>((resolve, reject) => {
      imap.search(criteria, (err, results) => {
        if (err) {
          this.logger.error('IMAP search failed:', err);
          reject(err);
          return;
        }
        resolve(results ?? []);
      });
    });

    this.logger.debug(`Found ${uids.length} matching messages`);

    if (uids.length === 0) {
      return; // No new messages found
    }

    await this.fetchAndProcessResults(imap, uids);
  }

  /**
   * Given a list of UIDs from the IMAP search, fetch the header/body
   * and process each message (e.g. handle replies).
   *
   * Failures on an individual message are logged and skipped; only a
   * fetch-level error rejects the returned promise.
   *
   * @param imap The connected IMAP instance
   * @param uids Identifiers returned by `imap.search`
   */
  private async fetchAndProcessResults(imap: Imap, uids: number[]): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      // NOTE(review): the 'TEXT' section is later parsed on its own, without
      // the message's MIME headers, so multipart or transfer-encoded bodies
      // may not decode as expected. Consider fetching the full message
      // (bodies: '') and parsing it once - confirm against real traffic.
      const fetch = imap.fetch(uids, {
        bodies: ['HEADER.FIELDS (FROM TO SUBJECT DATE REFERENCES IN-REPLY-TO)', 'TEXT'],
        struct: true,
      });

      const totalCount = uids.length;
      let processedCount = 0;
      // One entry per message. The 'end' event only means the server has
      // finished sending data, not that our async handlers are done.
      const pending: Promise<void>[] = [];

      fetch.on('message', (msg) => {
        const task = (async () => {
          try {
            // processMessage attaches its listeners synchronously, before
            // the first await, so no 'body' event is missed.
            const messageInfo = await this.processMessage(msg);
            if (messageInfo?.inReplyTo) {
              // If there's an inReplyTo, see if it matches an outbound message
              await this.handleReply(messageInfo);
            }
            processedCount++;
            this.logger.debug(`Processed ${processedCount}/${totalCount} messages`);
          } catch (error) {
            // Best effort: one bad message must not abort the whole batch.
            this.logger.error('Error processing message:', error);
          }
        })();
        pending.push(task);
      });

      fetch.once('error', (err) => {
        this.logger.error('Fetch error:', err);
        reject(err);
      });

      fetch.once('end', () => {
        // Tasks never reject (each catches its own errors), so Promise.all
        // simply waits for all of them to finish.
        void Promise.all(pending).then(() => {
          this.logger.debug('All fetch operations complete');
          resolve();
        });
      });
    });
  }

  /**
   * Parse each message using mailparser, extracting:
   *  - from, subject, date, inReplyTo, references
   *  - textBody, htmlBody
   *
   * The promise settles when the message's 'end' event has fired and every
   * body part has finished parsing, regardless of how many parts arrived.
   *
   * @param msg The message emitted by the node-imap fetch
   * @returns The parsed message, or `null` if the header part is missing,
   *          any part failed to parse, or the message emitted an error
   */
  private async processMessage(msg: Imap.ImapMessage): Promise<MessageInfo | null> {
    return new Promise<MessageInfo | null>((resolve) => {
      const messageInfo: Partial<MessageInfo> = {};
      const partTasks: Promise<void>[] = [];
      let headerParsed = false;
      let parseFailed = false;

      msg.on('body', (stream, info) => {
        const task = (async () => {
          try {
            const parsed = await simpleParser(stream);

            if (info.which.startsWith('HEADER')) {
              // Normalize to the string[] declared on MessageInfo; a single
              // reference may arrive as a bare string rather than an array.
              const rawReferences = parsed.references;
              let references: string[] = [];
              if (Array.isArray(rawReferences)) {
                references = rawReferences;
              } else if (rawReferences) {
                references = [rawReferences];
              }

              messageInfo.from = parsed.from?.text || '';
              messageInfo.subject = parsed.subject ?? '';
              messageInfo.date = parsed.date ?? new Date();
              messageInfo.inReplyTo = parsed.inReplyTo;
              messageInfo.references = references;
              headerParsed = true;
            } else {
              // It's the text (or HTML) part
              messageInfo.textBody = parsed.text;
              // Only keep a real string; a missing HTML body may be
              // reported as a non-string value (e.g. false).
              messageInfo.htmlBody = typeof parsed.html === 'string' ? parsed.html : undefined;
            }
          } catch (error) {
            this.logger.error('Error parsing message part:', error);
            parseFailed = true;
          }
        })();
        partTasks.push(task);
      });

      msg.once('error', (err) => {
        this.logger.error('Message processing error:', err);
        // Resolving twice is harmless; the first settlement wins.
        resolve(null);
      });

      msg.once('end', () => {
        // Part tasks never reject (errors are recorded in parseFailed).
        void Promise.all(partTasks).then(() => {
          const usable = headerParsed && !parseFailed;
          resolve(usable ? (messageInfo as MessageInfo) : null);
        });
      });
    });
  }

  /**
   * If the message has an inReplyTo that matches a tracked email, mark it
   * as replied in the DB (or update any relevant metrics).
   *
   * Does nothing when there is no inReplyTo, no matching campaign email, or
   * the email is already marked as replied.
   *
   * @param messageInfo Parsed inbound message
   */
  private async handleReply(messageInfo: MessageInfo): Promise<void> {
    const { inReplyTo } = messageInfo;
    if (!inReplyTo) {
      // Guard: an undefined value in a TypeORM `where` is understood to be
      // skipped, which would make the lookup below match an arbitrary row.
      return;
    }

    // Try to find an existing EmailMetrics row by the inReplyTo value
    const originalMetrics = await this.emailMetricsRepository.findOne({
      where: [{ messageId: inReplyTo }, { providerMessageId: inReplyTo }],
      relations: ['campaign'],
    });

    // Only campaign emails that have not been replied to yet are updated.
    if (!originalMetrics?.campaign || originalMetrics.replied) {
      return;
    }

    // Parse the body only once we know the reply will actually be recorded.
    const parsedContent = EmailParserUtil.parseEmailBody({
      textBody: messageInfo.textBody,
      htmlBody: messageInfo.htmlBody,
    });

    await this.inboxMetricsService.updateCampaignMetrics(
      originalMetrics.campaign.id,
      originalMetrics.messageId,
      {
        content: parsedContent.reply,
        replierEmail: messageInfo.from,
      },
    );

    this.logger.debug('Recorded reply:', {
      originalMessageId: originalMetrics.messageId,
      replierEmail: messageInfo.from,
    });
  }
}
Leave a Comment