Untitled

 avatar
unknown
golang
a month ago
3.5 kB
1
Indexable
// main starts the Pub/Sub push HTTP service and exits with a non-zero status
// if startup or serving fails.
//
// All work lives in runServer so that its deferred client Close calls run
// before the process exits; log.Fatal calls os.Exit, which skips defers in the
// calling function.
func main() {
	if err := runServer(context.Background()); err != nil {
		log.Fatal(err)
	}
}

// runServer initializes the Pub/Sub and Cloud Storage clients, registers the
// push handler on the default mux, and serves HTTP on $PORT (default "8080").
// It returns the first startup error, or the error from http.ListenAndServe.
func runServer(ctx context.Context) error {
	// Initialize clients
	pubsubClient, storageClient, err := initClients(ctx)
	if err != nil {
		return fmt.Errorf("failed to initialize clients: %w", err)
	}
	defer pubsubClient.Close()
	defer storageClient.Close()

	// Bucket handle passed to the downloader
	bucket := storageClient.Bucket(config.GetString(gcpBucketName))

	// Set up response topic
	responseTopic, err := setupResponseTopic(ctx, pubsubClient)
	if err != nil {
		return fmt.Errorf("failed to setup response topic: %w", err)
	}

	dl := downloader.New(bucket, responseTopic)

	// Cloud Run HTTP server
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// Log a redacted copy of the headers: authenticated push requests
		// carry a bearer token that must not end up in logs.
		slog.Info("received request",
			"method", r.Method,
			"content_length", r.ContentLength,
			"headers", redactedHeaders(r.Header),
		)

		// Outer envelope: the push request body.
		var m PubSubMessage
		if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
			slog.Error("failed to decode message",
				"error", err,
				"error_type", fmt.Sprintf("%T", err),
			)
			http.Error(w, "Bad Request", http.StatusBadRequest)
			return
		}

		slog.Info("decoded pub/sub message",
			"message_id", m.Message.ID,
			"subscription", m.Subscription,
			"data_size", len(m.Message.Data),
		)

		// Inner payload: the download request carried in the message data.
		var msg downloader.DownloadMessage
		if err := json.Unmarshal(m.Message.Data, &msg); err != nil {
			slog.Error("failed to unmarshal message",
				"error", err,
				"error_type", fmt.Sprintf("%T", err),
				"raw_data", string(m.Message.Data),
			)
			http.Error(w, "Bad Request", http.StatusBadRequest)
			return
		}

		slog.Info("received message",
			"message_id", m.Message.ID,
			"video_id", msg.ID,
			"provider", msg.VideoProvider,
			"url", msg.VideoURL,
			"is_live", msg.IsLive,
			"created_at", msg.CreatedAt,
			"metadata", msg.Metadata,
		)

		// Create a new background context for processing
		// This ensures the processing continues even if the HTTP request times out
		processCtx := context.Background()

		opts := downloader.DownloadOptions{
			ID:            msg.ID,
			VideoURL:      msg.VideoURL,
			VideoProvider: msg.VideoProvider,
		}

		// Use a goroutine to handle the processing asynchronously.
		// NOTE(review): the 200 below is written before processing finishes,
		// so a failure here is only logged and the message is not retried.
		// On Cloud Run, work done after the response may also be CPU-throttled
		// depending on the service's CPU allocation setting — confirm this
		// trade-off is intended.
		go func() {
			if err := dl.ProcessVideo(processCtx, opts); err != nil {
				slog.Error("failed to process video in background",
					"error", err,
					"error_type", fmt.Sprintf("%T", err),
					"error_stack", fmt.Sprintf("%+v", err),
					"id", msg.ID,
					"provider", msg.VideoProvider,
					"url", msg.VideoURL,
				)

				// Log if the error is permanent
				if isPermanentError(err) {
					slog.Info("permanent error detected in background processing",
						"error", err,
						"id", msg.ID,
					)
				} else {
					slog.Info("transient error detected in background processing",
						"error", err,
						"id", msg.ID,
					)
				}
				return
			}

			slog.Info("successfully processed video in background",
				"id", msg.ID,
				"provider", msg.VideoProvider,
				"url", msg.VideoURL,
			)
		}()

		// Immediately acknowledge the message
		slog.Info("acknowledged message, processing continues in background",
			"id", msg.ID,
			"provider", msg.VideoProvider,
			"url", msg.VideoURL,
		)
		w.WriteHeader(http.StatusOK)
	})

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
	}

	slog.Info("starting HTTP server", "port", port)
	// NOTE(review): ListenAndServe uses a server with no read/write timeouts;
	// consider an explicit http.Server with timeouts.
	return http.ListenAndServe(":"+port, nil)
}

// redactedHeaders returns a copy of h in which credential-bearing headers are
// replaced by a placeholder, so the result is safe to log. h is not modified.
func redactedHeaders(h http.Header) http.Header {
	safe := h.Clone()
	// Keys are already in canonical form, so direct map lookups match.
	for _, name := range []string{
		"Authorization",
		"Proxy-Authorization",
		"X-Serverless-Authorization",
		"Cookie",
	} {
		if _, ok := safe[name]; ok {
			safe.Set(name, "[REDACTED]")
		}
	}
	return safe
}

// isPermanentError reports whether err's message contains one of the
// substrings this service treats as non-retryable. err must be non-nil.
func isPermanentError(err error) bool {
	text := err.Error()
	for _, marker := range []string{
		"failed to generate signed URL",
		"invalid URL",
		"unsupported format",
	} {
		if strings.Contains(text, marker) {
			return true
		}
	}
	return false
}
Editor is loading...
Leave a Comment