Untitled
unknown
python
2 years ago
1.1 kB
15
Indexable
def mapper(self, _, line):
    """Emit (reading, 1) pairs keyed by station-day and by station.

    Every non-blank record in ``line`` must have the form
    ``station,date,reading``. For each record two pairs are emitted:

    * ``"<station>,<date>"`` -> ``(reading, 1)``, feeding the daily average;
    * ``"<station>,*"``      -> ``(reading, 1)``, feeding the station's
      overall average.

    NOTE(review): reducer() expects the ``"<station>,*"`` key to be processed
    before the per-day keys of the same station. '*' (0x2A) precedes digits
    and letters in byte order, but confirm the job's sort/partition settings.

    Args:
        _: Input key (unused).
        line: One or more newline-separated records.

    Yields:
        ``(key, (reading, 1))`` tuples, two per non-blank record.

    Raises:
        ValueError: If a record does not have exactly three comma-separated
            fields, or if its reading is not a valid float.
    """
    for record in line.splitlines():
        record = record.strip()
        if not record:
            # Blank lines (e.g. a trailing newline) previously crashed the
            # three-way unpacking below.
            continue
        station, date, reading = record.split(',')
        value = (float(reading), 1)  # parse once, reuse for both keys
        yield f'{station},{date}', value
        yield f'{station},*', value
def combiner(self, key, vals):
    """Collapse partial ``(sum, count)`` pairs for one key into one pair.

    The output keeps the same ``(sum, count)`` shape as the input. This lets
    reducer(), which indexes ``val[0]`` and ``val[1]``, consume it exactly
    like mapper output.

    Args:
        key: The grouping key, passed through unchanged.
        vals: Iterable of indexable ``(sum, count)`` pairs.

    Yields:
        A single ``(key, (total_sum, total_count))`` tuple. An empty input
        yields ``(key, (0, 0))``.
    """
    sums = [0, 0]  # [running reading sum, running count]
    for pair in vals:
        sums[0] += pair[0]
        sums[1] += pair[1]
    yield key, tuple(sums)
def reducer_init(self):
    """Prepare per-reducer state before any key is processed.

    ``overall_avg`` holds the most recent station-wide average computed by
    reducer() for a ``"<station>,*"`` key. It starts at 0 until such a key
    has been seen.
    """
    self.overall_avg = 0  # replaced by reducer() on each "<station>,*" key
def reducer(self, key, vals):
    """Average one key's readings and report days that deviate from the station mean.

    Keys are either ``"<station>,*"`` (all readings of a station) or
    ``"<station>,<date>"`` (one day's readings), as emitted by the mapper.
    A ``"<station>,*"`` key stores that station's overall average on
    ``self``. A per-day key compares its daily average against the stored
    overall average.

    This only works if the ``"<station>,*"`` key of a station is processed
    by the same reducer before that station's per-day keys.
    NOTE(review): confirm the job sorts keys so '*' comes first and
    partitions on the station field.

    Args:
        key: ``"<station>,*"`` or ``"<station>,<date>"``.
        vals: Iterable of indexable ``(reading_sum, count)`` pairs.

    Yields:
        ``(station, "<date>,<gap>")`` for each day whose average differs from
        the station's overall average by more than ``tau``. ``tau`` is read
        from jobconf ``myjob.settings.tau``.

    Raises:
        RuntimeError: If a per-day key arrives before the overall average of
            its own station has been seen. Previously this case silently
            compared against 0 or against another station's average.
    """
    total_readings, total_count = 0, 0
    for val in vals:
        total_readings += val[0]
        total_count += val[1]
    key_avg = total_readings / total_count
    station, date = key.split(',', 1)

    if date == '*':
        # Remember the station-wide average and whose it is, for the
        # per-day keys of this station that follow.
        self.overall_avg = key_avg
        self.overall_avg_station = station
        return

    # getattr with a default: the attribute exists only after a "*" key.
    if getattr(self, 'overall_avg_station', None) != station:
        raise RuntimeError(
            f'no overall average seen for station {station!r} before key '
            f'{key!r}; the "<station>,*" key must reach the same reducer '
            'and sort before the per-day keys'
        )

    tau = float(jobconf_from_env('myjob.settings.tau'))
    gap = abs(self.overall_avg - key_avg)
    if gap > tau:
        yield station, f'{date},{gap}'
Editor is loading...