Untitled
unknown
plain_text
2 years ago
35 kB
7
Indexable
"use node";
import { api, internal } from "../_generated/api";
import { Doc, Id } from "../_generated/dataModel";
import { action, internalAction } from "../_generated/server";
import { PMSPlatform, SlackMultiChannelConfig } from "../types";
import { stringToNumber } from "../utils/helpers";
import SlackIntegration, {
getSlackAccessToken,
revokeAccessToken,
} from "./utils/slackIntegration";
import { logSlackError } from "./utils/slackLogger";
/**
 * Exchanges a Slack OAuth `code` for an access token and stores the resulting
 * integration for the given user.
 *
 * Flow: look up the user, call `getSlackAccessToken`, join the channel named
 * in the returned `incoming_webhook`, then persist the integration with
 * status "on".
 *
 * @returns The raw response from `getSlackAccessToken`.
 * @throws Error "User not found" when no user matches `userId`.
 * @throws Error "Invalid response from Slack" when the response lacks
 *   `access_token` or `incoming_webhook`.
 */
export const fetchAndStoreAccessToken = internalAction(
  async (
    ctx,
    { userId, code }: { userId: string; code: string }
  ): Promise<any> => {
    const existingUser: Doc<"users"> | null = await ctx.runQuery(
      internal.users.getByUserId,
      { userId: userId as Id<"users"> }
    );
    if (!existingUser) {
      throw new Error("User not found");
    }

    const oauthResponse: any = await getSlackAccessToken(code);
    const accessToken = oauthResponse.access_token;
    const webhook = oauthResponse.incoming_webhook;
    if (!(accessToken && webhook)) {
      throw new Error("Invalid response from Slack");
    }

    // Join the webhook's channel before recording the integration.
    const slackClient: SlackIntegration = new SlackIntegration(
      userId,
      accessToken
    );
    await slackClient.joinChannel(webhook.channel_id);

    await ctx.runMutation(
      internal.slackIntegration.slackIntegrations.addIntegration,
      {
        userId: userId as Id<"users">,
        accessToken: accessToken,
        channelId: webhook.channel_id,
        status: "on",
      }
    );

    return oauthResponse;
  }
);
/**
 * Revokes the current user's Slack access token and tears down the
 * integration.
 *
 * After revoking, chat is turned off for all of the user's listings, their
 * Slack channel ids are cleared, and the integration record is removed.
 *
 * @returns The raw response from `revokeAccessToken`.
 * @throws Error "User not found" when `api.users.getUser` yields no user.
 * @throws Error "Integration not found" when the user has no integration.
 */
export const revokeAndRemoveAccessToken = action(
  async (ctx): Promise<any> => {
    const currentUser: Doc<"users"> | null = await ctx.runQuery(
      api.users.getUser
    );
    if (!currentUser) {
      throw new Error("User not found");
    }
    const userId = currentUser._id;

    const slackIntegration = await ctx.runQuery(
      internal.slackIntegration.slackIntegrations.getByUserId,
      { userId }
    );
    if (!slackIntegration) {
      throw new Error("Integration not found");
    }

    const revokeResult: any = await revokeAccessToken(
      slackIntegration.accessToken
    );
    console.log(revokeResult);

    // Listing cleanup runs before the integration record itself is removed.
    await ctx.runMutation(internal.listings.turnOffChatForAllListings, {
      userId,
    });
    await ctx.runMutation(
      internal.listings.clearSlackChannelIdForAllListings,
      { userId }
    );
    await ctx.runMutation(
      internal.slackIntegration.slackIntegrations.removeIntegration,
      { userId }
    );

    return revokeResult;
  }
);
/**
 * Creates one Slack channel per key of `channelNameToListingIds` and points
 * every listing in each group at its newly created channel.
 *
 * @returns `{ updated: false }` when the user has no integration or channel
 *   creation returned `null`; `{ updated: true }` otherwise.
 */
export const allocateChannelsAndUpdate = internalAction(
  async (
    ctx,
    {
      userId,
      channelNameToListingIds,
    }: {
      userId: Id<"users">;
      channelNameToListingIds: SlackMultiChannelConfig;
    }
  ): Promise<{ updated: boolean }> => {
    const slackIntegration = await ctx.runQuery(
      internal.slackIntegration.slackIntegrations.getByUserId,
      { userId }
    );
    if (!slackIntegration) {
      return { updated: false };
    }

    const requestedNames: string[] = Object.keys(channelNameToListingIds);
    const channelIdsByName: { [key: string]: string } | null =
      await ctx.runAction(internal.actions.slackIntegration.createChannels, {
        userId,
        integration: slackIntegration,
        channelNames: requestedNames,
      });
    if (!channelIdsByName) {
      return { updated: false };
    }

    // One mutation per listing, in the order the channel names were given.
    for (const name of requestedNames) {
      const slackChannelId = channelIdsByName[name];
      for (const listingId of channelNameToListingIds[name]) {
        await ctx.runMutation(internal.listings.updateSlackChannelId, {
          userId,
          listingId,
          slackChannelId,
        });
      }
    }

    return { updated: true };
  }
);
/**
 * Creates a single new Slack channel for the current user, records it on the
 * integration, and assigns the given listings to it.
 *
 * @returns `{ success: false }` when there is no current user, no
 *   integration, a channel with the same name is already recorded on the
 *   integration, or channel creation returned `null`; `{ success: true }`
 *   otherwise.
 */
export const addSingleChannelAndUpdate = action(
  async (
    ctx,
    {
      channelName,
      listingIds,
    }: {
      channelName: string;
      listingIds: (string | number)[];
    }
  ): Promise<{ success: boolean }> => {
    const currentUser = await ctx.runQuery(api.users.getUser);
    if (!currentUser) {
      return { success: false };
    }
    const userId = currentUser._id;

    const slackIntegration = await ctx.runQuery(
      internal.slackIntegration.slackIntegrations.getByUserId,
      { userId }
    );
    if (!slackIntegration) {
      return { success: false };
    }

    // Refuse to create a channel whose name is already recorded.
    const recordedChannels = slackIntegration.channels;
    const nameAlreadyTaken =
      recordedChannels &&
      recordedChannels.some((channel: any) => channel.name === channelName);
    if (nameAlreadyTaken) {
      return { success: false };
    }

    const channelIdsByName: { [key: string]: string } | null =
      await ctx.runAction(internal.actions.slackIntegration.createChannels, {
        userId,
        integration: slackIntegration,
        channelNames: [channelName],
      });
    if (!channelIdsByName) {
      return { success: false };
    }
    console.log("Successfully created channel");

    const newChannelId = channelIdsByName[channelName];
    await ctx.runMutation(
      internal.slackIntegration.slackIntegrations.addChannels,
      {
        userId,
        channels: [{ id: newChannelId, name: channelName }],
      }
    );

    for (const listingId of listingIds) {
      await ctx.runMutation(internal.listings.updateSlackChannelId, {
        userId,
        listingId,
        slackChannelId: newChannelId,
      });
    }
    console.log("Successfully added channel");

    return { success: true };
  }
);
/**
 * Renames an existing Slack channel (when its recorded name differs from
 * `channelName`) and makes `listingIds` the exact set of listings assigned
 * to it.
 *
 * Listings currently on the channel but absent from `listingIds` get their
 * Slack channel id reset to ""; every id in `listingIds` is then pointed at
 * `channelId`.
 *
 * @returns `{ success: false }` when there is no current user, no
 *   integration, or the rename call fails; `{ success: true }` otherwise.
 */
export const editSingleChannelAndUpdate = action(
  async (
    ctx,
    {
      channelName,
      channelId,
      listingIds,
    }: {
      channelName: string;
      channelId: string;
      listingIds: (string | number)[];
    }
  ): Promise<{ success: boolean }> => {
    const currentUser = await ctx.runQuery(api.users.getUser);
    if (!currentUser) {
      return { success: false };
    }
    const userId = currentUser._id;

    const slackIntegration = await ctx.runQuery(
      internal.slackIntegration.slackIntegrations.getByUserId,
      { userId }
    );
    if (!slackIntegration) {
      return { success: false };
    }

    // A rename is needed only if this channel id is recorded under another name.
    const recordedChannels = slackIntegration.channels;
    const needsRename =
      recordedChannels &&
      recordedChannels.some(
        (channel: any) =>
          channel.id === channelId && channel.name !== channelName
      );
    if (needsRename) {
      const slackClient: SlackIntegration = new SlackIntegration(
        userId,
        slackIntegration.accessToken
      );
      const renameResult = await slackClient.editChannelName(
        channelId,
        channelName
      );
      if (!(renameResult && renameResult.ok)) {
        return { success: false };
      }
      await ctx.runMutation(
        internal.slackIntegration.slackIntegrations.editChannelName,
        { userId, channelId, channelName }
      );
    }

    // Listings currently assigned to this channel.
    const currentListings = await ctx.runQuery(
      internal.listings.listByUserChannelId,
      { userId, channelId }
    );

    // Detach listings that are no longer part of the requested set.
    const staleListingIds = currentListings
      .map((listing) => listing.listingId)
      .filter((id: number | string) => !listingIds.includes(id));
    for (const staleId of staleListingIds) {
      await ctx.runMutation(internal.listings.updateSlackChannelId, {
        userId,
        listingId: staleId,
        slackChannelId: "",
      });
    }

    // Attach (or re-attach) every requested listing.
    for (const listingId of listingIds) {
      await ctx.runMutation(internal.listings.updateSlackChannelId, {
        userId,
        listingId,
        slackChannelId: channelId,
      });
    }

    return { success: true };
  }
);
/**
 * Creates one Slack channel per entry of `channelNames` and invites the
 * workspace's regular members to each of them.
 *
 * The member list is fetched once, before any channel is created: it is the
 * same for every channel, and failing early avoids leaving a freshly created
 * channel behind when the list cannot be obtained. Members flagged as bot,
 * deleted, restricted or ultra-restricted are not invited.
 *
 * Inviting is best-effort: a failed invite is logged and does not abort the
 * run. A failed channel creation aborts immediately.
 *
 * @returns A map from channel name to the created channel id, or `null` when
 *   `channelNames` is empty, the member list is unavailable, or any channel
 *   could not be created.
 */
export const createChannels = internalAction(
  async (
    _ctx,
    {
      userId,
      integration,
      channelNames,
    }: {
      userId: Id<"users">;
      integration: Doc<"slackIntegrations">;
      channelNames: string[];
    }
  ): Promise<{ [key: string]: string } | null> => {
    // Nothing requested means nothing created; report that as null.
    if (channelNames.length === 0) return null;

    const SlackMessenger: SlackIntegration = new SlackIntegration(
      userId,
      integration.accessToken
    );

    // Loop-invariant: resolve the invitee list a single time.
    const users = await SlackMessenger.listUsers();
    if (!users || !users.members) return null;
    const userCommaSep: string = users.members
      .filter(
        (user: any) =>
          !user.is_bot &&
          !user.deleted &&
          !user.is_restricted &&
          !user.is_ultra_restricted
      )
      .map((user: any) => user.id)
      .join(",");

    const nameToId: { [key: string]: string } = {};
    for (const name of channelNames) {
      const createdRes = await SlackMessenger.createChannel(name);
      console.log(createdRes);
      if (!createdRes || !createdRes.ok) return null;
      nameToId[name] = createdRes.channel.id;

      // No eligible members: there is nobody to invite to this channel.
      if (!userCommaSep) continue;

      const res = await SlackMessenger.inviteToChannel(
        createdRes.channel.id,
        userCommaSep
      );
      if (!res || !res.ok) {
        console.error(res);
      } else {
        console.log(res);
      }
    }

    console.log(nameToId);
    return nameToId;
  }
);
/**
 * Posts a "reply sent" notification to Slack for a conversation.
 *
 * The notification goes to the listing's own Slack channel when one is set,
 * otherwise to the integration's channel. The guest message field is sent
 * empty; `response` is passed as the host response.
 */
export const sendSentReplyMessage = internalAction(
  async (
    _ctx,
    {
      userId,
      response,
      integration,
      listing,
      conversation,
    }: {
      userId: Id<"users">;
      listing: Doc<"listings">;
      integration: Doc<"slackIntegrations">;
      conversation: Doc<"conversations">;
      response: string;
    }
  ): Promise<any> => {
    const slackClient: SlackIntegration = new SlackIntegration(
      userId,
      integration.accessToken
    );
    const channelId = listing.slackChannelId || integration.channelId;
    const guestName = conversation.recipientName || "N/A";

    await slackClient.sendSentMessageNotification({
      basicSlackInfo: {
        channelId,
        guestName,
        guestMessage: "",
        hostResponse: response,
        listingAddress: listing.address,
        listingCity: listing.city || "",
        listingImageUrl: listing.listingImage.url,
      },
    });
  }
);
/**
 * Posts an autopilot on/off notification to the integration's channel and
 * schedules its removal.
 *
 * The notification is removed after `hours` hours. When `hours` is zero,
 * negative, or not a finite number, it is removed after 30 seconds instead,
 * so the scheduler is never handed a non-positive or NaN delay.
 *
 * If the Slack call does not return a message timestamp, nothing is
 * scheduled and the action returns quietly.
 */
export const sendAutopilotMessage = internalAction(
  async (
    { scheduler },
    {
      userId,
      integration,
      autopilotSetting,
      hours,
    }: {
      userId: Id<"users">;
      integration: Doc<"slackIntegrations">;
      autopilotSetting: "on" | "off";
      hours: number;
    }
  ): Promise<any> => {
    const SlackMessenger: SlackIntegration = new SlackIntegration(
      userId,
      integration.accessToken
    );
    const res = await SlackMessenger.sendAutopilotNotification({
      channelId: integration.channelId,
      autopilotSetting,
      hours,
    });

    // Read the timestamp defensively: a nested destructuring would throw on
    // a missing response or missing `message` before the guard below runs.
    const ts = res && res.message ? res.message.ts : undefined;
    if (!ts) return;

    const defaultExpiryMs = 30 * 1000;
    const expireTime =
      Number.isFinite(hours) && hours > 0
        ? hours * 60 * 60 * 1000
        : defaultExpiryMs;

    await scheduler.runAfter(
      expireTime,
      internal.actions.slackIntegration.removeMessage,
      {
        userId,
        integration,
        ts,
      }
    );
  }
);
/**
 * Deletes the message with timestamp `ts` from the integration's channel.
 */
export const removeMessage = internalAction(
  async (
    _ctx,
    {
      integration,
      userId,
      ts,
    }: {
      userId: Id<"users">;
      integration: Doc<"slackIntegrations">;
      ts: string;
    }
  ): Promise<any> => {
    const { accessToken, channelId } = integration;
    const slackClient: SlackIntegration = new SlackIntegration(
      userId,
      accessToken
    );
    await slackClient.deleteMessage(channelId, ts);
  }
);
/**
 * Posts a training-mode message (guest message plus proposed host response)
 * to Slack and records it in the Slack message queue.
 *
 * The target channel is `listingSlackChannelId` when provided, otherwise the
 * integration's channel; the same channel id is stored on the queue row.
 *
 * Nothing is queued when the Slack call returns no response or a response
 * that is not `ok`. A response without a message timestamp is logged and
 * also not queued, since the queue row would not be able to reference the
 * Slack message.
 */
export const queueTrainingModeMessage = internalAction(
  async (
    { runMutation },
    {
      userId,
      listingSlackChannelId,
      integration,
      conversation,
      listing,
      combinedMessage,
      combinedResponse,
      lastMessageId,
    }: {
      userId: Id<"users">;
      listingSlackChannelId: string | undefined;
      integration: Doc<"slackIntegrations">;
      conversation: Doc<"conversations">;
      listing: Doc<"listings">;
      combinedMessage: string;
      combinedResponse: string;
      lastMessageId: string | number;
    }
  ): Promise<any> => {
    const SlackMessenger: SlackIntegration = new SlackIntegration(
      userId,
      integration.accessToken
    );

    // Resolve once so the posted channel and the stored channel cannot differ.
    const channelId = listingSlackChannelId || integration.channelId;

    const response: any = await SlackMessenger.queueTrainingModeMessage({
      userId,
      conversationId: conversation.conversationId.toString(),
      messageId: lastMessageId.toString(),
      basicSlackInfo: {
        channelId,
        guestName: conversation.recipientName || "N/A",
        guestMessage: combinedMessage,
        listingAddress: listing.address,
        listingCity: listing.city || "",
        listingImageUrl: listing.listingImage.url,
        hostResponse: combinedResponse,
      },
    });

    // Same null-safe check the channel-management actions in this file use.
    if (!response || !response.ok) {
      return;
    }
    const { message } = response;
    if (!message || !message.ts) {
      console.error("Slack response has no message timestamp", response);
      return;
    }

    await runMutation(
      internal.slackIntegration.slackMessageQueue.addMessage,
      {
        userId,
        conversationId: conversation.conversationId,
        listingId: listing.listingId,
        pmsPlatform: conversation.pmsPlatform as PMSPlatform,
        lastMessageId,
        guestMessage: combinedMessage,
        hostResponse: combinedResponse,
        slackTs: message.ts,
        channelId,
      }
    );
  }
);
/**
 * Posts an urgent guest message to Slack and records it in the Slack message
 * queue (without a host response).
 *
 * The target channel is the listing's Slack channel when set, otherwise the
 * integration's channel; the same channel id is stored on the queue row.
 *
 * Nothing is queued when the Slack call returns no response or a response
 * that is not `ok`. A response without a message timestamp is logged and
 * also not queued, since the queue row would not be able to reference the
 * Slack message.
 */
export const queueUrgentMessage = internalAction(
  async (
    { runMutation },
    {
      userId,
      lastMessageId,
      integration,
      conversation,
      listing,
      combinedMessage,
    }: {
      userId: Id<"users">;
      lastMessageId: string | number;
      integration: Doc<"slackIntegrations">;
      conversation: Doc<"conversations">;
      listing: Doc<"listings">;
      combinedMessage: string;
    }
  ): Promise<any> => {
    const SlackMessenger: SlackIntegration = new SlackIntegration(
      userId,
      integration.accessToken
    );

    // Resolve once so the posted channel and the stored channel cannot differ.
    const channelId = listing.slackChannelId || integration.channelId;

    const response: any = await SlackMessenger.queueUrgentMessage({
      userId,
      conversationId: conversation.conversationId.toString(),
      messageId: lastMessageId.toString(),
      basicSlackInfo: {
        channelId,
        guestName: conversation.recipientName || "N/A",
        guestMessage: combinedMessage,
        listingAddress: listing.address,
        listingCity: listing.city || "",
        listingImageUrl: listing.listingImage.url,
      },
    });

    // Same null-safe check the channel-management actions in this file use.
    if (!response || !response.ok) {
      return;
    }
    const { message } = response;
    if (!message || !message.ts) {
      console.error("Slack response has no message timestamp", response);
      return;
    }

    await runMutation(
      internal.slackIntegration.slackMessageQueue.addMessage,
      {
        userId,
        conversationId: conversation.conversationId,
        pmsPlatform: conversation.pmsPlatform as PMSPlatform,
        listingId: listing.listingId,
        lastMessageId,
        guestMessage: combinedMessage,
        slackTs: message.ts,
        channelId,
      }
    );
  }
);
/**
 * Removes a queued Slack message, and every older queued message of the same
 * conversation, from both Slack and the Slack message queue.
 *
 * Each Slack message is deleted from the channel recorded on its queue row
 * (`channelId`, stored when the message was queued). The listing's current
 * channel / the integration's channel is used only as a fallback for rows
 * without one. Using the stored channel matters because a listing can be
 * moved to a different channel after a message was posted.
 *
 * Does nothing when no queue row matches `lastMessageId`.
 */
export const expireMessage = internalAction(
  async (
    { runMutation, runQuery },
    {
      userId,
      lastMessageId,
      listing,
      integration,
      conversation,
    }: {
      userId: Id<"users">;
      lastMessageId: string | number;
      listing: Doc<"listings">;
      integration: Doc<"slackIntegrations">;
      conversation: Doc<"conversations">;
    }
  ): Promise<any> => {
    const SlackMessenger: SlackIntegration = new SlackIntegration(
      userId,
      integration.accessToken
    );
    const slackMessage = await runQuery(
      internal.slackIntegration.slackMessageQueue
        .getByUserConversationMessageId,
      {
        userId,
        conversationId: conversation.conversationId,
        messageId: lastMessageId,
      }
    );
    if (!slackMessage) return;

    // Used only when a queue row carries no channel id of its own.
    const fallbackChannelId = listing.slackChannelId || integration.channelId;

    await SlackMessenger.deleteMessage(
      slackMessage.channelId || fallbackChannelId,
      slackMessage.slackTs
    );
    await runMutation(internal.slackIntegration.slackMessageQueue.remove, {
      _id: slackMessage._id,
    });

    // Also clear every queued message of this conversation that is older
    // than the one just removed.
    const olderMessages = await runQuery(
      internal.slackIntegration.slackMessageQueue
        .listPastByUserConversationId,
      {
        userId,
        conversationId: conversation.conversationId,
        creationTime: slackMessage._creationTime,
      }
    );
    for (const older of olderMessages) {
      await SlackMessenger.deleteMessage(
        older.channelId || fallbackChannelId,
        older.slackTs
      );
      await runMutation(
        internal.slackIntegration.slackMessageQueue.remove,
        {
          _id: older._id,
        }
      );
    }
  }
);
/**
 * Posts an error notification to Slack for a host response.
 *
 * The notification goes to the listing's own Slack channel when one is set,
 * otherwise to the integration's channel.
 */
export const sendErrorMessage = internalAction(
  async (
    _ctx,
    {
      userId,
      conversation,
      listing,
      hostResponse,
      integration,
    }: {
      userId: Id<"users">;
      conversation: Doc<"conversations">;
      listing: Doc<"listings">;
      integration: Doc<"slackIntegrations">;
      hostResponse: string;
    }
  ): Promise<any> => {
    const slackClient: SlackIntegration = new SlackIntegration(
      userId,
      integration.accessToken
    );
    const guestName = conversation.recipientName || "N/A";
    const channelId = listing.slackChannelId || integration.channelId;

    await slackClient.sendErrorMessageNotification({
      guestName,
      hostResponse,
      channelId,
    });
  }
);
/**
 * Posts a guest-message notification to Slack, labelled with its urgency.
 *
 * `urgency` "not urgent" is shown as "Not Urgent"; anything else is shown as
 * "Urgent". The notification goes to the listing's own Slack channel when
 * one is set, otherwise to the integration's channel.
 */
export const sendMessageNotification = internalAction(
  async (
    _ctx,
    {
      userId,
      listing,
      conversation,
      combinedMessage,
      urgency,
      integration,
    }: {
      userId: Id<"users">;
      listing: Doc<"listings">;
      conversation: Doc<"conversations">;
      combinedMessage: string;
      urgency: "not urgent" | "urgent";
      integration: Doc<"slackIntegrations">;
    }
  ): Promise<any> => {
    const slackClient: SlackIntegration = new SlackIntegration(
      userId,
      integration.accessToken
    );
    const channelId = listing.slackChannelId || integration.channelId;
    const guestName = conversation.recipientName || "N/A";

    await slackClient.sendGuestMessageNotification({
      urgency: urgency === "not urgent" ? "Not Urgent" : "Urgent",
      basicSlackInfo: {
        channelId,
        guestName,
        guestMessage: combinedMessage,
        listingAddress: listing.address,
        listingCity: listing.city || "",
        listingImageUrl: listing.listingImage.url,
      },
    });
  }
);
/**
 * Posts a notification to Slack that an urgent guest message received a
 * response, passing `dhours` / `dmins` through to the Slack helper.
 *
 * The notification goes to the listing's own Slack channel when one is set,
 * otherwise to the integration's channel. This matches how the other
 * listing-scoped notifications in this file pick their channel, including
 * the urgent message itself.
 */
export const sendUrgentResponseNotification = internalAction(
  async (
    _ctx,
    {
      dhours,
      dmins,
      userId,
      guestMessage,
      response,
      integration,
      listing,
      conversation,
    }: {
      dhours: number;
      dmins: number;
      userId: Id<"users">;
      guestMessage: string;
      listing: Doc<"listings">;
      integration: Doc<"slackIntegrations">;
      conversation: Doc<"conversations">;
      response: string;
    }
  ): Promise<any> => {
    const SlackMessenger: SlackIntegration = new SlackIntegration(
      userId,
      integration.accessToken
    );
    await SlackMessenger.sendUrgentResponseNotification({
      dhours,
      dmins,
      basicSlackInfo: {
        // Previously always `integration.channelId`, which ignored a
        // per-listing channel.
        channelId: listing.slackChannelId || integration.channelId,
        guestName: conversation.recipientName || "N/A",
        guestMessage: guestMessage,
        hostResponse: response,
        listingAddress: listing.address,
        listingCity: listing.city || "",
        listingImageUrl: listing.listingImage.url,
      },
    });
  }
);
/**
 * Posts a reservation-modification-request notification to Slack.
 *
 * The notification goes to the listing's own Slack channel when one is set,
 * otherwise to the integration's channel.
 */
export const sendReservationModificationNotification = internalAction(
  async (
    _ctx,
    {
      userId,
      listing,
      conversation,
      combinedMessage,
      integration,
    }: {
      userId: Id<"users">;
      listing: Doc<"listings">;
      conversation: Doc<"conversations">;
      combinedMessage: string;
      integration: Doc<"slackIntegrations">;
    }
  ): Promise<any> => {
    const slackClient: SlackIntegration = new SlackIntegration(
      userId,
      integration.accessToken
    );
    const channelId = listing.slackChannelId || integration.channelId;
    const guestName = conversation.recipientName || "N/A";

    await slackClient.sendReservationModificationRequestNotification({
      basicSlackInfo: {
        channelId,
        guestName,
        guestMessage: combinedMessage,
        listingAddress: listing.address,
        listingCity: listing.city || "",
        listingImageUrl: listing.listingImage.url,
      },
    });
  }
);
/**
 * Handles a Slack interactive button press for a queued message.
 *
 * The button `value` is split on ";" into user id, conversation id, message
 * id and action. Supported actions:
 *
 * - "approve" / "reject" (only when the queue row has a host response):
 *   on approve the host response is sent via `sendMessageInternal`; the
 *   Slack message is then updated and the queue row is scheduled to expire
 *   after 10 seconds.
 * - "reject_and_reply" / "urgent": opens a modal using the payload's
 *   `trigger_id`.
 *
 * A training-mode record is scheduled only for "approve", "reject" and
 * "reject_and_reply", and only when the queue row has a host response.
 * Urgent rows are queued without one, so there is nothing to record for
 * them.
 *
 * Returns without doing anything when the payload has no usable button
 * value, or when the queue row, integration, listing or conversation cannot
 * be found.
 */
export const processPayloadAction = internalAction(
  async (
    { runQuery, runAction, scheduler },
    {
      payload,
    }: {
      payload: any;
    }
  ): Promise<any> => {
    // Bail out instead of throwing on a payload without a button value.
    const firstAction: any =
      payload && payload.actions && payload.actions[0];
    if (!firstAction || typeof firstAction.value !== "string") return;
    const { value } = firstAction;
    const [userId, conversationId, messageId, action] = value.split(";");
    console.log(action);

    const queueItem = await runQuery(
      internal.slackIntegration.slackMessageQueue
        .getByUserConversationMessageId,
      {
        userId,
        conversationId: stringToNumber(conversationId),
        messageId: stringToNumber(messageId),
      }
    );
    const integration = await runQuery(
      internal.slackIntegration.slackIntegrations.getByUserId,
      {
        userId,
      }
    );
    if (!queueItem || !integration) return;

    const SlackMessenger: SlackIntegration = new SlackIntegration(
      userId,
      integration.accessToken
    );
    const listing = await runQuery(
      internal.listings.getByUserAndListingId,
      {
        userId,
        listingId: queueItem.listingId,
      }
    );
    const conversation = await runQuery(api.conversations.getById, {
      userId,
      conversationId: queueItem.conversationId,
    });
    if (!listing || !conversation) return;

    const {
      address: listingAddress,
      listingImage: { url: listingImageUrl },
    } = listing;
    const { recipientName: guestName } = conversation;
    const generatedReply = queueItem.hostResponse;

    // Training-mode decision made directly from the message buttons.
    if ((action === "approve" || action === "reject") && generatedReply) {
      const { pmsPlatform, slackTs, channelId, guestMessage } = queueItem;

      if (action === "approve") {
        const sent: boolean = await runAction(
          internal.actions.messages.sendMessageInternal,
          {
            userId,
            message: generatedReply,
            conversationId: stringToNumber(conversationId),
            pmsPlatform: pmsPlatform as PMSPlatform,
            generated: true,
          }
        );
        // Surface a failed send in the logs; the Slack message update below
        // does not take the send result.
        if (!sent) {
          console.error("Approved reply could not be sent", {
            conversationId,
            messageId,
          });
        }
      }

      await SlackMessenger.updateTrainingModeMessage({
        ts: slackTs,
        basicSlackInfo: {
          channelId,
          guestName: guestName || "N/A",
          guestMessage,
          hostResponse: generatedReply,
          listingAddress,
          listingCity: listing.city || "",
          listingImageUrl,
        },
        isApproved: action === "approve",
        isReply: false,
      });

      // Expire the queue row (and its Slack message) after 10 seconds.
      await scheduler.runAfter(
        10 * 1000,
        internal.actions.slackIntegration.expireMessage,
        {
          userId,
          listing,
          lastMessageId: stringToNumber(messageId),
          integration,
          conversation,
        }
      );
    }

    // Actions that need a typed reply open a modal.
    if (action === "reject_and_reply" || action === "urgent") {
      await SlackMessenger.openModal({
        triggerId: payload.trigger_id,
        basicSlackInfo: {
          channelId: listing.slackChannelId || integration.channelId,
          guestName: guestName || "N/A",
          hostResponse: generatedReply || "",
          guestMessage: queueItem.guestMessage,
          listingAddress,
          listingCity: listing.city || "",
          listingImageUrl,
        },
        value,
      });
    }

    // Record the decision on the generated reply. Skipped when there is no
    // generated reply (urgent rows) or the action is not a training-mode one.
    const isTrainingDecision =
      action === "approve" ||
      action === "reject" ||
      action === "reject_and_reply";
    if (isTrainingDecision && generatedReply) {
      await scheduler.runAfter(
        1000,
        internal.trainingModeMessages.addByUserId,
        {
          userId,
          item: {
            pmsPlatform: conversation.pmsPlatform as PMSPlatform,
            conversationId,
            body: generatedReply,
            approvalStatus:
              action === "approve" ? "approved" : "rejected",
          },
        }
      );
    }
  }
);
/**
 * Handles submission of the Slack reply modal.
 *
 * The modal's `private_metadata` is split on ";" into user id, conversation
 * id, message id and action; the typed reply is read from
 * `state.values.input_block.plain_text_input_action.value`.
 *
 * For "reject_and_reply" and "urgent" the typed reply is sent via
 * `sendMessageInternal`, the original Slack message is updated (urgent
 * messages via `updateUrgentMessage`, others via
 * `updateTrainingModeMessage`), and the queue row is scheduled to expire
 * after 10 seconds. Any other action does nothing beyond the lookups.
 *
 * Returns early when the queue row, integration, listing or conversation
 * cannot be found.
 */
export const processPayloadView = internalAction(
  async (
    ctx,
    {
      payload,
    }: {
      payload: any;
    }
  ): Promise<any> => {
    const view = payload.view;
    const metadata = view.private_metadata;
    const hostResponse =
      view.state.values.input_block.plain_text_input_action.value;
    const [userId, conversationId, messageId, action] = metadata.split(";");
    console.log(action);

    const queueItem = await ctx.runQuery(
      internal.slackIntegration.slackMessageQueue
        .getByUserConversationMessageId,
      {
        userId,
        conversationId: stringToNumber(conversationId),
        messageId: stringToNumber(messageId),
      }
    );
    const integration = await ctx.runQuery(
      internal.slackIntegration.slackIntegrations.getByUserId,
      { userId }
    );
    if (!(queueItem && integration)) return;

    const slackClient: SlackIntegration = new SlackIntegration(
      userId,
      integration.accessToken
    );

    const listing = await ctx.runQuery(
      internal.listings.getByUserAndListingId,
      { userId, listingId: queueItem.listingId }
    );
    const conversation = await ctx.runQuery(api.conversations.getById, {
      userId,
      conversationId: queueItem.conversationId,
    });
    if (!(listing && conversation)) return;

    const listingAddress = listing.address;
    const listingImageUrl = listing.listingImage.url;
    const guestName = conversation.recipientName;
    const { pmsPlatform, slackTs, channelId, guestMessage } = queueItem;

    // Only these two actions lead to a reply being sent.
    if (action !== "reject_and_reply" && action !== "urgent") return;

    const sent: boolean = await ctx.runAction(
      internal.actions.messages.sendMessageInternal,
      {
        userId,
        message: hostResponse,
        conversationId: stringToNumber(conversationId),
        pmsPlatform: pmsPlatform as PMSPlatform,
        generated: false,
      }
    );

    // Both update calls receive the same message details.
    const basicSlackInfo = {
      channelId,
      guestName: guestName || "N/A",
      guestMessage,
      hostResponse,
      listingAddress,
      listingCity: listing.city || "",
      listingImageUrl,
    };

    if (action === "urgent") {
      await slackClient.updateUrgentMessage({
        ts: slackTs,
        basicSlackInfo,
        sent,
      });
    } else {
      await slackClient.updateTrainingModeMessage({
        ts: slackTs,
        basicSlackInfo,
        isApproved: false,
        isReply: true,
      });
    }

    // Expire the queue row (and its Slack message) after 10 seconds.
    await ctx.scheduler.runAfter(
      10 * 1000,
      internal.actions.slackIntegration.expireMessage,
      {
        userId,
        listing,
        lastMessageId: stringToNumber(messageId),
        integration,
        conversation,
      }
    );
  }
);
/**
 * Handles a Slack slash command delivered as a URL-encoded request body.
 *
 * Supported commands are "/on_autopilot" and "/off_autopilot". The command
 * text is read as a number of hours; missing, non-numeric, non-finite or
 * negative text is treated as 0. The integration is looked up by the
 * `channel_id` the command was issued in, and autopilot is switched for that
 * integration's user.
 *
 * @returns A human-readable status string: an "Error: ..." message when the
 *   command, channel or integration is missing, a "Success: ..." message
 *   stating whether autopilot was turned on or off, or
 *   "Default case processed" for any other command.
 */
export const processSlashCommand = internalAction(
  async (
    { runQuery, runMutation },
    {
      body,
    }: {
      body: string;
    }
  ): Promise<string> => {
    const params = new URLSearchParams(body);
    const command: string | null = params.get("command");
    if (!command) return "Error: Please try again";

    switch (command) {
      case "/on_autopilot":
      case "/off_autopilot": {
        const channelId = params.get("channel_id");
        if (!channelId) return "Error: Channel not found";

        // `Number(null)` and `Number("")` are 0; anything unparsable is NaN.
        const parsedHours = Number(params.get("text"));
        const hours =
          Number.isFinite(parsedHours) && parsedHours > 0 ? parsedHours : 0;

        const integration = await runQuery(
          internal.slackIntegration.slackIntegrations.getByChannelId,
          {
            channelId,
          }
        );
        if (!integration) return "Error: Integration not found";

        const turnOn = command === "/on_autopilot";
        await runMutation(internal.aiSettings.switchAutopilot, {
          userId: integration.userId,
          on: turnOn,
          hours,
          slackIntegration: integration,
        });

        // Report the resulting state rather than echoing the raw command.
        return (
          "Success: Autopilot mode has been turned " + (turnOn ? "on" : "off")
        );
      }
      default:
        return "Default case processed";
    }
  }
);
Editor is loading...
Leave a Comment