1
|
|
2
|
|
3
|
|
4
|
|
5
|
|
6
|
|
7
|
|
8
|
|
9
|
|
10
|
|
11
|
|
12
|
|
13
|
|
14
|
|
15
|
|
16
|
|
17
|
|
18
|
|
19
|
"""Classes used to collect data"""
|
20
|
|
21
|
import os
|
22
|
import os.path
|
23
|
import logging
|
24
|
from haizea.common.utils import pickle, get_clock
|
25
|
from haizea.common.constants import LOGLEVEL_VDEBUG
|
26
|
from errno import EEXIST
|
27
|
|
28
|
class AccountingData(object):
    """Plain holder for everything the accounting framework records.

    Instances carry no behaviour of their own: AccountingDataCollection
    fills in the attributes while Haizea runs and pickles the whole
    object when the accounting data is saved.
    """

    def __init__(self):
        """Create an empty container (no counters, stats or leases yet)."""

        # Counters: counter_id -> list of entries, and
        # counter_id -> type of average kept for that counter.
        self.counters = {}
        self.counter_avg_type = {}

        # Per-lease stats: the registered stat names, and
        # lease_id -> {stat_id: value}.
        self.lease_stats_names = []
        self.lease_stats = {}

        # Per-run stats: the registered stat names, and stat_id -> value.
        self.stats_names = []
        self.stats = {}

        # Leases saved along with the data, indexed by lease id.
        self.leases = {}

        # Attributes describing the run.
        self.attrs = {}

        # Time at which data collection started (unset until then).
        self.starttime = None
|
54
|
|
55
|
|
56
|
class AccountingDataCollection(object):
    """Accounting data collection

    This class provides a framework to collect data while Haizea is running.
    It is designed so that the code that collects the data is placed in
    separate classes (the "probes"; a probe must be a child class of
    AccountingProbe). Probes can collect three types of data:

     - Per-lease data: Data attributable to individual leases or derived
       from how each lease was scheduled.
     - Per-run data: Data from a single run of Haizea.
     - Counters: A counter is a time-ordered list showing how some metric
       varied throughout a single run of Haizea.

    The AccountingDataCollection class takes care of calling these
    probes at three points when Haizea is running:
    (1) at every time step, (2) when a lease is requested, and (3) when a
    lease is done.

    While data is being collected, each counter entry is a list
    C{[time, lease_id, value]}. When collection stops, entries are
    rewritten as tuples C{(seconds since start, lease_id, value, average)}.
    """

    AVERAGE_NONE = 0
    AVERAGE_NORMAL = 1
    AVERAGE_TIMEWEIGHTED = 2

    def __init__(self, datafile, attrs):
        """Constructor

        @param datafile: Path to file where accounting data will be saved
        @type datafile: C{str}
        @param attrs: Run attributes
        @type attrs: C{dict}
        """
        self.__data = AccountingData()
        self.__datafile = datafile
        self.__probes = []

        self.__data.attrs = attrs

    def add_probe(self, probe):
        """Adds a new accounting probe

        @param probe: Probe to add
        @type probe: L{AccountingProbe}
        """
        self.__probes.append(probe)

    def create_counter(self, counter_id, avgtype):
        """Adds a new counter.

        Counters can store not just the value of the counter throughout
        time, but also a running average. This is specified with the
        avgtype parameter, which can be equal to:

         - AccountingDataCollection.AVERAGE_NONE: Don't compute an average
         - AccountingDataCollection.AVERAGE_NORMAL: For each entry, compute
           the average of all the values including and preceding that entry.
         - AccountingDataCollection.AVERAGE_TIMEWEIGHTED: For each entry,
           compute the average of all the values including and preceding
           that entry, weighing the average according to the time between
           each entry.

        Creating a counter that already exists discards its entries.

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @param avgtype: Type of average to compute
        @type avgtype: C{int}
        """
        self.__data.counters[counter_id] = []
        self.__data.counter_avg_type[counter_id] = avgtype

    def create_lease_stat(self, stat_id):
        """Adds a new per-lease type of data ("stat").

        Registering the same name more than once has no further effect.

        @param stat_id: Name of the stat
        @type stat_id: C{str}
        """
        if stat_id not in self.__data.lease_stats_names:
            self.__data.lease_stats_names.append(stat_id)

    def create_stat(self, stat_id):
        """Adds a new per-run type of data ("stat").

        Registering the same name more than once has no further effect
        (same policy as create_lease_stat).

        @param stat_id: Name of the stat
        @type stat_id: C{str}
        """
        if stat_id not in self.__data.stats_names:
            self.__data.stats_names.append(stat_id)

    def incr_counter(self, counter_id, lease_id = None, amount = 1):
        """Increment a counter

        The counter must already have at least one entry (start() adds
        one to every existing counter); otherwise IndexError is raised.

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @param lease_id: Optionally, the lease that caused this increment.
        @type lease_id: C{int}
        @param amount: How much to add to the counter's last value
        @type amount: C{int} or C{float}
        """
        last_value = self.__data.counters[counter_id][-1][2]
        self.append_to_counter(counter_id, last_value + amount, lease_id)

    def decr_counter(self, counter_id, lease_id = None, amount = 1):
        """Decrement a counter

        The counter must already have at least one entry (start() adds
        one to every existing counter); otherwise IndexError is raised.

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @param lease_id: Optionally, the ID of the lease that caused this decrement.
        @type lease_id: C{int}
        @param amount: How much to subtract from the counter's last value
        @type amount: C{int} or C{float}
        """
        last_value = self.__data.counters[counter_id][-1][2]
        self.append_to_counter(counter_id, last_value - amount, lease_id)

    def append_to_counter(self, counter_id, value, lease_id = None):
        """Append a value to a counter

        The entry is stamped with the current clock time. If the last
        entry has that same time, its value is overwritten in place (its
        lease id is left untouched). Otherwise a new entry is added,
        unless both the lease id and the value are unchanged.

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @param value: Value to append
        @type value: C{int} or C{float}
        @param lease_id: Optionally, the ID of the lease that caused this increment.
        @type lease_id: C{int}
        """
        time = get_clock().get_time()
        entries = self.__data.counters[counter_id]

        if len(entries) == 0:
            entries.append([time, lease_id, value])
            return

        prevtime, prevlease, prevval = entries[-1][0], entries[-1][1], entries[-1][2]
        if time == prevtime:
            # Keep a single entry per point in time
            entries[-1][2] = value
        elif prevlease != lease_id or prevval != value:
            entries.append([time, lease_id, value])

    def get_last_counter_time(self, counter_id):
        """Get the time of the last entry in a counter

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @return: Time stored in the counter's last entry
        """
        return self.__data.counters[counter_id][-1][0]

    def get_last_counter_value(self, counter_id):
        """Get the value of the last entry in a counter

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @return: Value stored in the counter's last entry
        """
        return self.__data.counters[counter_id][-1][2]

    def set_lease_stat(self, stat_id, lease_id, value):
        """Set the value of a per-lease datum

        @param stat_id: Name of the stat
        @type stat_id: C{str}
        @param lease_id: The ID of the lease the value is associated to
        @type lease_id: C{int}
        @param value: Value of the stat
        @type value: C{int} or C{float}
        """
        self.__data.lease_stats.setdefault(lease_id, {})[stat_id] = value

    def set_stat(self, stat_id, value):
        """Set the value of a per-run datum

        @param stat_id: Name of the stat
        @type stat_id: C{str}
        @param value: Value of the stat
        @type value: C{int} or C{float}
        """
        self.__data.stats[stat_id] = value

    def start(self, time):
        """Start collecting data

        Records the start time and gives every existing counter an
        initial value of 0.

        @param time: Time at which data started being collected
        @type time: L{mx.DateTime}
        """
        self.__data.starttime = time

        for counter_id in self.__data.counters:
            self.append_to_counter(counter_id, 0)

    def stop(self):
        """Stop collecting data

        Closes every counter by repeating its last value at the current
        time, rewrites the counter entries as
        C{(seconds since start, lease_id, value, average)} tuples
        according to each counter's average type, and finally calls
        finalize_accounting on every probe.

        Counters whose average type is not one of the AVERAGE_* constants
        are left as they are.
        """
        # Close each counter with its final value
        for counter_id in self.__data.counters:
            self.append_to_counter(counter_id, self.__data.counters[counter_id][-1][2])

        averagers = {
            AccountingDataCollection.AVERAGE_NONE: self.__add_no_average,
            AccountingDataCollection.AVERAGE_NORMAL: self.__add_average,
            AccountingDataCollection.AVERAGE_TIMEWEIGHTED: self.__add_timeweighted_average,
        }

        # Only values of existing keys are replaced, so iterating over
        # the dictionary while doing so is safe.
        for counter_id in self.__data.counters:
            add_average = averagers.get(self.__data.counter_avg_type[counter_id])
            if add_average is not None:
                l = self.__normalize_times(self.__data.counters[counter_id])
                self.__data.counters[counter_id] = add_average(l)

        for probe in self.__probes:
            probe.finalize_accounting()

    def save_to_disk(self, leases):
        """Save accounting data to disk.

        Creates the directory of the data file if it does not exist yet,
        adds the leases to the accounting data, and pickles the data to
        the data file.

        @param leases: Leases to be saved to disk; only the values of the
        mapping are used, and each one is stored under its C{id}. The
        C{logger} attribute of every lease is set to None before saving.
        @type leases: C{dict} whose values are L{Lease}s
        @raise OSError: If the directory of the data file cannot be created
        """
        dirname = os.path.dirname(self.__datafile)
        # dirname is '' when the data file has no directory component;
        # there is nothing to create then (os.makedirs('') would fail).
        if dirname and not os.path.exists(dirname):
            try:
                os.makedirs(dirname)
            except OSError as e:
                # The directory may have been created since the check above
                if e.errno != EEXIST:
                    raise

        for l in leases.values():
            # NOTE(review): presumably dropped because loggers cannot be
            # pickled -- confirm.
            l.logger = None
            self.__data.leases[l.id] = l

        # "pickle" is the helper imported from haizea.common.utils,
        # not the standard library module.
        pickle(self.__data, self.__datafile)

    def at_timestep(self, lease_scheduler):
        """Invoke the probes' at_timestep handlers.

        @param lease_scheduler: Lease Scheduler
        @type lease_scheduler: L{LeaseScheduler}
        """
        for probe in self.__probes:
            probe.at_timestep(lease_scheduler)

    def at_lease_request(self, lease):
        """Invoke the probes' at_lease_request handlers.

        @param lease: Requested lease
        @type lease: L{Lease}
        """
        for probe in self.__probes:
            probe.at_lease_request(lease)

    def at_lease_done(self, lease):
        """Invoke the probes' at_lease_done handlers.

        Once every probe has run, the lease's clear_rrs() is called,
        unless the root logger's effective level is LOGLEVEL_VDEBUG.

        @param lease: Lease that was completed
        @type lease: L{Lease}
        """
        for probe in self.__probes:
            probe.at_lease_done(lease)
        # Done after the loop so that every probe sees the lease as it was.
        if logging.getLogger().getEffectiveLevel() != LOGLEVEL_VDEBUG:
            lease.clear_rrs()

    def __normalize_times(self, counter):
        """Return the counter's entries as (time, lease_id, value) tuples,
        with the time replaced by the C{seconds} attribute of its
        difference with the start time.
        """
        # NOTE(review): assumes mx.DateTime deltas, whose .seconds is the
        # whole difference; datetime.timedelta.seconds would drop the
        # days -- confirm.
        return [((v[0] - self.__data.starttime).seconds, v[1], v[2]) for v in counter]

    def __add_no_average(self, counter):
        """Return the entries with None as their fourth (average) field."""
        return [(v[0], v[1], v[2], None) for v in counter]

    def __add_timeweighted_average(self, counter):
        """Return the entries with a time-weighted running average added.

        Each value is weighted by the time it was in effect (until the
        next entry), and the accumulated total is divided by the time of
        the current entry. The average of the first entry, and of any
        entry at time 0, is the entry's own value.
        """
        accum = 0
        prev_time = None
        prev_value = None
        stats = []
        for v in counter:
            time = v[0]
            lease_id = v[1]
            value = v[2]
            if prev_time is not None:
                accum += prev_value * (time - prev_time)
            if prev_time is None or time == 0:
                # No time has elapsed, so there is nothing to divide by
                avg = value
            else:
                # float() prevents integer truncation in Python 2
                avg = float(accum) / time
            stats.append((time, lease_id, value, avg))
            prev_time = time
            prev_value = value

        return stats

    def __add_average(self, counter):
        """Return the entries with a plain running average added: the
        mean of all the values up to and including each entry.
        """
        accum = 0
        stats = []
        for count, v in enumerate(counter, 1):
            value = v[2]
            accum += value
            # float() prevents integer truncation in Python 2
            avg = float(accum) / count
            stats.append((v[0], v[1], value, avg))

        return stats
|
375
|
|
376
|
|
377
|
class AccountingProbe(object):
    """Base class for accounting probes

    Every accounting probe derives from this class. The hooks below do
    nothing by default; a probe overrides the ones matching the points
    at which it needs the accounting framework to run it (each hook's
    documentation says when it is called).

    """

    def __init__(self, accounting):
        """Constructor

        Keeps a reference to the accounting object. Child classes are
        expected to declare what they collect in their own constructors:
        counters (AccountingDataCollection.create_counter), per-lease
        data (AccountingDataCollection.create_lease_stat) and per-run
        data (AccountingDataCollection.create_stat).

        @param accounting: Accounting object the probe reports to
        """
        self.accounting = accounting

    def finalize_accounting(self):
        """Finalize data collection.

        Hook called when data collection stops; override it to do any
        closing work. Per-run data is usually computed here.
        """

    def at_timestep(self, lease_scheduler):
        """Collect data at a timestep.

        Hook for actions to perform every time the Haizea scheduler
        wakes up.

        @param lease_scheduler: Lease Scheduler
        @type lease_scheduler: L{LeaseScheduler}
        """

    def at_lease_request(self, lease):
        """Collect data after a lease request.

        Hook for actions to perform once a lease has been requested.

        @param lease: Requested lease
        @type lease: L{Lease}
        """

    def at_lease_done(self, lease):
        """Collect data when a lease is done (this covers successful
        completion as well as rejected/cancelled/failed leases).

        @param lease: Lease that was completed
        @type lease: L{Lease}
        """

    def _set_stat_from_counter(self, stat_id, counter_id):
        """Convenience function that copies the last value of a counter
        into a per-run stat.

        @param stat_id: Name of per-run stat
        @type stat_id: C{str}
        @param counter_id: Name of counter
        @type counter_id: C{str}
        """
        self.accounting.set_stat(
            stat_id, self.accounting.get_last_counter_value(counter_id))
|
446
|
|