Untitled
unknown
golang
9 months ago
3.5 kB
3
Indexable
func main() {
ctx := context.Background()
// Initialize clients
pubsubClient, storageClient, err := initClients(ctx)
if err != nil {
log.Fatalf("failed to initialize clients: %v", err)
}
defer pubsubClient.Close()
defer storageClient.Close()
// Initialize downloader
bucket := storageClient.Bucket(config.GetString(gcpBucketName))
// Set up response topic
responseTopic, err := setupResponseTopic(ctx, pubsubClient)
if err != nil {
log.Fatalf("failed to setup response topic: %v", err)
}
dl := downloader.New(bucket, responseTopic)
// Cloud Run HTTP server
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
slog.Info("received request",
"method", r.Method,
"content_length", r.ContentLength,
"headers", r.Header,
)
var m PubSubMessage
if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
slog.Error("failed to decode message",
"error", err,
"error_type", fmt.Sprintf("%T", err),
)
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
slog.Info("decoded pub/sub message",
"message_id", m.Message.ID,
"subscription", m.Subscription,
"data_size", len(m.Message.Data),
)
var msg downloader.DownloadMessage
if err := json.Unmarshal(m.Message.Data, &msg); err != nil {
slog.Error("failed to unmarshal message",
"error", err,
"error_type", fmt.Sprintf("%T", err),
"raw_data", string(m.Message.Data),
)
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
slog.Info("received message",
"message_id", m.Message.ID,
"video_id", msg.ID,
"provider", msg.VideoProvider,
"url", msg.VideoURL,
"is_live", msg.IsLive,
"created_at", msg.CreatedAt,
"metadata", msg.Metadata,
)
// Create a new background context for processing
// This ensures the processing continues even if the HTTP request times out
processCtx := context.Background()
opts := downloader.DownloadOptions{
ID: msg.ID,
VideoURL: msg.VideoURL,
VideoProvider: msg.VideoProvider,
}
// Use a goroutine to handle the processing asynchronously
go func() {
if err := dl.ProcessVideo(processCtx, opts); err != nil {
slog.Error("failed to process video in background",
"error", err,
"error_type", fmt.Sprintf("%T", err),
"error_stack", fmt.Sprintf("%+v", err),
"id", msg.ID,
"provider", msg.VideoProvider,
"url", msg.VideoURL,
)
// Log if the error is permanent
if strings.Contains(err.Error(), "failed to generate signed URL") ||
strings.Contains(err.Error(), "invalid URL") ||
strings.Contains(err.Error(), "unsupported format") {
slog.Info("permanent error detected in background processing",
"error", err,
"id", msg.ID,
)
} else {
slog.Info("transient error detected in background processing",
"error", err,
"id", msg.ID,
)
}
return
}
slog.Info("successfully processed video in background",
"id", msg.ID,
"provider", msg.VideoProvider,
"url", msg.VideoURL,
)
}()
// Immediately acknowledge the message
slog.Info("acknowledged message, processing continues in background",
"id", msg.ID,
"provider", msg.VideoProvider,
"url", msg.VideoURL,
)
w.WriteHeader(http.StatusOK)
})
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
slog.Info("starting HTTP server", "port", port)
if err := http.ListenAndServe(":"+port, nil); err != nil {
log.Fatal(err)
}
}
Editor is loading...
Leave a Comment