Untitled
unknown
python
2 years ago
1.1 kB
7
Indexable
def mapper(self, _, line):
    """Emit per-day and per-station ``(sum, count)`` pairs for each reading.

    Each record in ``line`` is expected to be ``station,date,reading``.
    For every record two pairs are yielded:

    * ``'<station>,<date>'`` -> ``(reading, 1)`` for the daily average
    * ``'<station>,*'``      -> ``(reading, 1)`` for the station-wide average

    Blank records are skipped. A non-blank record that does not have exactly
    three comma-separated fields, or whose reading is not numeric, raises
    ``ValueError``.
    """
    for record in line.strip().split('\n'):
        if not record.strip():
            # An empty record would fail the three-way unpack below.
            continue
        station, date, reading = record.split(',')
        value = float(reading)
        yield f'{station},{date}', (value, 1)
        yield f'{station},*', (value, 1)


def combiner(self, key, vals):
    """Pre-aggregate ``(sum, count)`` pairs for ``key``.

    Sums and counts are carried separately (rather than averaging here) so
    the reducer can compute an exact mean from partial aggregates.
    """
    partial_sum, partial_count = 0, 0
    for reading_sum, count in vals:
        partial_sum += reading_sum
        partial_count += count
    yield key, (partial_sum, partial_count)


def reducer_init(self):
    """Initialise per-task reducer state.

    ``overall_avg`` holds the most recently computed station-wide average.
    ``tau`` is the gap threshold, read once from the
    ``myjob.settings.tau`` job configuration value instead of once per key.
    """
    self.overall_avg = 0
    self.tau = float(jobconf_from_env('myjob.settings.tau'))


def reducer(self, key, vals):
    """Yield days whose average deviates from the station average by > tau.

    ``key`` is ``'<station>,<date>'`` or ``'<station>,*'``. A ``'*'`` key
    updates ``self.overall_avg`` and yields nothing; any other key yields
    ``(station, '<date>,<gap>')`` when the absolute difference between the
    stored station average and that day's average exceeds ``self.tau``.

    NOTE(review): this assumes the ``'<station>,*'`` key reaches this reducer
    before that station's dated keys (and on the same reducer task). That
    depends on the job's sort/partition configuration, which is not visible
    here -- confirm against the job settings.
    """
    total_readings, total_count = 0, 0
    for reading_sum, count in vals:
        total_readings += reading_sum
        total_count += count
    average = total_readings / total_count

    station, date = key.split(',', 1)
    if date == '*':
        self.overall_avg = average
        return

    gap = abs(self.overall_avg - average)
    if gap > self.tau:
        yield station, f'{date},{gap}'
Editor is loading...