Untitled
unknown
plain_text
2 years ago
5.3 kB
7
Indexable
package main
import (
"context"
"fmt"
"log"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func main() {
// Set up MongoDB client
cmdMonitor := &event.CommandMonitor{
Started: func(_ context.Context, evt *event.CommandStartedEvent) {
log.Println(evt.Command.String())
},
}
clientOptions := options.Client().ApplyURI("mongodb://root:example@localhost:27017/local?tls=false&authSource=admin").SetMonitor(cmdMonitor)
client, err := mongo.Connect(context.TODO(), clientOptions)
if err != nil {
log.Fatal(err)
}
// Check the connection
err = client.Ping(context.TODO(), nil)
if err != nil {
log.Fatal(err)
}
fmt.Println("Connected to MongoDB!")
// Set up collections
conversationsCollection := client.Database("chat").Collection("conversation")
// messagesCollection := client.Database("chat").Collection("message")
// Define the time 30 minutes ago
thirtyMinutesAgo := time.Now().UTC().Add(-30 * time.Minute)
fmt.Println(thirtyMinutesAgo)
_ = thirtyMinutesAgo
// Aggregation pipeline
// {
// "$lookup": bson.M{
// "from": "conversation",
// "localField": "chatUserId",
// "foreignField": "participants.chatUserId",
// "as": "conversations",
// },
// }
// bson.M{"$unwind": "$conversations"},
// pipeline := bson.A{
// // lookup chat_user in participant
// bson.M{
// "$lookup": bson.M{
// "from": "conversation",
// "localField": "conversationId",
// "foreignField": "conversationId",
// "as": "recent_messages",
// },
// },
// bson.M{
// "$addFields": bson.D{
// {
// Key: "recent_messages", Value: bson.D{
// {Key: "$filter", Value: bson.D{
// {Key: "input", Value: "$recent_messages"},
// {Key: "as", Value: "message"},
// {Key: "cond", Value: bson.D{
// {Key: "$gte", Value: bson.A{"$$message.updatedAt", thirtyMinutesAgo}}},
// },
// },
// },
// },
// },
// },
// },
// bson.M{
// "$match": bson.D{
// {Key: "recent_messages", Value: bson.D{{Key: "$eq", Value: bson.A{}}}},
// }},
// bson.M{
// "$project": bson.D{
// {Key: "_id", Value: 1},
// {Key: "title", Value: 1},
// }},
// }
// pipeline := mongo.Pipeline{
// // Step 1: Join with messages collection
// {{"$lookup", bson.M{
// "from", "messages",
// "localField", "_id",
// "foreignField", "conversation_id",
// "as", "recent_messages",
// }}},
// // // Step 2: Filter messages from the last 30 minutes
// {{"$addFields", bson.D{
// {"recent_messages", bson.D{
// {"$filter", bson.D{
// {"input", "$recent_messages"},
// {"as", "message"},
// {"cond", bson.D{{"$gte", bson.A{"$$message.created_at", thirtyMinutesAgo}}}},
// }},
// }},
// }}},
// // Step 3: Match conversations with no recent messages
// {{"$match", bson.D{
// {"recent_messages", bson.D{{"$eq", bson.A{}}}},
// }}},
// // Optionally, project only needed fields
// {{"$project", bson.D{
// {"_id", 1},
// {"title", 1},
// }}},
// }
// pipeline := mongo.Pipeline{
// // Unwind the participants array
// {
// {Key: "$match", Value: bson.M{"updatedAt": bson.M{"$gte": thirtyMinutesAgo}}},
// },
// // filter conversation status and list participants
// {
// {Key: "$group", Value: bson.M{"_id": "$conversationId"}},
// },
// {
// {Key: "$group",
// Value: bson.M{
// "_id": nil,
// "conversation_ids": bson.M{"$addToSet": "$_id"},
// },
// },
// },
// }
// Aggregation pipeline
// Aggregation pipeline to find inactive messages
pipeline := mongo.Pipeline{
// Step 1: Lookup messages collection to find messages created in the last 30 minutes
{
{Key: "$lookup", Value: bson.M{
"from": "message",
"localField": "conversationId",
"foreignField": "conversationId",
"as": "messages",
}},
},
// Step 2: Unwind the messages array to flatten it
{{Key: "$unwind", Value: "$messages"}},
// Step 3: Match messages created in the last 30 minutes
{
{Key: "$match", Value: bson.M{
"messages.createdAt": bson.M{"$gte": thirtyMinutesAgo},
}},
},
// Step 4: Group by conversation ID to count messages per conversation
{
{Key: "$group", Value: bson.M{
"_id": "$conversationId",
"messageCount": bson.M{"$sum": 1},
}},
},
// // Step 5: Match conversations with no messages in the last 30 minutes
// {{Key: "$match", Value: bson.M{
// "count": bson.M{"$eq": 0},
// }},
// },
// // Optionally, project only needed fields
// {
// {Key: "$project", Value: bson.M{
// "_id": 1,
// "participants": 1,
// "createdAt": 1,
// "updatedAt": 1,
// "company_id": 1,
// },
// }},
}
// Execute the aggregation pipeline
cursor, err := conversationsCollection.Aggregate(context.TODO(), pipeline)
if err != nil {
log.Fatal(err)
}
defer cursor.Close(context.TODO())
// Iterate over the results
for cursor.Next(context.Background()) {
var conversation bson.M
if err := cursor.Decode(&conversation); err != nil {
log.Fatal(err)
}
fmt.Println("Inactive conversation:", conversation)
}
}
Editor is loading...
Leave a Comment