Untitled
"use node"; import { api, internal } from "../_generated/api"; import { Integration, PMSPlatform } from "./../types"; import { internalAction, action } from "../_generated/server"; import { sendMessage as sendHostawayMessage } from "./integrations/hostaway"; import { sendMessage as sendGuestyMessage } from "./integrations/guesty"; import { Id } from "../_generated/dataModel"; import { fetchMessagesByConversationId as fetchHostawayMessagesByConversationId } from "./integrations/hostaway"; import { fetchMessagesByConversationId as fetchGuestyMessagesByConversationId } from "./integrations/guesty"; import { logSlackError, withErrorLog } from "./utils/slackLogger"; import { sanitizeUrlFromText } from "../utils/helpers"; // add prohibited words here const PROHIBITED_WORDS_AND_REPLACEMENT: string[][] = [["Google Maps", "Maps"]]; function sanitizeText(text: string): string { const removedUrlText = sanitizeUrlFromText(text); let finalText = removedUrlText; PROHIBITED_WORDS_AND_REPLACEMENT.forEach((item: string[]) => { const [prohibitedWord, replacement] = item; finalText = finalText.replace(prohibitedWord, replacement); }); return finalText; } export const sendMessage = action( async ( { runQuery, runMutation }, { message, conversationId, pmsPlatform, }: { message: string; conversationId: number | string; pmsPlatform: PMSPlatform; } ): Promise<void> => { await withErrorLog(async () => { const integrations: Integration[] = await runQuery( api.integrations.listForUser ); const pmsIntegration: Integration | undefined = integrations.find( (integration) => integration.pmsPlatform === pmsPlatform ); const user = await runQuery(api.users.getUser); if (!pmsIntegration) { throw new Error( `No integration found for conversationId ${conversationId}. Please connect to ${pmsPlatform} first. User ${user?._id}.` ); } if (!user) throw new Error("No user found"); const conversation = await runQuery(api.conversations.getById, { userId: user._id, conversationId, }); if (!conversation) throw new Error("No conversation found"); switch (pmsPlatform) { case PMSPlatform.HOSTAWAY: const reservation = await runQuery( api.reservations.getForUserById, { reservationId: conversation.reservationId, userId: user._id, } ); if (!reservation) throw new Error("No reservation found"); const { channelName } = reservation; const channelsToSendByChannel = [ "airbnb", "airbnbOfficial", "bookingcom", ]; const isChannel = !conversation.recipientEmail || channelsToSendByChannel.includes(channelName || ""); const hostawayMessageResponse = await sendHostawayMessage( pmsIntegration.accessToken, conversationId as number, sanitizeText(message), isChannel ); await runMutation(api.messages.addMessageHostaway, { message: hostawayMessageResponse, }); break; case PMSPlatform.HOSTFULLY: return; case PMSPlatform.GUESTY: // TODO: refactor with new documenation const { conversationType } = conversation; if (!conversationType) throw new Error("No conversation type found"); const guestyMessageResponse = await sendGuestyMessage( pmsIntegration.accessToken, conversationId as string, message, conversationType as any ); await runMutation(api.messages.addMessageGuesty, { message: guestyMessageResponse, }); break; default: return; } }, `messages.ts: sendMessage, ${pmsPlatform}, conversationId: ${conversationId}`); } ); export const sendMessageInternal = internalAction( async ( { runQuery, runMutation, runAction, scheduler }, { userId, hostResponse, conversationId, pmsPlatform, generated, isAutoReply, }: { userId: Id<"users">; hostResponse: string; conversationId: number | string; pmsPlatform: PMSPlatform; generated: boolean; isAutoReply?: boolean; } ): Promise<boolean> => { return await withErrorLog(async () => { const pmsIntegration: Integration | null = await runQuery( internal.integrations.getForUser, { userId, pmsPlatform, } ); if (!pmsIntegration) { throw new Error( `No integration found for ${pmsPlatform}. Please connect to ${pmsPlatform} first.` ); } const conversation = await runQuery(api.conversations.getById, { userId, conversationId, }); if (!conversation) throw new Error("No conversation found"); // send error message to user const userPhoneNumber = await runQuery( internal.userPhoneNumbers.getPhoneByUserId, { userId, } ); // check slack integraiton const slackIntegration = await runQuery( internal.slackIntegration.slackIntegrations.getByUserId, { userId, } ); const listing = await runQuery( internal.listings.getByUserAndListingId, { userId, listingId: conversation.listingMapId, } ); if (!listing) throw new Error("No listing found"); try { switch (pmsPlatform) { case PMSPlatform.HOSTAWAY: const reservation = await runQuery( api.reservations.getForUserById, { reservationId: conversation.reservationId, userId, } ); if (!reservation) throw new Error("No reservation found"); const { channelName } = reservation; const channelsToSendByChannel = [ "airbnb", "airbnbOfficial", "bookingcom", ]; const isChannel = !conversation.recipientEmail || channelsToSendByChannel.includes(channelName || ""); const hostawayMessageResponse = await sendHostawayMessage( pmsIntegration.accessToken, conversationId as number, sanitizeText(hostResponse), isChannel ); await scheduler.runAfter( 0, internal.messages.addMessageHostawayInternal, { userId, message: hostawayMessageResponse, generated, } ); break; case PMSPlatform.HOSTFULLY: break; case PMSPlatform.GUESTY: // TODO: refactor with new documenation const { conversationType } = conversation; if (!conversationType) throw new Error("No conversation type found"); const guestyMessageResponse = await sendGuestyMessage( pmsIntegration.accessToken, conversationId as string, hostResponse, conversationType as any ); await scheduler.runAfter( 0, internal.messages.addMessageGuestyInternal, { userId: userId, message: guestyMessageResponse, generated, } ); break; default: break; } // Send sms or slack update if auto reply if (isAutoReply) { if (userPhoneNumber) { await scheduler.runAfter( 5000, api.actions.twilio.sendSMS, { body: `HostAI replied:\n\n"${hostResponse}"`, to: userPhoneNumber, userId, } ); } if (slackIntegration) { const listing = await runQuery( internal.listings.getByUserAndListingId, { userId, listingId: conversation.listingMapId, } ); if (!listing) throw new Error("No listing found"); await scheduler.runAfter( 3000, internal.actions.slackIntegration .sendSentReplyMessage, { userId, integration: slackIntegration, response: hostResponse, conversation, listing, } ); } } return true; } catch (e: any) { await logSlackError( e.message, "messages.ts: sendMessageInternal", userId ); if (userPhoneNumber) { await runAction(api.actions.twilio.sendSMS, { to: userPhoneNumber, body: `HostAI can't reply to ${conversation?.recipientName}.\n\nWe will look into this.`, userId, }); } if (slackIntegration) { await runAction( internal.actions.slackIntegration.sendErrorMessage, { userId, listing, conversation, hostResponse: "", integration: slackIntegration, } ); } return false; } }, "messages.ts: sendMessageInternal, pmsPlatform: " + pmsPlatform + `, userId: ${userId}, body: ${hostResponse}`); } ); export const updateMessagesForConversation = action( async ( { runQuery, runMutation }, { conversationId, pmsPlatform, }: { conversationId: number | string; pmsPlatform: PMSPlatform } ): Promise<void> => { await withErrorLog(async () => { const integrations: Integration[] = await runQuery( api.integrations.listForUser ); const pmsIntegration: Integration | undefined = integrations.find( (integration) => integration.pmsPlatform === pmsPlatform ); if (!pmsIntegration) { throw new Error( `No integration found for ${pmsPlatform}. Please connect to ${pmsPlatform} first.` ); } const user = await runQuery(api.users.getUser); if (!user) { throw new Error("No user found"); } const userId = user._id; const conversation = await runQuery(api.conversations.getById, { userId, conversationId, }); if (!conversation) { throw new Error("No conversation found"); } const guestMessageBufferExists = await runQuery( internal.guestMessageBuffer.getByProperties, { userId, pmsPlatform, conversationId, listingMapId: conversation.listingMapId, } ); // if we're processing messages, don't fetch updates messages if (guestMessageBufferExists) return; switch (pmsPlatform) { case PMSPlatform.HOSTAWAY: const hostawayMessages = await fetchHostawayMessagesByConversationId( pmsIntegration?.accessToken, conversationId as number ); const rawHostawayMessages = await runMutation( internal.integrationsStore.hostaway.storeRawMessages, { userId: userId, messages: hostawayMessages.filter( (message: any) => message.body !== "" && message.attachments.length === 0 ), } ); await runMutation( internal.integrationsStore.hostaway .storeUnifiedMessages, { userId: userId, rawMessages: rawHostawayMessages, } ); const hostawayConversation = await runQuery( api.conversations.getById, { userId: userId, conversationId, } ); if ( rawHostawayMessages.length > 0 && hostawayConversation && rawHostawayMessages[0].messageId !== hostawayConversation.lastMessageId ) { await runMutation( internal.conversations.updateLastMessage, { id: hostawayConversation._id, lastMessageId: rawHostawayMessages[0] .messageId as number, } ); } break; case PMSPlatform.HOSTFULLY: return; case PMSPlatform.GUESTY: const conversation = await runQuery( api.conversations.getById, { userId: userId, conversationId, } ); if (!conversation) return; const guestyMessages = await fetchGuestyMessagesByConversationId( pmsIntegration?.accessToken, conversationId as string ); const guestyRawMessages = await runMutation( internal.integrationsStore.guesty.storeRawMessages, { userId: userId, messages: guestyMessages.filter( (message: any) => message.body !== "" ), listingId: conversation.listingMapId as string, reservationId: conversation.reservationId as string, } ); await runMutation( internal.integrationsStore.guesty.storeUnifiedMessages, { userId: userId, rawMessages: guestyRawMessages, } ); if ( guestyRawMessages.length > 0 && guestyRawMessages[0].messageId !== conversation.lastMessageId ) { await runMutation( internal.conversations.updateLastMessage, { id: conversation._id, lastMessageId: guestyRawMessages[0] .messageId as string, } ); } break; default: return; } }, `messages.ts: updateMessagesForConversation, ${pmsPlatform}, conversationId: ${conversationId}`); } ); export const updateMessagesForConversationByUserId = action( async ( { runQuery, runMutation, scheduler }, { conversationId, pmsPlatform, userId, onlyGetHostMessages, }: { conversationId: number | string; pmsPlatform: PMSPlatform; userId: Id<"users">; onlyGetHostMessages?: boolean; } ): Promise<void> => { await withErrorLog(async () => { const pmsIntegration: Integration | null = await runQuery( internal.integrations.getForUser, { userId, pmsPlatform, } ); if (!pmsIntegration) { throw new Error( `No integration found for ${pmsPlatform}. Please connect to ${pmsPlatform} first.` ); } switch (pmsPlatform) { case PMSPlatform.HOSTAWAY: const hostawayConversation = await runQuery( api.conversations.getById, { userId: userId as Id<"users">, conversationId, } ); if (!hostawayConversation) return; const hostawayMessages = await fetchHostawayMessagesByConversationId( pmsIntegration?.accessToken, conversationId as number, 10 ); const hostawayRawMessages = await runMutation( internal.integrationsStore.hostaway.storeRawMessages, { userId: userId as Id<"users">, messages: hostawayMessages.filter( (message: any) => message.body && message.body !== "" && message.attachments.length === 0 ), } ); await runMutation( internal.integrationsStore.hostaway .storeUnifiedMessages, { userId: userId as Id<"users">, rawMessages: !onlyGetHostMessages ? hostawayRawMessages : hostawayRawMessages.filter( (message: any) => !message.responseData.isIncoming ), } ); // using scheduler to reduce queue load if ( hostawayRawMessages.length > 0 && hostawayRawMessages[0].messageId !== hostawayConversation.lastMessageId ) { await scheduler.runAfter( 0, internal.conversations.updateLastMessage, { id: hostawayConversation._id, lastMessageId: hostawayRawMessages[0] .messageId as number, } ); } break; case PMSPlatform.HOSTFULLY: return; case PMSPlatform.GUESTY: const conversation = await runQuery( api.conversations.getById, { userId, conversationId, } ); if (!conversation) return; const guestyMessages = await fetchGuestyMessagesByConversationId( pmsIntegration?.accessToken, conversationId as string, 10 ); const rawMessages = await runMutation( internal.integrationsStore.guesty.storeRawMessages, { userId, messages: guestyMessages.filter( (message: any) => message.body && message.body !== "" ), listingId: conversation.listingMapId as string, reservationId: conversation.reservationId as string, } ); await runMutation( internal.integrationsStore.guesty.storeUnifiedMessages, { userId, rawMessages: !onlyGetHostMessages ? rawMessages : rawMessages.filter( (message: any) => message.responseData.sentBy !== "guest" ), } ); if ( rawMessages.length > 0 && rawMessages[0].messageId !== conversation.lastMessageId ) { await runMutation( internal.conversations.updateLastMessage, { id: conversation._id, lastMessageId: rawMessages[0] .messageId as string, } ); } break; default: return; } }, `messages.ts: updateMessagesForConversationByUserId, ${pmsPlatform}, user: ${userId}`); } );
Leave a Comment