Untitled
unknown
plain_text
5 months ago
5.3 kB
2
Indexable
package main import ( "errors" "fmt" "sync" ) // Job represents a single job with dependencies and an execution function. type Job struct { Name string Dependencies []*Job ExecuteFunc func(depResults []interface{}) (interface{}, error) // Execution function for the job } // AddDependency adds a dependency to the job. func (j *Job) AddDependency(dep *Job) { j.Dependencies = append(j.Dependencies, dep) } // JobGraph represents a collection of jobs and their dependencies. type JobGraph struct { Jobs map[string]*Job } // NewJobGraph creates a new job graph. func NewJobGraph() *JobGraph { return &JobGraph{Jobs: make(map[string]*Job)} } // AddJob creates or retrieves a job by name and sets its execute function. func (g *JobGraph) AddJob(name string, executeFunc func(depResults []interface{}) (interface{}, error)) *Job { if job, exists := g.Jobs[name]; exists { return job } job := &Job{Name: name, ExecuteFunc: executeFunc} g.Jobs[name] = job return job } // AddDependency adds a dependency relationship between two jobs. func (g *JobGraph) AddDependency(jobName, depName string) { job := g.Jobs[jobName] dep := g.Jobs[depName] job.AddDependency(dep) } // Scheduler schedules jobs for execution based on dependencies. type Scheduler struct { JobGraph *JobGraph } // NewScheduler creates a new scheduler with the provided job graph. func NewScheduler(graph *JobGraph) *Scheduler { return &Scheduler{JobGraph: graph} } // TopologicalSort performs a topological sort on the job graph to find a valid execution order. func (s *Scheduler) TopologicalSort() ([]*Job, map[string]uint64, error) { visited := make(map[*Job]bool) visiting := make(map[*Job]bool) var result []*Job jobDependencyFrequency := make(map[string]uint64) var visit func(job *Job) error visit = func(job *Job) error { if visiting[job] { return errors.New("cycle detected in job dependencies") } if !visited[job] { visiting[job] = true for _, dep := range job.Dependencies { if err := visit(dep); err != nil { return err } } visiting[job] = false visited[job] = true result = append(result, job) } return nil } for _, job := range s.JobGraph.Jobs { if !visited[job] { if err := visit(job); err != nil { return nil, jobDependencyFrequency, err } } for _, dependency := range job.Dependencies { jobDependencyFrequency[dependency.Name] += 1 } } // Reverse the result to get the correct order for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 { result[i], result[j] = result[j], result[i] } return result, jobDependencyFrequency, nil } // Execute runs the jobs in the order determined by topological sorting and in parallel where possible. func (s *Scheduler) Execute() error { order, jobDependencyFrequency, err := s.TopologicalSort() if err != nil { return err } results := make(map[*Job]interface{}) // To store results of jobs var mu sync.Mutex // Protects access to the results map var wg sync.WaitGroup // To synchronize job execution // channel to signal job completion and pass results jobChannels := make(map[*Job]chan interface{}) // Start each job in a goroutine, waiting on dependencies' results for _, job := range order { jobChannels[job] = make(chan interface{}) wg.Add(1) go func(job *Job) { defer wg.Done() // Collect dependency results depResults := []interface{}{} for _, dep := range job.Dependencies { depResult := <-jobChannels[dep] // Wait for dependency to complete depResults = append(depResults, depResult) } // Execute the job with dependency results result, err := job.ExecuteFunc(depResults) if err != nil { fmt.Printf("Error executing %s: %v\n", job.Name, err) return } // Store result mu.Lock() results[job] = result mu.Unlock() // Signal completion and send result to channel numberOfParents := jobDependencyFrequency[job.Name] for numberOfParents > 0 { jobChannels[job] <- result numberOfParents-- } fmt.Printf("Completed %s with result: %v\n", job.Name, result) }(job) } wg.Wait() // Wait for all jobs to finish return nil } func main() { jobGraph := NewJobGraph() // Define jobs with their execution functions jobGraph.AddJob("JobD", func(depResults []interface{}) (interface{}, error) { fmt.Println("Executing JobD") return "ResultD", nil }) jobGraph.AddJob("JobB", func(depResults []interface{}) (interface{}, error) { fmt.Printf("Executing JobB with dependencies: %v\n", depResults) return "ResultB", nil }) jobGraph.AddJob("JobC", func(depResults []interface{}) (interface{}, error) { fmt.Printf("Executing JobC with dependencies: %v\n", depResults) return "ResultC", nil }) jobGraph.AddJob("JobA", func(depResults []interface{}) (interface{}, error) { fmt.Printf("Executing JobA with dependencies: %v\n", depResults) return "ResultA", nil }) // Define dependencies jobGraph.AddDependency("JobA", "JobB") jobGraph.AddDependency("JobA", "JobC") jobGraph.AddDependency("JobB", "JobD") jobGraph.AddDependency("JobC", "JobD") // Execute jobs scheduler := NewScheduler(jobGraph) if err := scheduler.Execute(); err != nil { fmt.Println("Error:", err) } fmt.Printf("DONE") }
Editor is loading...
Leave a Comment