Untitled

 avatar
unknown
python
2 years ago
1.1 kB
7
Indexable
def mapper(self, _, line):
        """Emit partial (sum, count) pairs for every reading found in *line*.

        Each record has the form ``station,date,reading``. For every record
        two key/value pairs are produced:

        * ``"<station>,<date>"`` -> ``(reading, 1)`` for the per-date average
        * ``"<station>,*"``      -> ``(reading, 1)`` for the station-wide average

        Args:
            _: Input key; ignored.
            line: Raw input text holding one record, or several records
                separated by newlines.

        Yields:
            ``(key, (reading, 1))`` tuples as described above.

        Raises:
            ValueError: If a non-blank record does not have exactly three
                comma-separated fields or its reading is not a valid float.
        """
        for record in line.strip().split('\n'):
            # A blank line (e.g. a trailing newline or an empty row in the
            # input) carries no reading; skip it instead of failing on the
            # three-way unpack below.
            if not record.strip():
                continue
            station, date, reading = record.split(',')
            # Parse once and reuse the same pair for both keys.
            pair = (float(reading), 1)
            yield f'{station},{date}', pair
            yield f'{station},*', pair
    
    def combiner(self, key, vals):
        """Collapse the (sum, count) pairs seen for *key* into a single pair.

        Args:
            key: The key the pairs were emitted under; passed through as-is.
            vals: Iterable of indexable pairs whose element 0 is a partial
                sum of readings and element 1 is the matching count.

        Yields:
            One ``(key, (partial_sum, partial_count))`` tuple.
        """
        partial_sum = 0
        partial_count = 0
        for pair in vals:
            partial_sum = partial_sum + pair[0]
            partial_count = partial_count + pair[1]

        yield key, (partial_sum, partial_count)
    
    def reducer_init(self):
        """Reset the cached station-wide average before any key is reduced."""
        # Overwritten by reducer() whenever it processes a "<station>,*" key.
        self.overall_avg = 0
    
    def reducer(self, key, vals):
        """Average the readings for *key* and report dates that deviate.

        For a ``"<station>,*"`` key the average is stored on
        ``self.overall_avg`` and nothing is emitted. For any other
        ``"<station>,<date>"`` key the absolute difference between the stored
        overall average and this key's average is emitted when it exceeds the
        ``myjob.settings.tau`` job setting.

        NOTE(review): this only gives meaningful results if a station's
        ``*`` key reaches the same reducer process before that station's
        dated keys -- presumably arranged by the job's sort/partition
        configuration, which is not visible here; confirm.

        Args:
            key: ``"<station>,<date>"`` or ``"<station>,*"``.
            vals: Iterable of indexable pairs: partial sum at index 0 and
                count at index 1.

        Yields:
            ``(station, "<date>,<gap>")`` for dated keys whose gap is greater
            than tau.
        """
        threshold = float(jobconf_from_env('myjob.settings.tau'))

        reading_sum = 0
        reading_count = 0
        for pair in vals:
            reading_sum = reading_sum + pair[0]
            reading_count = reading_count + pair[1]
        mean = reading_sum / reading_count

        station, date = key.split(',', 1)
        if date == '*':
            # Station-wide key: remember its mean for the dated keys.
            self.overall_avg = mean
            return

        deviation = abs(self.overall_avg - mean)
        if deviation > threshold:
            yield station, f'{date},{deviation}'
    
Editor is loading...