Untitled
unknown
plain_text
a year ago
7.4 kB
6
Indexable
// src/inbox/services/message-processing.service.ts
import { Injectable, Logger } from '@nestjs/common';
import * as Imap from 'imap';
import { simpleParser } from 'mailparser';
import { InboxMessage } from './entities/inbox-message.entity';
import { Repository } from 'typeorm';
import { InjectRepository } from '@nestjs/typeorm';
import { EmailParserUtil } from 'src/util/email-parser';
import { EmailMetrics } from 'src/email/entities/email-metrics.entity';
import { InboxMetricsService } from './inbox-metrics.service';
interface MessageInfo {
from: string;
subject: string;
date: Date;
inReplyTo?: string;
references?: string[];
textBody?: string;
htmlBody?: string;
}
@Injectable()
export class MessageProcessingService {
private readonly logger = new Logger(MessageProcessingService.name);
constructor(
@InjectRepository(InboxMessage)
private messageRepository: Repository<InboxMessage>,
@InjectRepository(EmailMetrics)
private emailMetricsRepository: Repository<EmailMetrics>,
private readonly inboxMetricsService: InboxMetricsService
) {}
/**
* Poll for new messages SINCE the lastPollDate, then process any that
* reference previous outbound emails (via inReplyTo).
*
* @param imap The connected IMAP instance
* @param lastPollDate Only fetch messages *after* this date/time
*/
async searchAndProcessMessages(imap: Imap, lastPollDate: Date): Promise<void> {
try {
const searchCriteria = this.buildDateSearchQuery(lastPollDate);
this.logger.debug(`Searching for messages since: ${lastPollDate.toISOString()}`);
this.logger.debug('Final IMAP Search Criteria:', JSON.stringify(searchCriteria));
await this.executeSearch(imap, searchCriteria);
} catch (error) {
this.logger.error('Failed to search and process messages:', error);
throw error;
}
}
/**
* Returns an IMAP search query that finds only messages since the given date.
* node-imap requires that 'SINCE' plus its argument be in a sub-array, so
* we return [ ['SINCE', <date> ] ] rather than ['SINCE', <date> ].
*/
private buildDateSearchQuery(lastPollDate: Date): any[] {
// We'll pass the date as a native Date object, letting node-imap convert it.
// The sub-array ensures "SINCE" sees exactly one argument => no "Incorrect number" errors.
return [
['SINCE', lastPollDate],
];
}
/**
* Perform the actual IMAP search and, if there are results, fetch them.
*/
private async executeSearch(imap: Imap, criteria: any[]): Promise<void> {
return new Promise((resolve, reject) => {
this.logger.debug('Executing IMAP search with criteria:', JSON.stringify(criteria));
imap.search(criteria, async (err, results) => {
if (err) {
this.logger.error('IMAP search failed:', err);
return reject(err);
}
this.logger.debug(`Found ${results?.length || 0} matching messages`);
if (!results?.length) {
return resolve(); // No new messages found
}
try {
// If we have results, let's fetch them
await this.fetchAndProcessResults(imap, results);
resolve();
} catch (error) {
reject(error);
}
});
});
}
/**
* Given a list of UIDs from the IMAP search, fetch the header/body
* and process each message (e.g. handle replies).
*/
private async fetchAndProcessResults(imap: Imap, uids: number[]): Promise<void> {
return new Promise((resolve, reject) => {
const fetch = imap.fetch(uids, {
bodies: ['HEADER.FIELDS (FROM TO SUBJECT DATE REFERENCES IN-REPLY-TO)', 'TEXT'],
struct: true,
});
let processedCount = 0;
const totalCount = uids.length;
fetch.on('message', async (msg) => {
try {
const messageInfo = await this.processMessage(msg);
if (messageInfo?.inReplyTo) {
// If there's an inReplyTo, see if it matches an outbound message
await this.handleReply(messageInfo);
}
processedCount++;
this.logger.debug(`Processed ${processedCount}/${totalCount} messages`);
} catch (error) {
this.logger.error('Error processing message:', error);
}
});
fetch.once('error', (err) => {
this.logger.error('Fetch error:', err);
reject(err);
});
fetch.once('end', () => {
this.logger.debug('All fetch operations complete');
resolve();
});
});
}
/**
* Parse each message using mailparser, extracting:
* - from, subject, date, inReplyTo, references
* - textBody, htmlBody
*/
private async processMessage(msg: Imap.ImapMessage): Promise<MessageInfo | null> {
return new Promise((resolve) => {
const messageInfo: Partial<MessageInfo> = {};
let partsProcessed = 0;
msg.on('body', async (stream, info) => {
try {
const parsed = await simpleParser(stream);
if (info.which.startsWith('HEADER')) {
Object.assign(messageInfo, {
from: parsed.from?.text || '',
subject: parsed.subject || '',
date: parsed.date || new Date(),
inReplyTo: parsed.inReplyTo,
references: parsed.references || [],
});
} else {
// It's the text (or HTML) part
Object.assign(messageInfo, {
textBody: parsed.text,
htmlBody: parsed.html,
});
}
partsProcessed++;
// Once we've parsed both header and text, we can resolve
if (partsProcessed === 2) {
resolve(messageInfo as MessageInfo);
}
} catch (error) {
this.logger.error('Error parsing message part:', error);
if (partsProcessed === 0) {
resolve(null);
}
}
});
msg.once('error', (err) => {
this.logger.error('Message processing error:', err);
if (partsProcessed === 0) {
resolve(null);
}
});
});
}
/**
* If the message has an inReplyTo that matches a tracked email, mark it
* as replied in the DB (or update any relevant metrics).
*/
private async handleReply(messageInfo: MessageInfo): Promise<void> {
const parsedContent = EmailParserUtil.parseEmailBody({
textBody: messageInfo.textBody,
htmlBody: messageInfo.htmlBody,
});
// Try to find an existing EmailMetrics row by the inReplyTo value
const originalMetrics = await this.emailMetricsRepository.findOne({
where: [
{ messageId: messageInfo.inReplyTo },
{ providerMessageId: messageInfo.inReplyTo },
],
relations: ['campaign'],
});
// If we found a matching campaign email that hasn't been replied to yet,
// update the record. (Your existing logic.)
if (originalMetrics?.campaign && !originalMetrics.replied) {
await this.inboxMetricsService.updateCampaignMetrics(
originalMetrics.campaign.id,
originalMetrics.messageId,
{
content: parsedContent.reply,
replierEmail: messageInfo.from,
},
);
this.logger.debug('Recorded reply:', {
originalMessageId: originalMetrics.messageId,
replierEmail: messageInfo.from,
});
}
}
}
Editor is loading...
Leave a Comment