Project

General

Profile

root / trunk / src / haizea / core / accounting.py @ 632

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2008, University of Chicago                                 #
3
# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19
import os
20
import os.path
21
import haizea.common.constants as constants
22
from haizea.core.leases import Lease
23
from haizea.common.utils import pickle, get_config, get_clock
24
from errno import EEXIST
25

    
26
class AccountingData(object):
27
    def __init__(self):
28
        # Counters
29
        self.counters = {}
30
        self.counter_lists = {}
31
        self.counter_avg_type = {}
32
        
33
        # Lease data
34
        self.leases = {}
35
        
36
        # Attributes
37
        self.attrs = {}
38
        
39
        self.starttime = None
40
        
41
    def get_waiting_times(self):
42
        waiting_times = {}
43
        for lease_id in self.leases:
44
            lease = self.leases[lease_id]
45
            if lease.get_type() == Lease.BEST_EFFORT:
46
                waiting_times[lease_id] = lease.get_waiting_time()
47
        return waiting_times
48

    
49
    def get_slowdowns(self):
50
        slowdowns = {}
51
        for lease_id in self.leases:
52
            lease = self.leases[lease_id]
53
            if lease.get_type() == Lease.BEST_EFFORT:
54
                slowdowns[lease_id] = lease.get_slowdown()
55
        return slowdowns
56
    
57
    def get_besteffort_end(self):
58
        return max([l.end for l in self.leases.values() if l.get_type() == Lease.BEST_EFFORT])
59

    
60
class AccountingDataCollection(object):
61
    def __init__(self, manager, datafile):
62
        self.data = AccountingData()
63
        self.manager = manager
64
        self.datafile = datafile
65
        
66
        attrs = get_config().get_attrs()
67
        for attr in attrs:
68
            self.data.attrs[attr] = get_config().get_attr(attr)
69

    
70
    def create_counter(self, counter_id, avgtype, initial=0):
71
        self.data.counters[counter_id] = initial
72
        self.data.counter_lists[counter_id] = []
73
        self.data.counter_avg_type[counter_id] = avgtype
74

    
75
    def incr_counter(self, counter_id, lease_id = None):
76
        time = get_clock().get_time()
77
        self.append_stat(counter_id, self.data.counters[counter_id] + 1, lease_id, time)
78

    
79
    def decr_counter(self, counter_id, lease_id = None):
80
        time = get_clock().get_time()
81
        self.append_stat(counter_id, self.data.counters[counter_id] - 1, lease_id, time)
82
        
83
    def append_stat(self, counter_id, value, lease_id = None, time = None):
84
        if time == None:
85
            time = get_clock().get_time()
86
        if len(self.data.counter_lists[counter_id]) > 0:
87
            prevtime = self.data.counter_lists[counter_id][-1][0]
88
        else:
89
            prevtime = None
90
        self.data.counters[counter_id] = value
91
        if time == prevtime:
92
            self.data.counter_lists[counter_id][-1][2] = value
93
        else:
94
            self.data.counter_lists[counter_id].append([time, lease_id, value])
95

    
96
        
97
    def start(self, time):
98
        self.data.starttime = time
99
        
100
        # Start the counters
101
        for counter_id in self.data.counters:
102
            initial = self.data.counters[counter_id]
103
            self.append_stat(counter_id, initial, time = time)
104

    
105
        
106
    def stop(self):
107
        time = get_clock().get_time()
108

    
109
        # Stop the counters
110
        for counter_id in self.data.counters:
111
            self.append_stat(counter_id, self.data.counters[counter_id], time=time)
112
        
113
        # Add the averages
114
        for counter_id in self.data.counters:
115
            l = self.normalize_times(self.data.counter_lists[counter_id])
116
            avgtype = self.data.counter_avg_type[counter_id]
117
            if avgtype == constants.AVERAGE_NONE:
118
                self.data.counter_lists[counter_id] = self.add_no_average(l)
119
            elif avgtype == constants.AVERAGE_NORMAL:
120
                self.data.counter_lists[counter_id] = self.add_average(l)
121
            elif avgtype == constants.AVERAGE_TIMEWEIGHTED:
122
                self.data.counter_lists[counter_id] = self.add_timeweighted_average(l)
123
            
124
    def normalize_times(self, data):
125
        return [((v[0] - self.data.starttime).seconds, v[1], v[2]) for v in data]
126
        
127
    def add_no_average(self, data):
128
        return [(v[0], v[1], v[2], None) for v in data]
129
    
130
    def add_timeweighted_average(self, data):
131
        accum = 0
132
        prev_time = None
133
        prev_value = None
134
        stats = []
135
        for v in data:
136
            time = v[0]
137
            lease_id = v[1]
138
            value = v[2]
139
            if prev_time != None:
140
                timediff = time - prev_time
141
                weighted_value = prev_value*timediff
142
                accum += weighted_value
143
                avg = accum/time
144
            else:
145
                avg = value
146
            stats.append((time, lease_id, value, avg))
147
            prev_time = time
148
            prev_value = value
149
        
150
        return stats        
151
    
152
    def add_average(self, data):
153
        accum = 0
154
        count = 0
155
        stats = []
156
        for v in data:
157
            value = v[2]
158
            accum += value
159
            count += 1
160
            avg = accum/count
161
            stats.append((v[0], v[1], value, avg))
162
        
163
        return stats          
164
    
165
    def save_to_disk(self):
166
        try:
167
            dirname = os.path.dirname(self.datafile)
168
            if not os.path.exists(dirname):
169
                os.makedirs(dirname)
170
        except OSError, e:
171
            if e.errno != EEXIST:
172
                raise e
173
    
174
        # Add lease data
175
        leases = self.manager.scheduler.completed_leases.entries
176
        # Remove some data that won't be necessary in the reporting tools
177
        for l in leases.values():
178
            l.clear_rrs()
179
            l.logger = None
180
            self.data.leases[l.id] = l
181

    
182
        # Save data
183
        pickle(self.data, self.datafile)
184

    
185
                
186