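// Web-page residue captured together with the code (not part of the program),
// kept as a comment so that the file parses as Go:
// Untitled / unknown / plain_text / a year ago / 5.3 kB / 3 / Indexable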
package main
import (
"errors"
"fmt"
"sync"
)
// Job is a single node of the job graph: a named piece of work, the jobs it
// depends on, and the function that performs the work.
type Job struct {
	// Name is the job's name; JobGraph stores jobs under it.
	Name string
	// Dependencies holds the jobs this job depends on.
	Dependencies []*Job
	// ExecuteFunc is the execution function for the job. It is called with
	// the results of the job's dependencies.
	ExecuteFunc func(depResults []interface{}) (interface{}, error)
}
// AddDependency appends dep to the end of j.Dependencies, making dep one of
// the jobs that j depends on.
func (j *Job) AddDependency(dep *Job) {
	extended := append(j.Dependencies, dep)
	j.Dependencies = extended
}
// JobGraph is the set of jobs known to the program. The dependency edges
// themselves live on the individual jobs.
type JobGraph struct {
	// Jobs maps a job name to the job registered under that name.
	Jobs map[string]*Job
}
// NewJobGraph returns an empty job graph whose Jobs map is already
// allocated, so jobs can be added to it right away.
func NewJobGraph() *JobGraph {
	graph := new(JobGraph)
	graph.Jobs = map[string]*Job{}
	return graph
}
// AddJob returns the job registered under name, creating and registering it
// with executeFunc when no such job exists yet.
//
// When a job with that name is already registered it is returned unchanged:
// executeFunc is ignored and the job keeps its existing ExecuteFunc.
//
// AddJob also works on a zero-value JobGraph; the Jobs map is allocated on
// first use.
func (g *JobGraph) AddJob(name string, executeFunc func(depResults []interface{}) (interface{}, error)) *Job {
	// Looking a key up in a nil map is safe, so this check may come first.
	if existing, ok := g.Jobs[name]; ok {
		return existing
	}
	// Writing to a nil map panics, so allocate it before the first insert.
	if g.Jobs == nil {
		g.Jobs = make(map[string]*Job)
	}
	job := &Job{Name: name, ExecuteFunc: executeFunc}
	g.Jobs[name] = job
	return job
}
// AddDependency makes the job registered as jobName depend on the job
// registered as depName.
//
// Both jobs must already be registered in the graph. Referring to an unknown
// name is a programming error, and the signature has no error result, so
// AddDependency panics immediately with a message naming the missing job
// instead of dereferencing a nil job or storing a nil dependency that would
// only fail later during sorting.
func (g *JobGraph) AddDependency(jobName, depName string) {
	job := g.Jobs[jobName]
	if job == nil {
		panic(fmt.Sprintf("AddDependency: unknown job %q", jobName))
	}
	dep := g.Jobs[depName]
	if dep == nil {
		panic(fmt.Sprintf("AddDependency: unknown dependency %q for job %q", depName, jobName))
	}
	job.AddDependency(dep)
}
// Scheduler orders and runs the jobs of a job graph according to their
// dependencies.
type Scheduler struct {
	// JobGraph is the graph whose jobs the scheduler sorts and executes.
	JobGraph *JobGraph
}
// NewScheduler returns a scheduler that operates on graph. The graph is
// stored as given, not copied.
func NewScheduler(graph *JobGraph) *Scheduler {
	scheduler := new(Scheduler)
	scheduler.JobGraph = graph
	return scheduler
}
// TopologicalSort orders every job reachable from the graph so that each job
// comes after all of its dependencies, which is a valid execution order.
//
// It returns:
//   - the ordered jobs, including jobs that are only reachable through another
//     job's Dependencies and are not registered in the graph themselves;
//   - a map from job name to the number of dependency edges pointing at that
//     job, i.e. how many times it is listed in other jobs' Dependencies;
//   - an error when the dependencies contain a cycle, a nil dependency or a
//     nil graph entry. In that case the other two results are nil.
//
// A scheduler without a graph yields an empty order and an empty map.
// The relative order of jobs that do not depend on each other follows the
// iteration order of the graph's map and may differ between calls.
func (s *Scheduler) TopologicalSort() ([]*Job, map[string]uint64, error) {
	jobDependencyFrequency := make(map[string]uint64)
	if s.JobGraph == nil {
		return nil, jobDependencyFrequency, nil
	}

	visited := make(map[*Job]bool, len(s.JobGraph.Jobs)) // fully processed jobs
	visiting := make(map[*Job]bool)                      // jobs on the current DFS path
	result := make([]*Job, 0, len(s.JobGraph.Jobs))

	var visit func(job *Job) error
	visit = func(job *Job) error {
		if visited[job] {
			return nil
		}
		// Reaching a job that is still on the current path means the path
		// loops back on itself.
		if visiting[job] {
			return errors.New("cycle detected in job dependencies")
		}
		visiting[job] = true
		for _, dep := range job.Dependencies {
			if dep == nil {
				return fmt.Errorf("job %q has a nil dependency", job.Name)
			}
			// Count edges here rather than in the loop over the graph's map,
			// so that jobs reachable only through Dependencies contribute
			// their edges too. Each job reaches this point exactly once.
			jobDependencyFrequency[dep.Name]++
			if err := visit(dep); err != nil {
				return err
			}
		}
		visiting[job] = false
		visited[job] = true
		// Post-order append: all dependencies of job are already in result,
		// so result is built dependencies-first and needs no reversal.
		result = append(result, job)
		return nil
	}

	for name, job := range s.JobGraph.Jobs {
		if job == nil {
			return nil, nil, fmt.Errorf("job graph entry %q is nil", name)
		}
		if err := visit(job); err != nil {
			return nil, nil, err
		}
	}
	return result, jobDependencyFrequency, nil
}
// Execute runs every job returned by TopologicalSort, each in its own
// goroutine. Independent jobs run in parallel; a job starts only after all of
// its dependencies have finished.
//
// A job's ExecuteFunc receives the results of its dependencies in the order
// they are listed in the job's Dependencies. When a job fails or has no
// ExecuteFunc, every job depending on it (directly or transitively) is
// skipped rather than left waiting forever.
//
// Execute waits for all goroutines it started. It returns the error from
// TopologicalSort if sorting fails, otherwise the first job failure that was
// recorded (wrapped with the job's name), or nil when every job succeeded.
func (s *Scheduler) Execute() error {
	order, _, err := s.TopologicalSort()
	if err != nil {
		return err
	}

	// outcome is what a finished job publishes to the jobs depending on it.
	type outcome struct {
		done   chan struct{} // closed once result and failed are final
		result interface{}
		failed bool // the job returned an error or was skipped
	}

	// Create every outcome before starting any goroutine. From here on the
	// map is only read, so goroutines can look up their dependencies without
	// locking and never observe a missing entry. TopologicalSort visits every
	// dependency, so each dependency has an entry.
	outcomes := make(map[*Job]*outcome, len(order))
	for _, job := range order {
		outcomes[job] = &outcome{done: make(chan struct{})}
	}

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex // protects firstErr
		firstErr error
	)
	recordErr := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		if firstErr == nil {
			firstErr = err
		}
	}

	for _, job := range order {
		wg.Add(1)
		go func(job *Job) {
			defer wg.Done()
			self := outcomes[job]
			// Closing the channel, unlike sending on it, releases any number
			// of dependents and also fires on the failure paths below.
			defer close(self.done)

			// Wait for each dependency and collect its result.
			depResults := make([]interface{}, 0, len(job.Dependencies))
			for _, dep := range job.Dependencies {
				depOutcome := outcomes[dep]
				<-depOutcome.done
				if depOutcome.failed {
					// The root failure was already recorded by the job that
					// caused it; just pass the skip on to our dependents.
					self.failed = true
					return
				}
				depResults = append(depResults, depOutcome.result)
			}

			if job.ExecuteFunc == nil {
				self.failed = true
				recordErr(fmt.Errorf("executing %s: no execute function", job.Name))
				return
			}
			result, err := job.ExecuteFunc(depResults)
			if err != nil {
				self.failed = true
				recordErr(fmt.Errorf("executing %s: %w", job.Name, err))
				return
			}

			// Written before done is closed, so dependents that have
			// received from done see the final value.
			self.result = result
			fmt.Printf("Completed %s with result: %v\n", job.Name, result)
		}(job)
	}

	wg.Wait() // Wait for all jobs to finish or be skipped
	return firstErr
}
// main builds a four-job example graph in which JobA depends on JobB and
// JobC, and both of those depend on JobD, then runs it through a scheduler
// and prints any error followed by "DONE".
func main() {
	// Execution functions of the four jobs.
	runD := func(depResults []interface{}) (interface{}, error) {
		fmt.Println("Executing JobD")
		return "ResultD", nil
	}
	runB := func(depResults []interface{}) (interface{}, error) {
		fmt.Printf("Executing JobB with dependencies: %v\n", depResults)
		return "ResultB", nil
	}
	runC := func(depResults []interface{}) (interface{}, error) {
		fmt.Printf("Executing JobC with dependencies: %v\n", depResults)
		return "ResultC", nil
	}
	runA := func(depResults []interface{}) (interface{}, error) {
		fmt.Printf("Executing JobA with dependencies: %v\n", depResults)
		return "ResultA", nil
	}

	// Register the jobs.
	graph := NewJobGraph()
	graph.AddJob("JobD", runD)
	graph.AddJob("JobB", runB)
	graph.AddJob("JobC", runC)
	graph.AddJob("JobA", runA)

	// Each pair is {job, dependency}. The order of the pairs fixes the order
	// of each job's dependency list.
	edges := [][2]string{
		{"JobA", "JobB"},
		{"JobA", "JobC"},
		{"JobB", "JobD"},
		{"JobC", "JobD"},
	}
	for _, edge := range edges {
		graph.AddDependency(edge[0], edge[1])
	}

	// Run the graph.
	err := NewScheduler(graph).Execute()
	if err != nil {
		fmt.Println("Error:", err)
	}
	fmt.Printf("DONE")
}
// Web-page residue captured together with the code (not part of the program):
// "Editor is loading..." / "Leave a Comment"