Untitled
unknown
plain_text
a year ago
5.3 kB
4
Indexable
// Command main connects to a MongoDB instance and prints every document of
// the chat.conversation collection that has no joined message with a
// createdAt at or after "now minus inactivityWindow".
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

const (
	// mongoURI is the connection string passed to the driver.
	// NOTE(review): credentials are hard-coded here; consider loading them
	// from the environment or a secret store before using this outside a
	// local setup.
	mongoURI = "mongodb://root:example@localhost:27017/local?tls=false&authSource=admin"

	// inactivityWindow is how far back a message's createdAt may lie for its
	// conversation to still count as active.
	inactivityWindow = 30 * time.Minute
)

// main runs the report and exits non-zero on the first error. All work lives
// in run so that deferred cleanup (cursor close, client disconnect) executes
// before log.Fatal terminates the process.
func main() {
	if err := run(context.Background()); err != nil {
		log.Fatal(err)
	}
}

// run connects to MongoDB, executes the inactive-conversation aggregation on
// the chat.conversation collection, and prints each resulting document.
//
// It returns the first error encountered while connecting, pinging,
// aggregating, decoding, or iterating the cursor.
func run(ctx context.Context) error {
	// Log every command the driver sends, which makes the generated
	// aggregation visible while debugging.
	cmdMonitor := &event.CommandMonitor{
		Started: func(_ context.Context, evt *event.CommandStartedEvent) {
			log.Println(evt.Command.String())
		},
	}
	clientOptions := options.Client().ApplyURI(mongoURI).SetMonitor(cmdMonitor)

	client, err := mongo.Connect(ctx, clientOptions)
	if err != nil {
		return fmt.Errorf("connecting to MongoDB: %w", err)
	}
	defer func() {
		if err := client.Disconnect(ctx); err != nil {
			log.Printf("disconnecting from MongoDB: %v", err)
		}
	}()

	// Verify the connection before running the aggregation.
	if err := client.Ping(ctx, nil); err != nil {
		return fmt.Errorf("pinging MongoDB: %w", err)
	}
	fmt.Println("Connected to MongoDB!")

	conversations := client.Database("chat").Collection("conversation")

	cutoff := time.Now().UTC().Add(-inactivityWindow)
	fmt.Println(cutoff)

	cursor, err := conversations.Aggregate(ctx, inactiveConversationsPipeline(cutoff))
	if err != nil {
		return fmt.Errorf("aggregating conversations: %w", err)
	}
	defer func() {
		if err := cursor.Close(ctx); err != nil {
			log.Printf("closing cursor: %v", err)
		}
	}()

	for cursor.Next(ctx) {
		var conversation bson.M
		if err := cursor.Decode(&conversation); err != nil {
			return fmt.Errorf("decoding conversation: %w", err)
		}
		fmt.Println("Inactive conversation:", conversation)
	}
	// Next returns false both when the cursor is exhausted and when it
	// fails; Err distinguishes the two.
	if err := cursor.Err(); err != nil {
		return fmt.Errorf("iterating conversations: %w", err)
	}
	return nil
}

// inactiveConversationsPipeline builds the aggregation that keeps only
// conversations with no message whose createdAt is at or after cutoff.
//
// Unwinding the joined messages and matching on recent ones would select the
// opposite set (conversations that are active) and would silently drop
// conversations without any message, so the stages below instead:
//
//  1. join each conversation with its messages on conversationId,
//  2. keep conversations for which no joined message is recent, which also
//     keeps conversations that have no messages at all,
//  3. strip the joined array from the output.
func inactiveConversationsPipeline(cutoff time.Time) mongo.Pipeline {
	return mongo.Pipeline{
		{{Key: "$lookup", Value: bson.M{
			"from":         "message",
			"localField":   "conversationId",
			"foreignField": "conversationId",
			"as":           "messages",
		}}},
		{{Key: "$match", Value: bson.M{
			"messages": bson.M{
				"$not": bson.M{
					"$elemMatch": bson.M{
						"createdAt": bson.M{"$gte": cutoff},
					},
				},
			},
		}}},
		{{Key: "$project", Value: bson.M{"messages": 0}}},
	}
}
Editor is loading...
Leave a Comment