# Source: root/branches/1.1/src/haizea/core/accounting.py @ 841
# -------------------------------------------------------------------------- #
# Copyright 2006-2009, University of Chicago                                 #
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
# Complutense de Madrid (dsa-research.org)                                   #
#                                                                            #
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
# not use this file except in compliance with the License. You may obtain    #
# a copy of the License at                                                   #
#                                                                            #
# http://www.apache.org/licenses/LICENSE-2.0                                 #
#                                                                            #
# Unless required by applicable law or agreed to in writing, software        #
# distributed under the License is distributed on an "AS IS" BASIS,          #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
# See the License for the specific language governing permissions and        #
# limitations under the License.                                             #
# -------------------------------------------------------------------------- #

"""Classes used to collect data"""
20

    
21
import os
22
import os.path
23
import logging
24
from haizea.common.utils import pickle, get_clock
25
from haizea.common.constants import LOGLEVEL_VDEBUG
26
from errno import EEXIST
27

    
28
class AccountingData(object):
    """Plain holder for everything the accounting framework records.

    Haizea saves its accounting data by pickling an instance of this
    class, so it only carries data attributes and has no behaviour of
    its own.
    """

    def __init__(self):
        """Create an empty record (no counters, stats, leases or attributes)."""
        # Counters: the entries recorded for each counter, and the kind
        # of average requested for each counter.
        self.counters, self.counter_avg_type = {}, {}

        # Per-lease data: the declared stat names and the recorded values.
        self.lease_stats_names, self.lease_stats = [], {}

        # Per-run data ("statistics"): declared names and recorded values.
        self.stats_names, self.stats = [], {}

        # Leases saved along with the accounting data.
        self.leases = {}

        # Run attributes.
        self.attrs = {}

        # Time at which data collection started; unset until then.
        self.starttime = None
54
        
55

    
56
class AccountingDataCollection(object):
    """Accounting data collection

    This class provides a framework to collect data while Haizea is running.
    It is designed so that the code that collects the data is placed in
    separate classes (the "probes"; a probe must be a child class of
    AccountingProbe). Probes can collect three types of data:

     - Per-lease data: Data attributable to individual leases or derived
       from how each lease was scheduled.
     - Per-run data: Data from a single run of Haizea.
     - Counters: A counter is a time-ordered list showing how some metric
       varied throughout a single run of Haizea.

    The AccountingDataCollection class takes care of calling these
    probes at three points when Haizea is running:
    (1) at every time step, (2) when a lease is requested, and (3) when a
    lease is done.
    """

    AVERAGE_NONE = 0
    AVERAGE_NORMAL = 1
    AVERAGE_TIMEWEIGHTED = 2

    def __init__(self, datafile, attrs):
        """Constructor

        @param datafile: Path to file where accounting data will be saved
        @type datafile: C{str}
        @param attrs: Run attributes
        @type attrs: C{dict}
        """
        self.__data = AccountingData()
        self.__datafile = datafile
        self.__probes = []

        self.__data.attrs = attrs

    def add_probe(self, probe):
        """Adds a new accounting probe

        @param probe: Probe to add
        @type probe: L{AccountingProbe}
        """
        self.__probes.append(probe)

    def create_counter(self, counter_id, avgtype):
        """Adds a new counter.

        Counters can store not just the value of the counter throughout
        time, but also a running average. This is specified with the
        avgtype parameter, which can be equal to:

         - AccountingDataCollection.AVERAGE_NONE: Don't compute an average
         - AccountingDataCollection.AVERAGE_NORMAL: For each entry, compute
           the average of all the values including and preceding that entry.
         - AccountingDataCollection.AVERAGE_TIMEWEIGHTED: For each entry,
           compute the average of all the values including and preceding
           that entry, weighing the average according to the time between
           each entry.

        Calling this for an existing counter discards its entries.

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @param avgtype: Type of average to compute
        @type avgtype: C{int}
        """
        self.__data.counters[counter_id] = []
        self.__data.counter_avg_type[counter_id] = avgtype

    def create_lease_stat(self, stat_id):
        """Adds a new per-lease type of data ("stat").

        Declaring the same stat more than once has no further effect.

        @param stat_id: Name of the stat
        @type stat_id: C{str}
        """
        if stat_id not in self.__data.lease_stats_names:
            self.__data.lease_stats_names.append(stat_id)

    def create_stat(self, stat_id):
        """Adds a new per-run type of data ("stat").

        Declaring the same stat more than once has no further effect
        (consistent with create_lease_stat).

        @param stat_id: Name of the stat
        @type stat_id: C{str}
        """
        if stat_id not in self.__data.stats_names:
            self.__data.stats_names.append(stat_id)

    def incr_counter(self, counter_id, lease_id = None, amount = 1):
        """Increment a counter

        The counter must already have at least one entry (start() adds
        an initial entry to every counter that exists at that point).

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @param lease_id: Optionally, the lease that caused this increment.
        @type lease_id: C{int}
        @param amount: How much to add to the counter's last value.
        @type amount: C{int} or C{float}
        """
        new_value = self.get_last_counter_value(counter_id) + amount
        self.append_to_counter(counter_id, new_value, lease_id)

    def decr_counter(self, counter_id, lease_id = None, amount = 1):
        """Decrement a counter

        The counter must already have at least one entry (start() adds
        an initial entry to every counter that exists at that point).

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @param lease_id: Optionally, the ID of the lease that caused this decrement.
        @type lease_id: C{int}
        @param amount: How much to subtract from the counter's last value.
        @type amount: C{int} or C{float}
        """
        new_value = self.get_last_counter_value(counter_id) - amount
        self.append_to_counter(counter_id, new_value, lease_id)

    def append_to_counter(self, counter_id, value, lease_id = None):
        """Append a value to a counter

        Entries are stored as [time, lease_id, value] lists, stamped with
        the current time from get_clock(). If the last entry has the same
        time, only its value is overwritten (its lease ID is kept). If
        the time differs, a new entry is added only when the lease ID or
        the value differs from the last entry.

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @param value: Value to append
        @type value: C{int} or C{float}
        @param lease_id: Optionally, the ID of the lease that caused this increment.
        @type lease_id: C{int}
        """
        time = get_clock().get_time()
        entries = self.__data.counters[counter_id]
        if len(entries) == 0:
            entries.append([time, lease_id, value])
            return

        last = entries[-1]
        if time == last[0]:
            last[2] = value
        elif last[1] != lease_id or last[2] != value:
            entries.append([time, lease_id, value])

    def get_last_counter_time(self, counter_id):
        """Get the time of the last entry in a counter

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @return: The first field (time) of the counter's last entry.
        """
        return self.__data.counters[counter_id][-1][0]

    def get_last_counter_value(self, counter_id):
        """Get the value of the last entry in a counter

        @param counter_id: Name of the counter
        @type counter_id: C{str}
        @return: The third field (value) of the counter's last entry.
        """
        return self.__data.counters[counter_id][-1][2]

    def set_lease_stat(self, stat_id, lease_id, value):
        """Set the value of a per-lease datum

        @param stat_id: Name of the stat
        @type stat_id: C{str}
        @param lease_id: The ID of the lease the value is associated to
        @type lease_id: C{int}
        @param value: Value of the stat
        @type value: C{int} or C{float}
        """
        self.__data.lease_stats.setdefault(lease_id, {})[stat_id] = value

    def set_stat(self, stat_id, value):
        """Set the value of a per-run datum

        @param stat_id: Name of the stat
        @type stat_id: C{str}
        @param value: Value of the stat
        @type value: C{int} or C{float}
        """
        self.__data.stats[stat_id] = value

    def start(self, time):
        """Start collecting data

        Records the start time and gives every existing counter an
        initial value of 0.

        @param time: Time at which data started being collected
        @type time: L{mx.DateTime}
        """
        self.__data.starttime = time

        # Start the counters
        for counter_id in self.__data.counters:
            self.append_to_counter(counter_id, 0)

    def stop(self):
        """Stop collecting data

        Closes out the counters, replaces each counter's entries with
        (seconds since start, lease_id, value, average) tuples according
        to the counter's average type, and finally calls
        finalize_accounting() on every probe.
        """
        # Stop the counters: ask append_to_counter to record each
        # counter's last value at the current time (it may skip the entry
        # if nothing changed). Counters without any entry are skipped,
        # since they have no last value to repeat.
        for counter_id, entries in self.__data.counters.items():
            if entries:
                self.append_to_counter(counter_id, entries[-1][2])

        # Add the averages. A counter with an unrecognized average type
        # is left untouched.
        for counter_id in self.__data.counters:
            l = self.__normalize_times(self.__data.counters[counter_id])
            avgtype = self.__data.counter_avg_type[counter_id]
            if avgtype == AccountingDataCollection.AVERAGE_NONE:
                self.__data.counters[counter_id] = self.__add_no_average(l)
            elif avgtype == AccountingDataCollection.AVERAGE_NORMAL:
                self.__data.counters[counter_id] = self.__add_average(l)
            elif avgtype == AccountingDataCollection.AVERAGE_TIMEWEIGHTED:
                self.__data.counters[counter_id] = self.__add_timeweighted_average(l)

        for probe in self.__probes:
            probe.finalize_accounting()

    def save_to_disk(self, leases):
        """Save accounting data to disk.

        Creates the directory of the data file if needed, attaches the
        leases to the accounting data and pickles it to the data file.
        Note that the C{logger} attribute of every lease passed in is
        set to None.

        @param leases: Leases to be saved to disk; only C{values()} is
        used, so this is expected to be a mapping.
        @type leases: C{dict} of L{Lease}s
        @raise OSError: If the directory for the data file cannot be created.
        """
        dirname = os.path.dirname(self.__datafile)
        # An empty dirname means the data file has no directory component,
        # so there is nothing to create (os.makedirs("") would fail).
        if dirname:
            try:
                os.makedirs(dirname)
            except OSError:
                # The directory already existing (possibly created by
                # someone else in the meantime) is fine; anything else is
                # a real error and is re-raised with its traceback intact.
                if not os.path.isdir(dirname):
                    raise

        # Add lease data
        # Remove some data that won't be necessary in the reporting tools
        for lease in leases.values():
            lease.logger = None
            self.__data.leases[lease.id] = lease

        # Save data
        pickle(self.__data, self.__datafile)

    def at_timestep(self, lease_scheduler):
        """Invoke the probes' at_timestep handlers.

        @param lease_scheduler: Lease Scheduler
        @type lease_scheduler: L{LeaseScheduler}
        """
        for probe in self.__probes:
            probe.at_timestep(lease_scheduler)

    def at_lease_request(self, lease):
        """Invoke the probes' at_lease_request handlers.

        @param lease: Requested lease
        @type lease: L{Lease}
        """
        for probe in self.__probes:
            probe.at_lease_request(lease)

    def at_lease_done(self, lease):
        """Invoke the probes' at_lease_done handlers.

        After the probes have run, lease.clear_rrs() is called unless the
        root logger's effective level is LOGLEVEL_VDEBUG.

        @param lease: Lease that was completed
        @type lease: L{Lease}
        """
        for probe in self.__probes:
            probe.at_lease_done(lease)
        if logging.getLogger().getEffectiveLevel() != LOGLEVEL_VDEBUG:
            lease.clear_rrs()

    def __normalize_times(self, counter):
        """Return the counter's entries with times relative to the start time.

        Each entry's time is replaced by the C{seconds} attribute of its
        difference with the start time.
        NOTE(review): this presumably relies on mx.DateTime deltas, where
        C{seconds} is the total elapsed time -- confirm.

        @param counter: List of (time, lease_id, value) entries
        @return: List of (seconds, lease_id, value) tuples
        """
        starttime = self.__data.starttime
        return [((v[0] - starttime).seconds, v[1], v[2]) for v in counter]

    def __add_no_average(self, counter):
        """Return the counter's entries with None in the average field.

        @param counter: List of (time, lease_id, value) entries
        @return: List of (time, lease_id, value, None) tuples
        """
        return [(v[0], v[1], v[2], None) for v in counter]

    def __add_timeweighted_average(self, counter):
        """Return the counter's entries with a time-weighted running average.

        Every value is weighted by the time elapsed until the next entry;
        the average at an entry is the accumulated weighted sum divided
        by that entry's time. The first entry's average is its own value.

        @param counter: List of (time, lease_id, value) entries
        @return: List of (time, lease_id, value, average) tuples
        """
        accum = 0
        prev_time = None
        prev_value = None
        stats = []
        for v in counter:
            time = v[0]
            lease_id = v[1]
            value = v[2]
            if prev_time is not None:
                accum += prev_value * (time - prev_time)
                if time:
                    # float() forces true division even when all the
                    # operands are integers (Python 2 would truncate).
                    avg = float(accum) / time
                else:
                    # Guard against a zero time, which would otherwise
                    # raise ZeroDivisionError.
                    avg = value
            else:
                avg = value
            stats.append((time, lease_id, value, avg))
            prev_time = time
            prev_value = value

        return stats

    def __add_average(self, counter):
        """Return the counter's entries with a plain running average.

        The average at an entry is the mean of the values of that entry
        and all the entries preceding it.

        @param counter: List of (time, lease_id, value) entries
        @return: List of (time, lease_id, value, average) tuples
        """
        accum = 0
        stats = []
        for count, v in enumerate(counter, 1):
            value = v[2]
            accum += value
            # float() forces true division even when the values are
            # integers (Python 2 would truncate the average).
            avg = float(accum) / count
            stats.append((v[0], v[1], value, avg))

        return stats
375
    
376

    
377
class AccountingProbe(object):
    """Common parent of all accounting probes.

    A probe is written by subclassing this class and overriding whichever
    hook methods it needs; the accounting framework calls those hooks at
    specific points (each hook's documentation says when). All the hooks
    defined here do nothing.
    """

    def __init__(self, accounting):
        """Constructor

        Subclasses are expected to use their own constructors to create
        counters (AccountingDataCollection.create_counter) and to declare
        per-lease data (AccountingDataCollection.create_lease_stat) and
        per-run data (AccountingDataCollection.create_stat).

        @param accounting: The accounting object this probe reports to;
        it is kept in the C{accounting} attribute.
        @type accounting: L{AccountingDataCollection}
        """
        self.accounting = accounting

    def finalize_accounting(self):
        """Hook called when data collection stops.

        Override it to do any work needed at that point; per-run data is
        usually computed here.
        """

    def at_timestep(self, lease_scheduler):
        """Hook called at every timestep.

        Override it to collect data every time the Haizea scheduler
        wakes up.

        @param lease_scheduler: Lease Scheduler
        @type lease_scheduler: L{LeaseScheduler}
        """

    def at_lease_request(self, lease):
        """Hook called after a lease request.

        Override it to collect data once a lease has been requested.

        @param lease: Requested lease
        @type lease: L{Lease}
        """

    def at_lease_done(self, lease):
        """Hook called when a lease is done, whether it completed
        successfully or was rejected, cancelled or failed.

        @param lease: Lease that was completed
        @type lease: L{Lease}
        """

    def _set_stat_from_counter(self, stat_id, counter_id):
        """Helper that copies the last value of a counter into a
        per-run stat.

        @param stat_id: Name of per-run stat
        @type stat_id: C{str}
        @param counter_id: Name of counter
        @type counter_id: C{str}
        """
        acct = self.accounting
        acct.set_stat(stat_id, acct.get_last_counter_value(counter_id))
446