Project

General

Profile

root / trunk / src / haizea / core / scheduler / resourcepool.py @ 632

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2008, University of Chicago                                 #
3
# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19
from haizea.common.utils import vnodemapstr, get_accounting
20
import haizea.common.constants as constants
21
import haizea.core.enact.actions as actions
22
from haizea.core.scheduler import EnactmentError
23
import logging 
24

    
25

    
26
class ResourcePool(object):
27
    def __init__(self, info_enact, vm_enact, deploy_enact):
28
        self.logger = logging.getLogger("RPOOL")
29
                
30
        self.info = info_enact
31
        self.vm = vm_enact
32
        # TODO: Ideally, deployment enactment shouldn't be here, specially since
33
        # it already "hangs" below the deployment modules. For now,
34
        # it does no harm, though.
35
        self.deployment = deploy_enact
36
        
37
        self.nodes = self.info.get_nodes()
38
        
39
    def start_vms(self, lease, rr):
40
        start_action = actions.VMEnactmentStartAction()
41
        start_action.from_rr(rr)
42
                
43
        for (vnode, pnode) in rr.nodes.items():
44
            node = self.get_node(pnode)
45
            #diskimage = node.get_diskimage(lease.id, vnode, lease.diskimage_id)
46
            start_action.vnodes[vnode].pnode = node.enactment_info
47
            #start_action.vnodes[vnode].diskimage = diskimage.filename
48
            start_action.vnodes[vnode].resources = rr.resources_in_pnode[pnode]
49

    
50
        try:
51
            self.vm.start(start_action)
52
        except EnactmentError, exc:
53
            self.logger.error("Enactment of start VM failed: %s" % exc.message)
54
            raise
55
        
56
    def stop_vms(self, lease, rr):
57
        stop_action = actions.VMEnactmentStopAction()
58
        stop_action.from_rr(rr)
59
        try:
60
            self.vm.stop(stop_action)
61
        except EnactmentError, exc:
62
            self.logger.error("Enactment of end VM failed: %s" % exc.message)
63
            raise
64
         
65
    def suspend_vms(self, lease, rr):
66
        # Add memory image files
67
        for vnode in rr.vnodes:
68
            pnode = rr.vmrr.nodes[vnode]
69
            self.add_ramfile(pnode, lease.id, vnode, lease.requested_resources[vnode].get_quantity(constants.RES_MEM))
70

    
71
        # Enact suspend
72
        suspend_action = actions.VMEnactmentSuspendAction()
73
        suspend_action.from_rr(rr)
74
        try:
75
            self.vm.suspend(suspend_action)
76
        except EnactmentError, exc:
77
            self.logger.error("Enactment of suspend VM failed: %s" % exc.message)
78
            raise
79
    
80
    def verify_suspend(self, lease, rr):
81
        verify_suspend_action = actions.VMEnactmentConfirmSuspendAction()
82
        verify_suspend_action.from_rr(rr)
83
        self.vm.verify_suspend(verify_suspend_action)
84
    
85
    def resume_vms(self, lease, rr):
86
        # Remove memory image files
87
        for vnode in rr.vnodes:
88
            pnode = rr.vmrr.nodes[vnode]
89
            self.remove_ramfile(pnode, lease.id, vnode)
90

    
91
        # Enact resume
92
        resume_action = actions.VMEnactmentResumeAction()
93
        resume_action.from_rr(rr)
94
        try:
95
            self.vm.resume(resume_action)
96
        except EnactmentError, exc:
97
            self.logger.error("Enactment of resume VM failed: %s" % exc.message)
98
            raise
99
    
100
    def verify_resume(self, lease, rr):
101
        verify_resume_action = actions.VMEnactmentConfirmResumeAction()
102
        verify_resume_action.from_rr(rr)
103
        self.vm.verify_resume(verify_resume_action)    
104
    
105
    def get_nodes(self):
106
        return self.nodes.values()
107
    
108
    # An auxiliary node is a host whose resources are going to be scheduled, but
109
    # where no VMs are actually going to run. For example, a disk image repository node.
110
    def get_aux_nodes(self):
111
        # TODO: We're only asking the deployment enactment module for auxiliary nodes.
112
        # There might be a scenario where the info enactment module also reports
113
        # auxiliary nodes.
114
        return self.deployment.get_aux_nodes()
115

    
116
    def get_num_nodes(self):
117
        return len(self.nodes)
118
        
119
    def get_node(self, node_id):
120
        return self.nodes[node_id]
121
        
122
    def add_diskimage(self, pnode, diskimage_id, imagesize, lease_id, vnode):
123
        self.logger.debug("Adding disk image for L%iV%i in pnode=%i" % (lease_id, vnode, pnode))
124
        
125
        self.logger.vdebug("Files BEFORE:")
126
        self.get_node(pnode).print_files()
127
        
128
        imagefile = self.deployment.resolve_to_file(lease_id, vnode, diskimage_id)
129
        img = DiskImageFile(imagefile, imagesize, lease_id, vnode, diskimage_id)
130
        self.get_node(pnode).add_file(img)
131

    
132
        self.logger.vdebug("Files AFTER:")
133
        self.get_node(pnode).print_files()
134
        
135
        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
136
        return img
137
            
138
    def remove_diskimage(self, pnode, lease, vnode):
139
        node = self.get_node(pnode)
140
        node.print_files()
141

    
142
        self.logger.debug("Removing disk image for L%iV%i in node %i" % (lease, vnode, pnode))
143
        node.remove_diskimage(lease, vnode)
144

    
145
        node.print_files()
146
        
147
        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())    
148
        
149
    def add_ramfile(self, pnode, lease_id, vnode, size):
150
        node = self.get_node(pnode)
151
        self.logger.debug("Adding RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode))
152
        node.print_files()
153
        f = RAMImageFile("RAM_L%iV%i" % (lease_id, vnode), size, lease_id, vnode)
154
        node.add_file(f)        
155
        node.print_files()
156
        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
157

    
158
    def remove_ramfile(self, pnode, lease_id, vnode):
159
        node = self.get_node(pnode)
160
        self.logger.debug("Removing RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode))
161
        node.print_files()
162
        node.remove_ramfile(lease_id, vnode)
163
        node.print_files()
164
        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
165
        
166
    def get_max_disk_usage(self):
167
        return max([n.get_disk_usage() for n in self.nodes.values()])
168
    
169
class ResourcePoolNode(object):
170
    def __init__(self, node_id, hostname, capacity):
171
        self.logger = logging.getLogger("RESOURCEPOOL")
172
        self.id = node_id
173
        self.hostname = hostname
174
        self.capacity = capacity
175
        self.files = []
176

    
177
        # enactment-specific information
178
        self.enactment_info = None
179
        
180
    def get_capacity(self):
181
        return self.capacity
182
           
183
    def add_file(self, f):
184
        self.files.append(f)
185
        
186
    def get_diskimage(self, lease_id, vnode, diskimage_id):
187
        image = [f for f in self.files if isinstance(f, DiskImageFile) and 
188
                 f.diskimage_id == diskimage_id and 
189
                 f.lease_id == lease_id and
190
                 f.vnode == vnode]
191
        if len(image) == 0:
192
            return None
193
        elif len(image) == 1:
194
            return image[0]
195
        elif len(image) > 1:
196
            self.logger.warning("More than one tainted image for L%iV%i on node %i" % (lease_id, vnode, self.nod_id))
197
            return image[0]
198

    
199
    def remove_diskimage(self, lease_id, vnode):
200
        image = [f for f in self.files if isinstance(f, DiskImageFile) and 
201
                 f.lease_id == lease_id and
202
                 f.vnode == vnode]
203
        if len(image) > 0:
204
            image = image[0]
205
            self.files.remove(image)
206
            
207
    def remove_ramfile(self, lease_id, vnode):
208
        ramfile = [f for f in self.files if isinstance(f, RAMImageFile) and f.lease_id==lease_id and f.vnode==vnode]
209
        if len(ramfile) > 0:
210
            ramfile = ramfile[0]
211
            self.files.remove(ramfile)
212
                
213
        
214
    def get_disk_usage(self):
215
        return sum([f.filesize for f in self.files])
216

    
217

    
218
    def get_diskimages(self):
219
        return [f for f in self.files if isinstance(f, DiskImageFile)]
220
        
221
    def print_files(self):
222
        images = ""
223
        if len(self.files) > 0:
224
            images = ", ".join([str(img) for img in self.files])
225
        self.logger.vdebug("Node %i files: %iMB %s" % (self.id, self.get_disk_usage(), images))
226

    
227
    def xmlrpc_marshall(self):
228
        # Convert to something we can send through XMLRPC
229
        h = {}
230
        h["id"] = self.id
231
        h["hostname"] = self.hostname
232
        h["cpu"] = self.capacity.get_quantity(constants.RES_CPU)
233
        h["mem"] = self.capacity.get_quantity(constants.RES_MEM)
234
                
235
        return h
236
        
237

    
238
        
239
class File(object):
240
    def __init__(self, filename, filesize):
241
        self.filename = filename
242
        self.filesize = filesize
243
        
244
class DiskImageFile(File):
245
    def __init__(self, filename, filesize, lease_id, vnode, diskimage_id):
246
        File.__init__(self, filename, filesize)
247
        self.lease_id = lease_id
248
        self.vnode = vnode
249
        self.diskimage_id = diskimage_id
250
                
251
    def __str__(self):
252
        return "(DISK L%iv%i %s %s)" % (self.lease_id, self.vnode, self.diskimage_id, self.filename)
253

    
254

    
255
class RAMImageFile(File):
256
    def __init__(self, filename, filesize, lease_id, vnode):
257
        File.__init__(self, filename, filesize)
258
        self.lease_id = lease_id
259
        self.vnode = vnode
260
                
261
    def __str__(self):
262
        return "(RAM L%iv%i %s)" % (self.lease_id, self.vnode, self.filename)
263
    
264
class ResourcePoolWithReusableImages(ResourcePool):
265
    def __init__(self, info_enact, vm_enact, deploy_enact):
266
        ResourcePool.__init__(self, info_enact, vm_enact, deploy_enact)
267
        
268
        self.nodes = dict([(id,ResourcePoolNodeWithReusableImages.from_node(node)) for id, node in self.nodes.items()])
269
    
270
    def add_reusable_image(self, pnode, diskimage_id, imagesize, mappings, timeout):
271
        self.logger.debug("Adding reusable image for %s in pnode=%i" % (mappings, pnode))
272
        
273
        self.logger.vdebug("Files BEFORE:")
274
        self.get_node(pnode).print_files()
275
        
276
        imagefile = "reusable-%s" % diskimage_id
277
        img = ReusableDiskImageFile(imagefile, imagesize, diskimage_id, timeout)
278
        for (lease_id, vnode) in mappings:
279
            img.add_mapping(lease_id, vnode)
280

    
281
        self.get_node(pnode).add_reusable_image(img)
282

    
283
        self.logger.vdebug("Files AFTER:")
284
        self.get_node(pnode).print_files()
285
        
286
        get_accounting().append_stat(constants.COUNTER_DISKUSAGE, self.get_max_disk_usage())
287
        return img
288
    
289
    def add_mapping_to_existing_reusable_image(self, pnode_id, diskimage_id, lease_id, vnode, timeout):
290
        self.get_node(pnode_id).add_mapping_to_existing_reusable_image(diskimage_id, lease_id, vnode, timeout)
291
    
292
    def remove_diskimage(self, pnode_id, lease, vnode):
293
        ResourcePool.remove_diskimage(self, pnode_id, lease, vnode)
294
        self.logger.debug("Removing cached images for L%iV%i in node %i" % (lease, vnode, pnode_id))
295
        for img in self.get_node(pnode_id).get_reusable_images():
296
            if (lease, vnode) in img.mappings:
297
                img.mappings.remove((lease, vnode))
298
            self.get_node(pnode_id).print_files()
299
            # Keep image around, even if it isn't going to be used
300
            # by any VMs. It might be reused later on.
301
            # It will be purged if space has to be made available
302
            # for other images
303
        
304
    def get_nodes_with_reusable_image(self, diskimage_id, after = None):
305
        return [n.id for n in self.get_nodes() if n.exists_reusable_image(diskimage_id, after=after)]
306

    
307
    def exists_reusable_image(self, pnode_id, diskimage_id, after):
308
        return self.get_node(pnode_id).exists_reusable_image(diskimage_id, after = after)
309
    
310
    
311
class ResourcePoolNodeWithReusableImages(ResourcePoolNode):
312
    def __init__(self, node_id, hostname, capacity):
313
        ResourcePoolNode.__init__(self, node_id, hostname, capacity)
314
        self.reusable_images = []
315

    
316
    @classmethod
317
    def from_node(cls, n):
318
        node = cls(n.id, n.hostname, n.capacity)
319
        node.enactment_info = n.enactment_info
320
        return node
321
    
322
    def add_reusable_image(self, f):
323
        self.reusable_images.append(f)
324

    
325
    def add_mapping_to_existing_reusable_image(self, diskimage_id, lease_id, vnode, timeout):
326
        for f in self.reusable_images:
327
            if f.diskimage_id == diskimage_id:
328
                f.add_mapping(lease_id, vnode)
329
                f.update_timeout(timeout)
330
                break  # Ugh
331
        self.print_files()
332
            
333
    def get_reusable_image(self, diskimage_id, after = None, lease_id=None, vnode=None):
334
        images = [i for i in self.reusable_images if i.diskimage_id == diskimage_id]
335
        if after != None:
336
            images = [i for i in images if i.timeout >= after]
337
        if lease_id != None and vnode != None:
338
            images = [i for i in images if i.has_mapping(lease_id, vnode)]
339
        if len(images)>0:
340
            return images[0]
341
        else:
342
            return None
343
        
344
    def exists_reusable_image(self, imagefile, after = None, lease_id=None, vnode=None):
345
        entry = self.get_reusable_image(imagefile, after = after, lease_id=lease_id, vnode=vnode)
346
        if entry == None:
347
            return False
348
        else:
349
            return True
350

    
351
    def get_reusable_images(self):
352
        return self.reusable_images
353

    
354
    def get_reusable_images_size(self):
355
        return sum([f.filesize for f in self.reusable_images])
356
    
357
    def purge_oldest_unused_image(self):
358
        unused = [img for img in self.reusable_images if not img.has_mappings()]
359
        if len(unused) == 0:
360
            return 0
361
        else:
362
            i = iter(unused)
363
            oldest = i.next()
364
            for img in i:
365
                if img.timeout < oldest.timeout:
366
                    oldest = img
367
            self.reusable_images.remove(oldest)
368
            return 1
369
    
370
    def purge_downto(self, target):
371
        done = False
372
        while not done:
373
            removed = self.purge_oldest_unused_image()
374
            if removed==0:
375
                done = True
376
                success = False
377
            elif removed == 1:
378
                if self.get_reusable_images_size() <= target:
379
                    done = True
380
                    success = True
381
        return success
382

    
383
    def print_files(self):
384
        ResourcePoolNode.print_files(self)
385
        images = ""
386
        if len(self.reusable_images) > 0:
387
            images = ", ".join([str(img) for img in self.reusable_images])
388
        self.logger.vdebug("Node %i reusable images: %iMB %s" % (self.id, self.get_reusable_images_size(), images))
389

    
390
class ReusableDiskImageFile(File):
391
    def __init__(self, filename, filesize, diskimage_id, timeout):
392
        File.__init__(self, filename, filesize)
393
        self.diskimage_id = diskimage_id
394
        self.mappings = set([])
395
        self.timeout = timeout
396
        
397
    def add_mapping(self, lease_id, vnode):
398
        self.mappings.add((lease_id, vnode))
399
        
400
    def has_mapping(self, lease_id, vnode):
401
        return (lease_id, vnode) in self.mappings
402
    
403
    def has_mappings(self):
404
        return len(self.mappings) > 0
405
        
406
    def update_timeout(self, timeout):
407
        if timeout > self.timeout:
408
            self.timeout = timeout
409
        
410
    def is_expired(self, curTime):
411
        if self.timeout == None:
412
            return False
413
        elif self.timeout > curTime:
414
            return True
415
        else:
416
            return False
417
        
418
    def __str__(self):
419
        if self.timeout == None:
420
            timeout = "NOTIMEOUT"
421
        else:
422
            timeout = self.timeout
423
        return "(REUSABLE %s %s %s %s)" % (vnodemapstr(self.mappings), self.diskimage_id, str(timeout), self.filename)
424