Project

General

Profile

root / branches / 1.1 / src / haizea / core / scheduler / resourcepool.py @ 842

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2009, University of Chicago                                 #
3
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19
from haizea.common.utils import vnodemapstr
20
import haizea.common.constants as constants
21
import haizea.core.enact.actions as actions
22
from haizea.core.scheduler import EnactmentError
23
from haizea.core.leases import UnmanagedSoftwareEnvironment, DiskImageSoftwareEnvironment
24
import logging 
25

    
26

    
27
class ResourcePool(object):
    """Registry of physical nodes and entry point for enacting VM actions.

    The pool holds the nodes reported by the information enactment module,
    forwards start/stop/suspend/resume requests to the VM enactment module,
    and keeps track of the disk image and RAM image files placed on each node.
    """

    def __init__(self, info_enact, vm_enact, deploy_enact):
        """Store the enactment modules and load the initial set of nodes.

        info_enact   -- must provide get_nodes() and refresh()
        vm_enact     -- must provide start/stop/suspend/resume and the
                        verify_suspend/verify_resume calls
        deploy_enact -- must provide get_aux_nodes() and resolve_to_file()
        """
        self.logger = logging.getLogger("RPOOL")

        self.info = info_enact
        self.vm = vm_enact
        # TODO: Ideally, deployment enactment shouldn't be here, specially since
        # it already "hangs" below the deployment modules. For now,
        # it does no harm, though.
        self.deployment = deploy_enact

        # Used below as a mapping of node id -> node object.
        self.nodes = self.info.get_nodes()

    def _enact(self, enact_fn, action, description):
        """Run one VM enactment call; log and re-raise an EnactmentError.

        description is the word inserted in the log message ("start", "end",
        "suspend" or "resume").
        """
        try:
            enact_fn(action)
        except EnactmentError as exc:
            # Not every exception carries a "message" attribute, so fall back
            # to the exception's own string form instead of failing here.
            message = getattr(exc, "message", str(exc))
            self.logger.error("Enactment of %s VM failed: %s" % (description, message))
            raise

    def start_vms(self, lease, rr):
        """Enact the start of the VMs described by reservation rr.

        Each vnode in the start action is given the enactment information of
        its physical node and the resources reserved for it on that node.
        Re-raises EnactmentError if the enactment module fails.
        """
        start_action = actions.VMEnactmentStartAction()
        start_action.from_rr(rr)

        for (vnode, pnode) in rr.nodes.items():
            node = self.get_node(pnode)
            start_action.vnodes[vnode].pnode = node.enactment_info
            start_action.vnodes[vnode].resources = rr.resources_in_pnode[pnode]

        self._enact(self.vm.start, start_action, "start")

    def stop_vms(self, lease, rr):
        """Enact the stop of the VMs in rr; re-raises EnactmentError."""
        stop_action = actions.VMEnactmentStopAction()
        stop_action.from_rr(rr)
        self._enact(self.vm.stop, stop_action, "end")

    def verify_deploy(self, lease, rr):
        """Return True if the lease's software is in place on every node of rr.

        Unmanaged software environments always count as deployed. For disk
        image environments every (vnode, pnode) pair must have a matching disk
        image registered on the node; each missing one is logged. Any other
        kind of software environment is reported as not deployed.
        """
        if isinstance(lease.software, UnmanagedSoftwareEnvironment):
            return True
        if not isinstance(lease.software, DiskImageSoftwareEnvironment):
            return False

        deployed = True
        for (vnode, pnode) in rr.nodes.items():
            img = self.get_node(pnode).get_diskimage(lease, vnode, lease.software.image_id)
            if img is None:
                self.logger.error("L%iV%i is not deployed in node %i" % (lease.id, vnode, pnode))
                # Keep going so every missing image gets logged.
                deployed = False
        return deployed

    def suspend_vms(self, lease, rr):
        """Register the memory image files of rr's vnodes, then enact suspend.

        The RAM file size is the lease's requested memory for the vnode.
        Re-raises EnactmentError if the enactment module fails.
        """
        # Add memory image files
        for vnode in rr.vnodes:
            pnode = rr.vmrr.nodes[vnode]
            mem = lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
            self.add_ramfile(pnode, lease, vnode, mem)

        # Enact suspend
        suspend_action = actions.VMEnactmentSuspendAction()
        suspend_action.from_rr(rr)
        self._enact(self.vm.suspend, suspend_action, "suspend")

    def verify_suspend(self, lease, rr):
        """Ask the VM enactment module to confirm the suspension in rr."""
        verify_suspend_action = actions.VMEnactmentConfirmSuspendAction()
        verify_suspend_action.from_rr(rr)
        self.vm.verify_suspend(verify_suspend_action)

    def resume_vms(self, lease, rr):
        """Drop the memory image files of rr's vnodes, then enact resume.

        Re-raises EnactmentError if the enactment module fails.
        """
        # Remove memory image files
        for vnode in rr.vnodes:
            pnode = rr.vmrr.nodes[vnode]
            self.remove_ramfile(pnode, lease, vnode)

        # Enact resume
        resume_action = actions.VMEnactmentResumeAction()
        resume_action.from_rr(rr)
        self._enact(self.vm.resume, resume_action, "resume")

    def verify_resume(self, lease, rr):
        """Ask the VM enactment module to confirm the resumption in rr."""
        verify_resume_action = actions.VMEnactmentConfirmResumeAction()
        verify_resume_action.from_rr(rr)
        self.vm.verify_resume(verify_resume_action)

    def refresh_nodes(self):
        """Add the nodes returned by info.refresh() to the pool and return them."""
        new_nodes = self.info.refresh()
        for node in new_nodes:
            self.nodes[node.id] = node
        return new_nodes

    def get_nodes(self):
        """Return the node objects currently in the pool."""
        return self.nodes.values()

    # An auxiliary node is a host whose resources are going to be scheduled, but
    # where no VMs are actually going to run. For example, a disk image repository node.
    def get_aux_nodes(self):
        """Return the auxiliary nodes reported by the deployment enactment module."""
        # TODO: We're only asking the deployment enactment module for auxiliary nodes.
        # There might be a scenario where the info enactment module also reports
        # auxiliary nodes.
        return self.deployment.get_aux_nodes()

    def get_num_nodes(self):
        """Return how many nodes are in the pool."""
        return len(self.nodes)

    def get_node(self, node_id):
        """Return the node with the given id (KeyError if it is unknown)."""
        return self.nodes[node_id]

    def add_diskimage(self, pnode, diskimage_id, imagesize, lease, vnode):
        """Register a disk image file for (lease, vnode) on node pnode.

        The file name is resolved by the deployment enactment module. Returns
        the DiskImageFile that was added.
        """
        self.logger.debug("Adding disk image for L%iV%i in pnode=%i" % (lease.id, vnode, pnode))
        node = self.get_node(pnode)

        self.logger.vdebug("Files BEFORE:")
        node.print_files()

        imagefile = self.deployment.resolve_to_file(lease, vnode, diskimage_id)
        img = DiskImageFile(imagefile, imagesize, lease, vnode, diskimage_id)
        node.add_file(img)

        self.logger.vdebug("Files AFTER:")
        node.print_files()

        return img

    def remove_diskimage(self, pnode, lease, vnode):
        """Remove the disk image of (lease, vnode) from node pnode."""
        node = self.get_node(pnode)
        node.print_files()
        self.logger.debug("Removing disk image for L%iV%i in node %i" % (lease.id, vnode, pnode))
        node.remove_diskimage(lease, vnode)
        node.print_files()

    def add_ramfile(self, pnode, lease, vnode, size):
        """Register a RAM image file of the given size for (lease, vnode) on pnode."""
        node = self.get_node(pnode)
        self.logger.debug("Adding RAM file for L%iV%i in node %i" % (lease.id, vnode, pnode))
        node.print_files()
        ramfile = RAMImageFile("RAM_L%iV%i" % (lease.id, vnode), size, lease, vnode)
        node.add_file(ramfile)
        node.print_files()

    def remove_ramfile(self, pnode, lease, vnode):
        """Remove the RAM image file of (lease, vnode) from node pnode."""
        node = self.get_node(pnode)
        self.logger.debug("Removing RAM file for L%iV%i in node %i" % (lease.id, vnode, pnode))
        node.print_files()
        node.remove_ramfile(lease, vnode)
        node.print_files()

    def get_disk_image_mappings(self, lease):
        """Return {vnode: node id} for every disk image belonging to lease."""
        vnode_map = {}
        for node in self.nodes.values():
            for img in node.get_diskimages():
                if img.lease == lease:
                    vnode_map[img.vnode] = node.id
        return vnode_map

    def get_ram_image_mappings(self, lease):
        """Return {vnode: node id} for every RAM image belonging to lease."""
        vnode_map = {}
        for node in self.nodes.values():
            for img in node.get_ramimages():
                if img.lease == lease:
                    vnode_map[img.vnode] = node.id
        return vnode_map

    def get_max_disk_usage(self):
        """Return the largest per-node disk usage, or 0 for an empty pool."""
        usages = [node.get_disk_usage() for node in self.nodes.values()]
        if not usages:
            # max() of an empty sequence raises ValueError.
            return 0
        return max(usages)
199
    
200
class ResourcePoolNode(object):
    """A physical node: its identity, capacity and the files stored on it."""

    def __init__(self, node_id, hostname, capacity):
        """Create a node with no files and no enactment information."""
        self.logger = logging.getLogger("RESOURCEPOOL")
        self.id, self.hostname, self.capacity = node_id, hostname, capacity
        self.files = []

        # enactment-specific information
        self.enactment_info = None

    def get_capacity(self):
        """Return the capacity object given at construction."""
        return self.capacity

    def add_file(self, f):
        """Append file f to this node's file list."""
        self.files.append(f)

    def get_diskimage(self, lease, vnode, diskimage_id):
        """Return the disk image matching (lease, vnode, diskimage_id), or None.

        If several files match, a warning is logged and the first is returned.
        """
        matches = [f for f in self.files
                   if isinstance(f, DiskImageFile)
                   and f.diskimage_id == diskimage_id
                   and f.lease == lease
                   and f.vnode == vnode]
        if not matches:
            return None
        if len(matches) > 1:
            self.logger.warning("More than one tainted image for L%iV%i on node %i" % (lease.id, vnode, self.id))
        return matches[0]

    def remove_diskimage(self, lease, vnode):
        """Remove the first disk image of (lease, vnode), if there is one."""
        for f in self.files:
            if isinstance(f, DiskImageFile) and f.lease == lease and f.vnode == vnode:
                self.files.remove(f)
                # Only the first match is removed.
                break

    def remove_ramfile(self, lease, vnode):
        """Remove the first RAM image file of (lease, vnode), if there is one."""
        for f in self.files:
            if isinstance(f, RAMImageFile) and f.lease == lease and f.vnode == vnode:
                self.files.remove(f)
                # Only the first match is removed.
                break

    def get_disk_usage(self):
        """Return the sum of the sizes of all files on this node."""
        return sum(f.filesize for f in self.files)

    def get_diskimages(self):
        """Return the files on this node that are disk images."""
        return [f for f in self.files if isinstance(f, DiskImageFile)]

    def get_ramimages(self):
        """Return the files on this node that are RAM images."""
        return [f for f in self.files if isinstance(f, RAMImageFile)]

    def print_files(self):
        """Log (at vdebug level) the node's disk usage and its files."""
        # Joining an empty list yields "", so no special case is needed.
        listing = ", ".join([str(f) for f in self.files])
        self.logger.vdebug("Node %i files: %iMB %s" % (self.id, self.get_disk_usage(), listing))

    def xmlrpc_marshall(self):
        """Return a plain dict describing this node, suitable for XMLRPC."""
        # Convert to something we can send through XMLRPC
        return {
            "id": self.id,
            "hostname": self.hostname,
            "cpu": sum(self.capacity.quantity[constants.RES_CPU]),
            "mem": self.capacity.get_quantity(constants.RES_MEM),
        }
270
        
271

    
272
        
273
class File(object):
    """Base record for a file stored on a node: a name and a size."""

    def __init__(self, filename, filesize):
        """Remember the file's name and size."""
        self.filename, self.filesize = filename, filesize
277
        
278
class DiskImageFile(File):
    """A disk image file tied to one (lease, vnode) pair."""

    def __init__(self, filename, filesize, lease, vnode, diskimage_id):
        """Record the owning lease and vnode, and the id of the disk image."""
        super(DiskImageFile, self).__init__(filename, filesize)
        self.diskimage_id = diskimage_id
        self.lease, self.vnode = lease, vnode

    def __str__(self):
        """Return a short description used when listing a node's files."""
        fields = (self.lease.id, self.vnode, self.diskimage_id, self.filename)
        return "(DISK L%iv%i %s %s)" % fields
287

    
288

    
289
class RAMImageFile(File):
    """A memory image file tied to one (lease, vnode) pair."""

    def __init__(self, filename, filesize, lease, vnode):
        """Record the owning lease and vnode."""
        super(RAMImageFile, self).__init__(filename, filesize)
        self.lease, self.vnode = lease, vnode

    def __str__(self):
        """Return a short description used when listing a node's files."""
        fields = (self.lease.id, self.vnode, self.filename)
        return "(RAM L%iv%i %s)" % fields
297
    
298
class ResourcePoolWithReusableImages(ResourcePool):
    """Resource pool whose nodes also keep a set of reusable disk images."""

    def __init__(self, info_enact, vm_enact, deploy_enact):
        """Build the pool, then convert every node to its reusable-image variant."""
        ResourcePool.__init__(self, info_enact, vm_enact, deploy_enact)

        self.nodes = dict((node_id, ResourcePoolNodeWithReusableImages.from_node(node))
                          for node_id, node in self.nodes.items())

    def add_reusable_image(self, pnode, diskimage_id, imagesize, mappings, timeout):
        """Add a reusable image for diskimage_id to node pnode and return it.

        mappings is an iterable of (lease, vnode) pairs to attach to the image.
        """
        self.logger.debug("Adding reusable image for %s in pnode=%i" % (mappings, pnode))
        node = self.get_node(pnode)

        self.logger.vdebug("Files BEFORE:")
        node.print_files()

        imagefile = "reusable-%s" % diskimage_id
        img = ReusableDiskImageFile(imagefile, imagesize, diskimage_id, timeout)
        for (lease, vnode) in mappings:
            img.add_mapping(lease, vnode)

        node.add_reusable_image(img)

        self.logger.vdebug("Files AFTER:")
        node.print_files()

        return img

    def add_mapping_to_existing_reusable_image(self, pnode_id, diskimage_id, lease, vnode, timeout):
        """Delegate to the node: attach (lease, vnode) to an existing reusable image."""
        self.get_node(pnode_id).add_mapping_to_existing_reusable_image(diskimage_id, lease, vnode, timeout)

    def remove_diskimage(self, pnode_id, lease, vnode):
        """Remove the disk image of (lease, vnode) and its reusable-image mappings.

        The reusable images themselves are kept, even if no VM uses them
        any more: they might be reused later on, and are only purged when
        space has to be made available for other images.
        """
        ResourcePool.remove_diskimage(self, pnode_id, lease, vnode)
        self.logger.debug("Removing cached images for L%iV%i in node %i" % (lease.id, vnode, pnode_id))
        node = self.get_node(pnode_id)
        for img in node.get_reusable_images():
            # mappings is a set; discard() is a no-op when the pair is absent.
            img.mappings.discard((lease, vnode))
        # Log the resulting state once, rather than once per image.
        node.print_files()

    def get_nodes_with_reusable_image(self, diskimage_id, after = None):
        """Return the ids of the nodes holding a reusable image for diskimage_id.

        after is passed through to each node's exists_reusable_image().
        """
        return [n.id for n in self.get_nodes() if n.exists_reusable_image(diskimage_id, after=after)]

    def exists_reusable_image(self, pnode_id, diskimage_id, after):
        """Return whether node pnode_id holds a reusable image for diskimage_id."""
        return self.get_node(pnode_id).exists_reusable_image(diskimage_id, after = after)
342
    
343
    
344
class ResourcePoolNodeWithReusableImages(ResourcePoolNode):
    """A node that, besides its regular files, holds reusable disk images.

    A reusable image whose timeout is None is treated as never expiring.
    """

    def __init__(self, node_id, hostname, capacity):
        """Create the node with an empty list of reusable images."""
        ResourcePoolNode.__init__(self, node_id, hostname, capacity)
        self.reusable_images = []

    @classmethod
    def from_node(cls, n):
        """Build a node of this class from n's id, hostname, capacity and enactment info.

        The files of n are not copied.
        """
        node = cls(n.id, n.hostname, n.capacity)
        node.enactment_info = n.enactment_info
        return node

    def add_reusable_image(self, f):
        """Append reusable image f to this node."""
        self.reusable_images.append(f)

    def add_mapping_to_existing_reusable_image(self, diskimage_id, lease, vnode, timeout):
        """Attach (lease, vnode) to the first reusable image for diskimage_id.

        The image's timeout is updated as well. Nothing is changed if no
        image matches.
        """
        for f in self.reusable_images:
            if f.diskimage_id == diskimage_id:
                f.add_mapping(lease, vnode)
                f.update_timeout(timeout)
                # Only the first matching image is updated.
                break
        self.print_files()

    def get_reusable_image(self, diskimage_id, after = None, lease=None, vnode=None):
        """Return the first reusable image matching the criteria, or None.

        diskimage_id -- id the image must have
        after        -- if given, the image's timeout must not be earlier
                        than this (a None timeout always qualifies)
        lease, vnode -- if both are given, the image must be mapped to them
        """
        need_mapping = lease is not None and vnode is not None
        for img in self.reusable_images:
            if img.diskimage_id != diskimage_id:
                continue
            if after is not None and img.timeout is not None and img.timeout < after:
                continue
            if need_mapping and not img.has_mapping(lease, vnode):
                continue
            return img
        return None

    def exists_reusable_image(self, imagefile, after = None, lease=None, vnode=None):
        """Return True if get_reusable_image() finds a match.

        Note that imagefile is used as the disk image id.
        """
        return self.get_reusable_image(imagefile, after = after, lease=lease, vnode=vnode) is not None

    def get_reusable_images(self):
        """Return the list of reusable images (the list itself, not a copy)."""
        return self.reusable_images

    def get_reusable_images_size(self):
        """Return the sum of the sizes of the reusable images."""
        return sum([f.filesize for f in self.reusable_images])

    def purge_oldest_unused_image(self):
        """Remove the unused image with the earliest timeout.

        An image is unused when it has no mappings. Images without a
        timeout are considered last. Returns 1 if an image was removed,
        0 if there was no unused image.
        """
        unused = [img for img in self.reusable_images if not img.has_mappings()]
        if not unused:
            return 0
        # The (is None, timeout) key sorts never-expiring images after all
        # others without comparing None to a timeout; ties keep the first one.
        oldest = min(unused, key=lambda img: (img.timeout is None, img.timeout))
        self.reusable_images.remove(oldest)
        return 1

    def purge_downto(self, target):
        """Purge unused images until their total size is at most target.

        At least one purge is always attempted, even if the size is already
        within target. Returns True once the size is at most target, False
        if the unused images run out first.
        """
        while True:
            if self.purge_oldest_unused_image() == 0:
                return False
            if self.get_reusable_images_size() <= target:
                return True

    def print_files(self):
        """Log the regular files and then the reusable images (vdebug level)."""
        ResourcePoolNode.print_files(self)
        # Joining an empty list yields "", so no special case is needed.
        images = ", ".join([str(img) for img in self.reusable_images])
        self.logger.vdebug("Node %i reusable images: %iMB %s" % (self.id, self.get_reusable_images_size(), images))
422

    
423
class ReusableDiskImageFile(File):
    """A disk image that can be shared by several (lease, vnode) pairs.

    timeout is the time up to which the image is kept; None means the
    image has no timeout.
    """

    def __init__(self, filename, filesize, diskimage_id, timeout):
        """Create the image with no mappings."""
        File.__init__(self, filename, filesize)
        self.diskimage_id = diskimage_id
        # Set of (lease, vnode) pairs currently using this image.
        self.mappings = set()
        self.timeout = timeout

    def add_mapping(self, lease, vnode):
        """Record that (lease, vnode) uses this image."""
        self.mappings.add((lease, vnode))

    def has_mapping(self, lease, vnode):
        """Return whether (lease, vnode) uses this image."""
        return (lease, vnode) in self.mappings

    def has_mappings(self):
        """Return whether any (lease, vnode) pair uses this image."""
        return len(self.mappings) > 0

    def update_timeout(self, timeout):
        """Extend the timeout to the given value; never shorten it.

        An image without a timeout stays that way, and passing None removes
        the timeout.
        """
        if self.timeout is None:
            return
        if timeout is None or timeout > self.timeout:
            self.timeout = timeout

    def is_expired(self, curTime):
        """Return True if the timeout lies before curTime.

        An image without a timeout never expires.
        """
        if self.timeout is None:
            return False
        return self.timeout < curTime

    def __str__(self):
        """Return a short description used when listing a node's images."""
        if self.timeout is None:
            timeout = "NOTIMEOUT"
        else:
            timeout = self.timeout
        return "(REUSABLE %s %s %s %s)" % (vnodemapstr(self.mappings), self.diskimage_id, str(timeout), self.filename)
457