Project

General

Profile

root / branches / 1.1 / src / haizea / core / scheduler / resourcepool.py @ 842

1 632 borja
# -------------------------------------------------------------------------- #
2 641 borja
# Copyright 2006-2009, University of Chicago                                 #
3
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
4 632 borja
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18
19 647 borja
from haizea.common.utils import vnodemapstr
20 632 borja
import haizea.common.constants as constants
21
import haizea.core.enact.actions as actions
22
from haizea.core.scheduler import EnactmentError
23 800 borja
from haizea.core.leases import UnmanagedSoftwareEnvironment, DiskImageSoftwareEnvironment
24 632 borja
import logging
25
26
27
class ResourcePool(object):
    """Keeps track of the physical nodes and enacts VM operations on them.

    The pool delegates the actual work to three enactment modules received
    at construction time: one that reports node information, one that
    starts/stops/suspends/resumes VMs, and one used for deployment-related
    queries (resolving disk image files, listing auxiliary nodes).
    """

    def __init__(self, info_enact, vm_enact, deploy_enact):
        """Store the enactment modules and fetch the initial set of nodes.

        info_enact   -- must provide get_nodes() and refresh()
        vm_enact     -- must provide start/stop/suspend/resume and the
                        verify_suspend/verify_resume calls
        deploy_enact -- must provide resolve_to_file() and get_aux_nodes()
        """
        self.logger = logging.getLogger("RPOOL")

        self.info = info_enact
        self.vm = vm_enact
        # TODO: Ideally, deployment enactment shouldn't be here, especially
        # since it already "hangs" below the deployment modules. For now,
        # it does no harm, though.
        self.deployment = deploy_enact

        # Indexed by node id (see get_node / refresh_nodes).
        self.nodes = self.info.get_nodes()

    def _enact(self, operation, action, description):
        """Run one VM enactment call; log and re-raise any EnactmentError.

        operation   -- bound method of the VM enactment module
        action      -- enactment action passed to that method
        description -- word used in the log message ("start", "end", ...)
        """
        try:
            operation(action)
        except EnactmentError as exc:
            # Keep using the exception's "message" attribute when it has
            # one, but fall back to the exception itself so logging still
            # works where exceptions carry no such attribute.
            reason = getattr(exc, "message", exc)
            self.logger.error("Enactment of %s VM failed: %s" % (description, reason))
            raise

    def start_vms(self, lease, rr):
        """Enact the start of the VMs described by reservation `rr`.

        Each virtual node of the action is given the enactment info of the
        physical node it is mapped to and the resources reserved there.
        Raises EnactmentError (after logging it) if the enactment fails.
        """
        start_action = actions.VMEnactmentStartAction()
        start_action.from_rr(rr)

        for (vnode, pnode) in rr.nodes.items():
            vnode_action = start_action.vnodes[vnode]
            vnode_action.pnode = self.get_node(pnode).enactment_info
            vnode_action.resources = rr.resources_in_pnode[pnode]

        self._enact(self.vm.start, start_action, "start")

    def stop_vms(self, lease, rr):
        """Enact the stop of the VMs described by reservation `rr`.

        Raises EnactmentError (after logging it) if the enactment fails.
        """
        stop_action = actions.VMEnactmentStopAction()
        stop_action.from_rr(rr)
        self._enact(self.vm.stop, stop_action, "end")

    def verify_deploy(self, lease, rr):
        """Return True if the lease's software is in place on every node of `rr`.

        Unmanaged software environments are always considered deployed. For
        disk image environments, every (vnode, pnode) pair must have the
        lease's image registered on the physical node; each missing image is
        logged. Any other kind of software environment yields False.
        """
        if isinstance(lease.software, UnmanagedSoftwareEnvironment):
            return True
        if not isinstance(lease.software, DiskImageSoftwareEnvironment):
            return False

        deployed = True
        # Do not stop at the first missing image: report all of them.
        for (vnode, pnode) in rr.nodes.items():
            img = self.get_node(pnode).get_diskimage(lease, vnode, lease.software.image_id)
            if img is None:
                self.logger.error("L%iV%i is not deployed in node %i" % (lease.id, vnode, pnode))
                deployed = False
        return deployed

    def suspend_vms(self, lease, rr):
        """Register the memory image files and enact the suspension in `rr`.

        Raises EnactmentError (after logging it) if the enactment fails.
        """
        # Add memory image files, sized by the memory requested for the vnode
        for vnode in rr.vnodes:
            pnode = rr.vmrr.nodes[vnode]
            mem = lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
            self.add_ramfile(pnode, lease, vnode, mem)

        # Enact suspend
        suspend_action = actions.VMEnactmentSuspendAction()
        suspend_action.from_rr(rr)
        self._enact(self.vm.suspend, suspend_action, "suspend")

    def verify_suspend(self, lease, rr):
        """Ask the VM enactment module to confirm the suspension in `rr`."""
        verify_suspend_action = actions.VMEnactmentConfirmSuspendAction()
        verify_suspend_action.from_rr(rr)
        self.vm.verify_suspend(verify_suspend_action)

    def resume_vms(self, lease, rr):
        """Drop the memory image files and enact the resumption in `rr`.

        Raises EnactmentError (after logging it) if the enactment fails.
        """
        # Remove memory image files
        for vnode in rr.vnodes:
            pnode = rr.vmrr.nodes[vnode]
            self.remove_ramfile(pnode, lease, vnode)

        # Enact resume
        resume_action = actions.VMEnactmentResumeAction()
        resume_action.from_rr(rr)
        self._enact(self.vm.resume, resume_action, "resume")

    def verify_resume(self, lease, rr):
        """Ask the VM enactment module to confirm the resumption in `rr`."""
        verify_resume_action = actions.VMEnactmentConfirmResumeAction()
        verify_resume_action.from_rr(rr)
        self.vm.verify_resume(verify_resume_action)

    def refresh_nodes(self):
        """Add the nodes reported by the info module's refresh(); return them."""
        new_nodes = self.info.refresh()
        for node in new_nodes:
            self.nodes[node.id] = node
        return new_nodes

    def get_nodes(self):
        """Return a list with all the nodes in the pool."""
        return list(self.nodes.values())

    # An auxiliary node is a host whose resources are going to be scheduled, but
    # where no VMs are actually going to run. For example, a disk image repository node.
    def get_aux_nodes(self):
        """Return the auxiliary nodes reported by the deployment module."""
        # TODO: We're only asking the deployment enactment module for auxiliary nodes.
        # There might be a scenario where the info enactment module also reports
        # auxiliary nodes.
        return self.deployment.get_aux_nodes()

    def get_num_nodes(self):
        """Return the number of nodes in the pool."""
        return len(self.nodes)

    def get_node(self, node_id):
        """Return the node with the given id (KeyError if there is none)."""
        return self.nodes[node_id]

    def add_diskimage(self, pnode, diskimage_id, imagesize, lease, vnode):
        """Register a disk image for (lease, vnode) on node `pnode`; return it.

        The file name is obtained from the deployment module.
        """
        self.logger.debug("Adding disk image for L%iV%i in pnode=%i" % (lease.id, vnode, pnode))

        self.logger.vdebug("Files BEFORE:")
        self.get_node(pnode).print_files()

        imagefile = self.deployment.resolve_to_file(lease, vnode, diskimage_id)
        img = DiskImageFile(imagefile, imagesize, lease, vnode, diskimage_id)
        self.get_node(pnode).add_file(img)

        self.logger.vdebug("Files AFTER:")
        self.get_node(pnode).print_files()

        return img

    def remove_diskimage(self, pnode, lease, vnode):
        """Unregister the disk image of (lease, vnode) from node `pnode`."""
        node = self.get_node(pnode)
        node.print_files()
        self.logger.debug("Removing disk image for L%iV%i in node %i" % (lease.id, vnode, pnode))
        node.remove_diskimage(lease, vnode)

        node.print_files()

    def add_ramfile(self, pnode, lease, vnode, size):
        """Register a memory image file of `size` for (lease, vnode) on `pnode`."""
        node = self.get_node(pnode)
        self.logger.debug("Adding RAM file for L%iV%i in node %i" % (lease.id, vnode, pnode))
        node.print_files()
        f = RAMImageFile("RAM_L%iV%i" % (lease.id, vnode), size, lease, vnode)
        node.add_file(f)
        node.print_files()

    def remove_ramfile(self, pnode, lease, vnode):
        """Unregister the memory image file of (lease, vnode) from `pnode`."""
        node = self.get_node(pnode)
        self.logger.debug("Removing RAM file for L%iV%i in node %i" % (lease.id, vnode, pnode))
        node.print_files()
        node.remove_ramfile(lease, vnode)
        node.print_files()

    def get_disk_image_mappings(self, lease):
        """Return {vnode: node id} for every disk image belonging to `lease`."""
        vnode_map = {}
        for n in self.nodes.values():
            for img in n.get_diskimages():
                if img.lease == lease:
                    vnode_map[img.vnode] = n.id
        return vnode_map

    def get_ram_image_mappings(self, lease):
        """Return {vnode: node id} for every memory image belonging to `lease`."""
        vnode_map = {}
        for n in self.nodes.values():
            for img in n.get_ramimages():
                if img.lease == lease:
                    vnode_map[img.vnode] = n.id
        return vnode_map

    def get_max_disk_usage(self):
        """Return the largest per-node disk usage, or 0 if the pool is empty."""
        usages = [n.get_disk_usage() for n in self.nodes.values()]
        if not usages:
            # max() of an empty sequence raises ValueError
            return 0
        return max(usages)
199
200
class ResourcePoolNode(object):
    """A physical node of the resource pool and the files registered on it."""

    def __init__(self, node_id, hostname, capacity):
        """Create a node with no files and no enactment-specific information."""
        self.logger = logging.getLogger("RESOURCEPOOL")
        self.id = node_id
        self.hostname = hostname
        self.capacity = capacity
        self.files = []

        # enactment-specific information
        self.enactment_info = None

    def get_capacity(self):
        """Return the capacity given at construction time."""
        return self.capacity

    def add_file(self, f):
        """Register file `f` on this node."""
        self.files.append(f)

    def get_diskimage(self, lease, vnode, diskimage_id):
        """Return the disk image matching (diskimage_id, lease, vnode), or None.

        If several images match, a warning is logged and the first one
        is returned.
        """
        matches = []
        for f in self.files:
            if not isinstance(f, DiskImageFile):
                continue
            if f.diskimage_id == diskimage_id and f.lease == lease and f.vnode == vnode:
                matches.append(f)

        if len(matches) == 0:
            return None
        if len(matches) > 1:
            self.logger.warning("More than one tainted image for L%iV%i on node %i" % (lease.id, vnode, self.id))
        return matches[0]

    def remove_diskimage(self, lease, vnode):
        """Unregister the first disk image of (lease, vnode), if there is one."""
        candidates = [f for f in self.files
                      if isinstance(f, DiskImageFile) and f.lease == lease and f.vnode == vnode]
        if candidates:
            self.files.remove(candidates[0])

    def remove_ramfile(self, lease, vnode):
        """Unregister the first memory image of (lease, vnode), if there is one."""
        candidates = [f for f in self.files
                      if isinstance(f, RAMImageFile) and f.lease == lease and f.vnode == vnode]
        if candidates:
            self.files.remove(candidates[0])

    def get_disk_usage(self):
        """Return the sum of the sizes of all files registered on this node."""
        total = 0
        for f in self.files:
            total = total + f.filesize
        return total

    def get_diskimages(self):
        """Return the registered files that are disk images."""
        return [f for f in self.files if isinstance(f, DiskImageFile)]

    def get_ramimages(self):
        """Return the registered files that are memory images."""
        return [f for f in self.files if isinstance(f, RAMImageFile)]

    def print_files(self):
        """Log, through the logger's vdebug call, this node's files and disk usage."""
        # Joining an empty list yields "", so no special case is needed.
        listing = ", ".join([str(f) for f in self.files])
        self.logger.vdebug("Node %i files: %iMB %s" % (self.id, self.get_disk_usage(), listing))

    def xmlrpc_marshall(self):
        """Return a dictionary describing this node."""
        # Convert to something we can send through XMLRPC
        return {
            "id": self.id,
            "hostname": self.hostname,
            "cpu": sum(self.capacity.quantity[constants.RES_CPU]),
            "mem": self.capacity.get_quantity(constants.RES_MEM),
        }
270
271
272
273
class File(object):
    """A file registered on a node, described by its name and its size."""

    def __init__(self, filename, filesize):
        """Record the file's name and size."""
        self.filename, self.filesize = filename, filesize
277
278
class DiskImageFile(File):
    """A disk image file tied to one virtual node of one lease."""

    def __init__(self, filename, filesize, lease, vnode, diskimage_id):
        """Record the file data plus the lease, vnode and image id it belongs to."""
        super(DiskImageFile, self).__init__(filename, filesize)
        self.diskimage_id = diskimage_id
        self.lease = lease
        self.vnode = vnode

    def __str__(self):
        """Return a short description: lease id, vnode, image id and file name."""
        fields = (self.lease.id, self.vnode, self.diskimage_id, self.filename)
        return "(DISK L%iv%i %s %s)" % fields
287 632 borja
288
289
class RAMImageFile(File):
    """A memory image file tied to one virtual node of one lease."""

    def __init__(self, filename, filesize, lease, vnode):
        """Record the file data plus the lease and vnode it belongs to."""
        super(RAMImageFile, self).__init__(filename, filesize)
        self.vnode = vnode
        self.lease = lease

    def __str__(self):
        """Return a short description: lease id, vnode and file name."""
        fields = (self.lease.id, self.vnode, self.filename)
        return "(RAM L%iv%i %s)" % fields
297 632 borja
298
class ResourcePoolWithReusableImages(ResourcePool):
    """A resource pool whose nodes also keep a list of reusable disk images."""

    def __init__(self, info_enact, vm_enact, deploy_enact):
        """Build the base pool, then rebuild every node as a ResourcePoolNodeWithReusableImages."""
        ResourcePool.__init__(self, info_enact, vm_enact, deploy_enact)

        converted = {}
        for node_id, node in self.nodes.items():
            converted[node_id] = ResourcePoolNodeWithReusableImages.from_node(node)
        self.nodes = converted

    def add_reusable_image(self, pnode, diskimage_id, imagesize, mappings, timeout):
        """Register a reusable image on node `pnode` and return it.

        `mappings` is an iterable of (lease, vnode) pairs that are added
        to the new image.
        """
        self.logger.debug("Adding reusable image for %s in pnode=%i" % (mappings, pnode))

        self.logger.vdebug("Files BEFORE:")
        self.get_node(pnode).print_files()

        img = ReusableDiskImageFile("reusable-%s" % diskimage_id, imagesize, diskimage_id, timeout)
        for pair in mappings:
            (lease, vnode) = pair
            img.add_mapping(lease, vnode)

        self.get_node(pnode).add_reusable_image(img)

        self.logger.vdebug("Files AFTER:")
        self.get_node(pnode).print_files()

        return img

    def add_mapping_to_existing_reusable_image(self, pnode_id, diskimage_id, lease, vnode, timeout):
        """Forward the request to the node with id `pnode_id`."""
        node = self.get_node(pnode_id)
        node.add_mapping_to_existing_reusable_image(diskimage_id, lease, vnode, timeout)

    def remove_diskimage(self, pnode_id, lease, vnode):
        """Remove the disk image of (lease, vnode) and its reusable-image mappings.

        The reusable images themselves stay on the node.
        """
        ResourcePool.remove_diskimage(self, pnode_id, lease, vnode)
        self.logger.debug("Removing cached images for L%iV%i in node %i" % (lease.id, vnode, pnode_id))
        node = self.get_node(pnode_id)
        mapping = (lease, vnode)
        for img in node.get_reusable_images():
            if mapping in img.mappings:
                img.mappings.remove(mapping)
            node.print_files()
            # Keep image around, even if it isn't going to be used
            # by any VMs. It might be reused later on.
            # It will be purged if space has to be made available
            # for other images

    def get_nodes_with_reusable_image(self, diskimage_id, after = None):
        """Return the ids of the nodes reporting a reusable image for `diskimage_id`."""
        node_ids = []
        for n in self.get_nodes():
            if n.exists_reusable_image(diskimage_id, after=after):
                node_ids.append(n.id)
        return node_ids

    def exists_reusable_image(self, pnode_id, diskimage_id, after):
        """Return whether node `pnode_id` reports a reusable image for `diskimage_id`."""
        node = self.get_node(pnode_id)
        return node.exists_reusable_image(diskimage_id, after = after)
342
343
344
class ResourcePoolNodeWithReusableImages(ResourcePoolNode):
    """A pool node that additionally keeps a list of reusable disk images.

    Reusable images are kept in `reusable_images`, separately from the
    node's regular files. An image whose timeout is None is treated as
    having no timeout.
    """

    def __init__(self, node_id, hostname, capacity):
        """Create a node with no files and no reusable images."""
        ResourcePoolNode.__init__(self, node_id, hostname, capacity)
        self.reusable_images = []

    @classmethod
    def from_node(cls, n):
        """Build a node from `n`, copying its id, hostname, capacity and enactment info.

        Files registered on `n` are not copied.
        """
        node = cls(n.id, n.hostname, n.capacity)
        node.enactment_info = n.enactment_info
        return node

    def add_reusable_image(self, f):
        """Register reusable image `f` on this node."""
        self.reusable_images.append(f)

    def add_mapping_to_existing_reusable_image(self, diskimage_id, lease, vnode, timeout):
        """Add (lease, vnode) to the first reusable image with `diskimage_id`.

        The image's timeout is updated with `timeout`. Nothing is changed
        if the node has no such image. The node's files are logged in
        either case.
        """
        for f in self.reusable_images:
            if f.diskimage_id == diskimage_id:
                f.add_mapping(lease, vnode)
                f.update_timeout(timeout)
                # Only the first matching image is updated.
                break
        self.print_files()

    def get_reusable_image(self, diskimage_id, after = None, lease=None, vnode=None):
        """Return the first reusable image matching the criteria, or None.

        diskimage_id -- the image must have this id
        after        -- if given, the image's timeout must not be earlier
                        than `after`; images without a timeout qualify
        lease, vnode -- if both are given, the image must have a mapping
                        for that pair
        """
        check_mapping = lease is not None and vnode is not None
        for img in self.reusable_images:
            if img.diskimage_id != diskimage_id:
                continue
            # A timeout of None means "no timeout", so such an image is
            # valid at any point in time.
            if after is not None and img.timeout is not None and img.timeout < after:
                continue
            if check_mapping and not img.has_mapping(lease, vnode):
                continue
            return img
        return None

    def exists_reusable_image(self, imagefile, after = None, lease=None, vnode=None):
        """Return True if get_reusable_image() finds an image for these criteria.

        `imagefile` is passed on as the disk image id.
        """
        entry = self.get_reusable_image(imagefile, after = after, lease=lease, vnode=vnode)
        return entry is not None

    def get_reusable_images(self):
        """Return the list of reusable images (the list itself, not a copy)."""
        return self.reusable_images

    def get_reusable_images_size(self):
        """Return the sum of the sizes of all reusable images."""
        return sum([f.filesize for f in self.reusable_images])

    def purge_oldest_unused_image(self):
        """Remove the unused reusable image with the earliest timeout.

        An image is unused when it has no mappings. Images without a
        timeout are only chosen when no unused image has one. Returns the
        number of images removed (0 or 1).
        """
        unused = [img for img in self.reusable_images if not img.has_mappings()]
        if not unused:
            return 0
        # The first tuple element sorts images without a timeout last and
        # keeps None from ever being compared against a real timeout.
        oldest = min(unused, key=lambda img: (img.timeout is None, img.timeout))
        self.reusable_images.remove(oldest)
        return 1

    def purge_downto(self, target):
        """Purge unused images until the reusable images take at most `target`.

        Returns True once the total size is within `target` (without
        purging anything if it already is), and False if the unused images
        run out before that.
        """
        while self.get_reusable_images_size() > target:
            if self.purge_oldest_unused_image() == 0:
                return False
        return True

    def print_files(self):
        """Log the node's regular files and then its reusable images."""
        ResourcePoolNode.print_files(self)
        images = ", ".join([str(img) for img in self.reusable_images])
        self.logger.vdebug("Node %i reusable images: %iMB %s" % (self.id, self.get_reusable_images_size(), images))
422
423
class ReusableDiskImageFile(File):
    """A disk image file that can be shared by several (lease, vnode) pairs.

    `mappings` is the set of (lease, vnode) pairs currently using the
    image. `timeout` is the point in time after which the image counts as
    expired; None means the image has no timeout.
    """

    def __init__(self, filename, filesize, diskimage_id, timeout):
        """Create a reusable image with no mappings."""
        File.__init__(self, filename, filesize)
        self.diskimage_id = diskimage_id
        self.mappings = set()
        self.timeout = timeout

    def add_mapping(self, lease, vnode):
        """Record that (lease, vnode) uses this image."""
        self.mappings.add((lease, vnode))

    def has_mapping(self, lease, vnode):
        """Return True if (lease, vnode) uses this image."""
        return (lease, vnode) in self.mappings

    def has_mappings(self):
        """Return True if at least one (lease, vnode) pair uses this image."""
        return len(self.mappings) > 0

    def update_timeout(self, timeout):
        """Extend the timeout to `timeout` if that is later than the current one.

        The timeout is never shortened. An image without a timeout keeps
        having none, and passing None removes the timeout.
        """
        if self.timeout is None:
            # Already unlimited; a finite timeout would shorten it.
            return
        if timeout is None or timeout > self.timeout:
            self.timeout = timeout

    def is_expired(self, curTime):
        """Return True if the image has a timeout and it is earlier than `curTime`."""
        return self.timeout is not None and self.timeout < curTime

    def __str__(self):
        """Return a short description: mappings, image id, timeout and file name."""
        if self.timeout is None:
            timeout = "NOTIMEOUT"
        else:
            timeout = self.timeout
        return "(REUSABLE %s %s %s %s)" % (vnodemapstr(self.mappings), self.diskimage_id, str(timeout), self.filename)