Untitled

mail@pastecode.io avatar
unknown
plain_text
a month ago
5.3 kB
2
Indexable
Never
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	// Set up MongoDB client
	cmdMonitor := &event.CommandMonitor{
		Started: func(_ context.Context, evt *event.CommandStartedEvent) {
			log.Println(evt.Command.String())
		},
	}
	clientOptions := options.Client().ApplyURI("mongodb://root:example@localhost:27017/local?tls=false&authSource=admin").SetMonitor(cmdMonitor)
	client, err := mongo.Connect(context.TODO(), clientOptions)
	if err != nil {
		log.Fatal(err)
	}

	// Check the connection
	err = client.Ping(context.TODO(), nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("Connected to MongoDB!")

	// Set up collections
	conversationsCollection := client.Database("chat").Collection("conversation")
	// messagesCollection := client.Database("chat").Collection("message")

	// Define the time 30 minutes ago
	thirtyMinutesAgo := time.Now().UTC().Add(-30 * time.Minute)
	fmt.Println(thirtyMinutesAgo)
	_ = thirtyMinutesAgo

	// Aggregation pipeline
	// {
	// 	"$lookup": bson.M{
	// 		"from":         "conversation",
	// 		"localField":   "chatUserId",
	// 		"foreignField": "participants.chatUserId",
	// 		"as":           "conversations",
	// 	},
	// }
	// bson.M{"$unwind": "$conversations"},

	// pipeline := bson.A{
	// 	// lookup chat_user in participant
	// 	bson.M{
	// 		"$lookup": bson.M{
	// 			"from":         "conversation",
	// 			"localField":   "conversationId",
	// 			"foreignField": "conversationId",
	// 			"as":           "recent_messages",
	// 		},
	// 	},
	// 	bson.M{
	// 		"$addFields": bson.D{
	// 			{
	// 				Key: "recent_messages", Value: bson.D{
	// 					{Key: "$filter", Value: bson.D{
	// 						{Key: "input", Value: "$recent_messages"},
	// 						{Key: "as", Value: "message"},
	// 						{Key: "cond", Value: bson.D{
	// 							{Key: "$gte", Value: bson.A{"$$message.updatedAt", thirtyMinutesAgo}}},
	// 						},
	// 					},
	// 					},
	// 				},
	// 			},
	// 		},
	// 	},

	// 	bson.M{
	// 		"$match": bson.D{
	// 			{Key: "recent_messages", Value: bson.D{{Key: "$eq", Value: bson.A{}}}},
	// 		}},
	// 	bson.M{
	// 		"$project": bson.D{
	// 			{Key: "_id", Value: 1},
	// 			{Key: "title", Value: 1},
	// 		}},
	// }
	// pipeline := mongo.Pipeline{
	// 	// Step 1: Join with messages collection
	// 	{{"$lookup", bson.M{
	// 		"from", "messages",
	// 		"localField", "_id",
	// 		"foreignField", "conversation_id",
	// 		"as", "recent_messages",
	// 	}}},
	// 	// // Step 2: Filter messages from the last 30 minutes
	// 	{{"$addFields", bson.D{
	// 		{"recent_messages", bson.D{
	// 			{"$filter", bson.D{
	// 				{"input", "$recent_messages"},
	// 				{"as", "message"},
	// 				{"cond", bson.D{{"$gte", bson.A{"$$message.created_at", thirtyMinutesAgo}}}},
	// 			}},
	// 		}},
	// 	}}},
	// 	// Step 3: Match conversations with no recent messages
	// 	{{"$match", bson.D{
	// 		{"recent_messages", bson.D{{"$eq", bson.A{}}}},
	// 	}}},
	// 	// Optionally, project only needed fields
	// 	{{"$project", bson.D{
	// 		{"_id", 1},
	// 		{"title", 1},
	// 	}}},
	// }
	// pipeline := mongo.Pipeline{
	// 	// Unwind the participants array
	// 	{
	// 		{Key: "$match", Value: bson.M{"updatedAt": bson.M{"$gte": thirtyMinutesAgo}}},
	// 	},
	// 	// filter conversation status and list participants
	// 	{
	// 		{Key: "$group", Value: bson.M{"_id": "$conversationId"}},
	// 	},
	// 	{
	// 		{Key: "$group",
	// 			Value: bson.M{
	// 				"_id":              nil,
	// 				"conversation_ids": bson.M{"$addToSet": "$_id"},
	// 			},
	// 		},
	// 	},
	// }
	// Aggregation pipeline
	// Aggregation pipeline to find inactive messages
	pipeline := mongo.Pipeline{
		// Step 1: Lookup messages collection to find messages created in the last 30 minutes
		{
			{Key: "$lookup", Value: bson.M{
				"from":         "message",
				"localField":   "conversationId",
				"foreignField": "conversationId",
				"as":           "messages",
			}},
		},
		// Step 2: Unwind the messages array to flatten it
		{{Key: "$unwind", Value: "$messages"}},
		// Step 3: Match messages created in the last 30 minutes
		{
			{Key: "$match", Value: bson.M{
				"messages.createdAt": bson.M{"$gte": thirtyMinutesAgo},
			}},
		},
		// Step 4: Group by conversation ID to count messages per conversation
		{
			{Key: "$group", Value: bson.M{
				"_id":          "$conversationId",
				"messageCount": bson.M{"$sum": 1},
			}},
		},
		// // Step 5: Match conversations with no messages in the last 30 minutes
		// {{Key: "$match", Value: bson.M{
		// 	"count": bson.M{"$eq": 0},
		// }},
		// },
		// // Optionally, project only needed fields
		// {
		// 	{Key: "$project", Value: bson.M{
		// 		"_id":          1,
		// 		"participants": 1,
		// 		"createdAt":    1,
		// 		"updatedAt":    1,
		// 		"company_id":   1,
		// 	},
		// 	}},
	}

	// Execute the aggregation pipeline
	cursor, err := conversationsCollection.Aggregate(context.TODO(), pipeline)
	if err != nil {
		log.Fatal(err)
	}
	defer cursor.Close(context.TODO())

	// Iterate over the results
	for cursor.Next(context.Background()) {
		var conversation bson.M
		if err := cursor.Decode(&conversation); err != nil {
			log.Fatal(err)
		}
		fmt.Println("Inactive conversation:", conversation)
	}
}
Leave a Comment