Untitled

 avatar
unknown
plain_text
5 months ago
5.3 kB
2
Indexable
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Job represents a single job with dependencies and an execution function.
type Job struct {
	Name         string
	Dependencies []*Job
	ExecuteFunc  func(depResults []interface{}) (interface{}, error) // Execution function for the job
}

// AddDependency adds a dependency to the job.
func (j *Job) AddDependency(dep *Job) {
	j.Dependencies = append(j.Dependencies, dep)
}

// JobGraph represents a collection of jobs and their dependencies.
type JobGraph struct {
	Jobs map[string]*Job
}

// NewJobGraph creates a new job graph.
func NewJobGraph() *JobGraph {
	return &JobGraph{Jobs: make(map[string]*Job)}
}

// AddJob creates or retrieves a job by name and sets its execute function.
func (g *JobGraph) AddJob(name string, executeFunc func(depResults []interface{}) (interface{}, error)) *Job {
	if job, exists := g.Jobs[name]; exists {
		return job
	}
	job := &Job{Name: name, ExecuteFunc: executeFunc}
	g.Jobs[name] = job
	return job
}

// AddDependency adds a dependency relationship between two jobs.
func (g *JobGraph) AddDependency(jobName, depName string) {
	job := g.Jobs[jobName]
	dep := g.Jobs[depName]
	job.AddDependency(dep)
}

// Scheduler schedules jobs for execution based on dependencies.
type Scheduler struct {
	JobGraph *JobGraph
}

// NewScheduler creates a new scheduler with the provided job graph.
func NewScheduler(graph *JobGraph) *Scheduler {
	return &Scheduler{JobGraph: graph}
}

// TopologicalSort performs a topological sort on the job graph to find a valid execution order.
func (s *Scheduler) TopologicalSort() ([]*Job, map[string]uint64, error) {
	visited := make(map[*Job]bool)
	visiting := make(map[*Job]bool)
	var result []*Job
	jobDependencyFrequency := make(map[string]uint64)

	var visit func(job *Job) error
	visit = func(job *Job) error {
		if visiting[job] {
			return errors.New("cycle detected in job dependencies")
		}
		if !visited[job] {
			visiting[job] = true
			for _, dep := range job.Dependencies {
				if err := visit(dep); err != nil {
					return err
				}
			}
			visiting[job] = false
			visited[job] = true
			result = append(result, job)
		}
		return nil
	}

	for _, job := range s.JobGraph.Jobs {
		if !visited[job] {
			if err := visit(job); err != nil {
				return nil, jobDependencyFrequency, err
			}
		}

		for _, dependency := range job.Dependencies {
			jobDependencyFrequency[dependency.Name] += 1
		}
	}

	// Reverse the result to get the correct order
	for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
		result[i], result[j] = result[j], result[i]
	}

	return result, jobDependencyFrequency, nil
}

// Execute runs the jobs in the order determined by topological sorting and in parallel where possible.
func (s *Scheduler) Execute() error {
	order, jobDependencyFrequency, err := s.TopologicalSort()
	if err != nil {
		return err
	}

	results := make(map[*Job]interface{}) // To store results of jobs
	var mu sync.Mutex                     // Protects access to the results map
	var wg sync.WaitGroup                 // To synchronize job execution

	// channel to signal job completion and pass results
	jobChannels := make(map[*Job]chan interface{})

	// Start each job in a goroutine, waiting on dependencies' results
	for _, job := range order {
		jobChannels[job] = make(chan interface{})
		wg.Add(1)

		go func(job *Job) {
			defer wg.Done()

			// Collect dependency results
			depResults := []interface{}{}
			for _, dep := range job.Dependencies {
				depResult := <-jobChannels[dep] // Wait for dependency to complete
				depResults = append(depResults, depResult)
			}

			// Execute the job with dependency results
			result, err := job.ExecuteFunc(depResults)
			if err != nil {
				fmt.Printf("Error executing %s: %v\n", job.Name, err)
				return
			}

			// Store result
			mu.Lock()
			results[job] = result
			mu.Unlock()

			// Signal completion and send result to channel
			numberOfParents := jobDependencyFrequency[job.Name]

			for numberOfParents > 0 {
				jobChannels[job] <- result
				numberOfParents--
			}

			fmt.Printf("Completed %s with result: %v\n", job.Name, result)
		}(job)
	}

	wg.Wait() // Wait for all jobs to finish
	return nil
}

func main() {
	jobGraph := NewJobGraph()

	// Define jobs with their execution functions
	jobGraph.AddJob("JobD", func(depResults []interface{}) (interface{}, error) {
		fmt.Println("Executing JobD")
		return "ResultD", nil
	})
	jobGraph.AddJob("JobB", func(depResults []interface{}) (interface{}, error) {
		fmt.Printf("Executing JobB with dependencies: %v\n", depResults)
		return "ResultB", nil
	})
	jobGraph.AddJob("JobC", func(depResults []interface{}) (interface{}, error) {
		fmt.Printf("Executing JobC with dependencies: %v\n", depResults)
		return "ResultC", nil
	})
	jobGraph.AddJob("JobA", func(depResults []interface{}) (interface{}, error) {
		fmt.Printf("Executing JobA with dependencies: %v\n", depResults)
		return "ResultA", nil
	})

	// Define dependencies
	jobGraph.AddDependency("JobA", "JobB")
	jobGraph.AddDependency("JobA", "JobC")
	jobGraph.AddDependency("JobB", "JobD")
	jobGraph.AddDependency("JobC", "JobD")

	// Execute jobs
	scheduler := NewScheduler(jobGraph)
	if err := scheduler.Execute(); err != nil {
		fmt.Println("Error:", err)
	}

	fmt.Printf("DONE")
}
Editor is loading...
Leave a Comment