Untitled
unknown
golang
a month ago
3.5 kB
1
Indexable
func main() { ctx := context.Background() // Initialize clients pubsubClient, storageClient, err := initClients(ctx) if err != nil { log.Fatalf("failed to initialize clients: %v", err) } defer pubsubClient.Close() defer storageClient.Close() // Initialize downloader bucket := storageClient.Bucket(config.GetString(gcpBucketName)) // Set up response topic responseTopic, err := setupResponseTopic(ctx, pubsubClient) if err != nil { log.Fatalf("failed to setup response topic: %v", err) } dl := downloader.New(bucket, responseTopic) // Cloud Run HTTP server http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { slog.Info("received request", "method", r.Method, "content_length", r.ContentLength, "headers", r.Header, ) var m PubSubMessage if err := json.NewDecoder(r.Body).Decode(&m); err != nil { slog.Error("failed to decode message", "error", err, "error_type", fmt.Sprintf("%T", err), ) http.Error(w, "Bad Request", http.StatusBadRequest) return } slog.Info("decoded pub/sub message", "message_id", m.Message.ID, "subscription", m.Subscription, "data_size", len(m.Message.Data), ) var msg downloader.DownloadMessage if err := json.Unmarshal(m.Message.Data, &msg); err != nil { slog.Error("failed to unmarshal message", "error", err, "error_type", fmt.Sprintf("%T", err), "raw_data", string(m.Message.Data), ) http.Error(w, "Bad Request", http.StatusBadRequest) return } slog.Info("received message", "message_id", m.Message.ID, "video_id", msg.ID, "provider", msg.VideoProvider, "url", msg.VideoURL, "is_live", msg.IsLive, "created_at", msg.CreatedAt, "metadata", msg.Metadata, ) // Create a new background context for processing // This ensures the processing continues even if the HTTP request times out processCtx := context.Background() opts := downloader.DownloadOptions{ ID: msg.ID, VideoURL: msg.VideoURL, VideoProvider: msg.VideoProvider, } // Use a goroutine to handle the processing asynchronously go func() { if err := dl.ProcessVideo(processCtx, opts); err != nil { slog.Error("failed to process video in background", "error", err, "error_type", fmt.Sprintf("%T", err), "error_stack", fmt.Sprintf("%+v", err), "id", msg.ID, "provider", msg.VideoProvider, "url", msg.VideoURL, ) // Log if the error is permanent if strings.Contains(err.Error(), "failed to generate signed URL") || strings.Contains(err.Error(), "invalid URL") || strings.Contains(err.Error(), "unsupported format") { slog.Info("permanent error detected in background processing", "error", err, "id", msg.ID, ) } else { slog.Info("transient error detected in background processing", "error", err, "id", msg.ID, ) } return } slog.Info("successfully processed video in background", "id", msg.ID, "provider", msg.VideoProvider, "url", msg.VideoURL, ) }() // Immediately acknowledge the message slog.Info("acknowledged message, processing continues in background", "id", msg.ID, "provider", msg.VideoProvider, "url", msg.VideoURL, ) w.WriteHeader(http.StatusOK) }) port := os.Getenv("PORT") if port == "" { port = "8080" } slog.Info("starting HTTP server", "port", port) if err := http.ListenAndServe(":"+port, nil); err != nil { log.Fatal(err) } }
Editor is loading...
Leave a Comment