Project

General

Profile

root / trunk / src / haizea / core / scheduler / resourcepool.py @ 676

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2009, University of Chicago                                 #
3
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19
from haizea.common.utils import vnodemapstr
20
import haizea.common.constants as constants
21
import haizea.core.enact.actions as actions
22
from haizea.core.scheduler import EnactmentError
23
import logging 
24

    
25

    
26
class ResourcePool(object):
27
    def __init__(self, info_enact, vm_enact, deploy_enact):
28
        self.logger = logging.getLogger("RPOOL")
29
                
30
        self.info = info_enact
31
        self.vm = vm_enact
32
        # TODO: Ideally, deployment enactment shouldn't be here, specially since
33
        # it already "hangs" below the deployment modules. For now,
34
        # it does no harm, though.
35
        self.deployment = deploy_enact
36
        
37
        self.nodes = self.info.get_nodes()
38
        
39
    def start_vms(self, lease, rr):
40
        start_action = actions.VMEnactmentStartAction()
41
        start_action.from_rr(rr)
42
                
43
        for (vnode, pnode) in rr.nodes.items():
44
            node = self.get_node(pnode)
45
            #diskimage = node.get_diskimage(lease.id, vnode, lease.diskimage_id)
46
            start_action.vnodes[vnode].pnode = node.enactment_info
47
            #start_action.vnodes[vnode].diskimage = diskimage.filename
48
            start_action.vnodes[vnode].resources = rr.resources_in_pnode[pnode]
49

    
50
        try:
51
            self.vm.start(start_action)
52
        except EnactmentError, exc:
53
            self.logger.error("Enactment of start VM failed: %s" % exc.message)
54
            raise
55
        
56
    def stop_vms(self, lease, rr):
57
        stop_action = actions.VMEnactmentStopAction()
58
        stop_action.from_rr(rr)
59
        try:
60
            self.vm.stop(stop_action)
61
        except EnactmentError, exc:
62
            self.logger.error("Enactment of end VM failed: %s" % exc.message)
63
            raise
64
         
65
    def suspend_vms(self, lease, rr):
66
        # Add memory image files
67
        for vnode in rr.vnodes:
68
            pnode = rr.vmrr.nodes[vnode]
69
            self.add_ramfile(pnode, lease.id, vnode, lease.requested_resources[vnode].get_quantity(constants.RES_MEM))
70

    
71
        # Enact suspend
72
        suspend_action = actions.VMEnactmentSuspendAction()
73
        suspend_action.from_rr(rr)
74
        try:
75
            self.vm.suspend(suspend_action)
76
        except EnactmentError, exc:
77
            self.logger.error("Enactment of suspend VM failed: %s" % exc.message)
78
            raise
79
    
80
    def verify_suspend(self, lease, rr):
81
        verify_suspend_action = actions.VMEnactmentConfirmSuspendAction()
82
        verify_suspend_action.from_rr(rr)
83
        self.vm.verify_suspend(verify_suspend_action)
84
    
85
    def resume_vms(self, lease, rr):
86
        # Remove memory image files
87
        for vnode in rr.vnodes:
88
            pnode = rr.vmrr.nodes[vnode]
89
            self.remove_ramfile(pnode, lease.id, vnode)
90

    
91
        # Enact resume
92
        resume_action = actions.VMEnactmentResumeAction()
93
        resume_action.from_rr(rr)
94
        try:
95
            self.vm.resume(resume_action)
96
        except EnactmentError, exc:
97
            self.logger.error("Enactment of resume VM failed: %s" % exc.message)
98
            raise
99
    
100
    def verify_resume(self, lease, rr):
101
        verify_resume_action = actions.VMEnactmentConfirmResumeAction()
102
        verify_resume_action.from_rr(rr)
103
        self.vm.verify_resume(verify_resume_action)    
104
    
105
    def refresh_nodes(self):
106
        new_nodes = self.info.refresh()
107
        for node in new_nodes:
108
            self.nodes[node.id] = node
109
        return new_nodes        
110
    
111
    def get_nodes(self):
112
        return self.nodes.values()
113
    
114
    # An auxiliary node is a host whose resources are going to be scheduled, but
115
    # where no VMs are actually going to run. For example, a disk image repository node.
116
    def get_aux_nodes(self):
117
        # TODO: We're only asking the deployment enactment module for auxiliary nodes.
118
        # There might be a scenario where the info enactment module also reports
119
        # auxiliary nodes.
120
        return self.deployment.get_aux_nodes()
121

    
122
    def get_num_nodes(self):
123
        return len(self.nodes)
124
        
125
    def get_node(self, node_id):
126
        return self.nodes[node_id]
127
        
128
    def add_diskimage(self, pnode, diskimage_id, imagesize, lease_id, vnode):
129
        self.logger.debug("Adding disk image for L%iV%i in pnode=%i" % (lease_id, vnode, pnode))
130
        
131
        self.logger.vdebug("Files BEFORE:")
132
        self.get_node(pnode).print_files()
133
        
134
        imagefile = self.deployment.resolve_to_file(lease_id, vnode, diskimage_id)
135
        img = DiskImageFile(imagefile, imagesize, lease_id, vnode, diskimage_id)
136
        self.get_node(pnode).add_file(img)
137

    
138
        self.logger.vdebug("Files AFTER:")
139
        self.get_node(pnode).print_files()
140
        
141
        return img
142
            
143
    def remove_diskimage(self, pnode, lease, vnode):
144
        node = self.get_node(pnode)
145
        node.print_files()
146

    
147
        self.logger.debug("Removing disk image for L%iV%i in node %i" % (lease, vnode, pnode))
148
        node.remove_diskimage(lease, vnode)
149

    
150
        node.print_files()
151
                
152
    def add_ramfile(self, pnode, lease_id, vnode, size):
153
        node = self.get_node(pnode)
154
        self.logger.debug("Adding RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode))
155
        node.print_files()
156
        f = RAMImageFile("RAM_L%iV%i" % (lease_id, vnode), size, lease_id, vnode)
157
        node.add_file(f)        
158
        node.print_files()
159

    
160
    def remove_ramfile(self, pnode, lease_id, vnode):
161
        node = self.get_node(pnode)
162
        self.logger.debug("Removing RAM file for L%iV%i in node %i" % (lease_id, vnode, pnode))
163
        node.print_files()
164
        node.remove_ramfile(lease_id, vnode)
165
        node.print_files()
166
        
167
    def get_max_disk_usage(self):
168
        return max([n.get_disk_usage() for n in self.nodes.values()])
169
    
170
class ResourcePoolNode(object):
171
    def __init__(self, node_id, hostname, capacity):
172
        self.logger = logging.getLogger("RESOURCEPOOL")
173
        self.id = node_id
174
        self.hostname = hostname
175
        self.capacity = capacity
176
        self.files = []
177

    
178
        # enactment-specific information
179
        self.enactment_info = None
180
        
181
    def get_capacity(self):
182
        return self.capacity
183
           
184
    def add_file(self, f):
185
        self.files.append(f)
186
        
187
    def get_diskimage(self, lease_id, vnode, diskimage_id):
188
        image = [f for f in self.files if isinstance(f, DiskImageFile) and 
189
                 f.diskimage_id == diskimage_id and 
190
                 f.lease_id == lease_id and
191
                 f.vnode == vnode]
192
        if len(image) == 0:
193
            return None
194
        elif len(image) == 1:
195
            return image[0]
196
        elif len(image) > 1:
197
            self.logger.warning("More than one tainted image for L%iV%i on node %i" % (lease_id, vnode, self.id))
198
            return image[0]
199

    
200
    def remove_diskimage(self, lease_id, vnode):
201
        image = [f for f in self.files if isinstance(f, DiskImageFile) and 
202
                 f.lease_id == lease_id and
203
                 f.vnode == vnode]
204
        if len(image) > 0:
205
            image = image[0]
206
            self.files.remove(image)
207
            
208
    def remove_ramfile(self, lease_id, vnode):
209
        ramfile = [f for f in self.files if isinstance(f, RAMImageFile) and f.lease_id==lease_id and f.vnode==vnode]
210
        if len(ramfile) > 0:
211
            ramfile = ramfile[0]
212
            self.files.remove(ramfile)
213
                
214
        
215
    def get_disk_usage(self):
216
        return sum([f.filesize for f in self.files])
217

    
218

    
219
    def get_diskimages(self):
220
        return [f for f in self.files if isinstance(f, DiskImageFile)]
221
        
222
    def print_files(self):
223
        images = ""
224
        if len(self.files) > 0:
225
            images = ", ".join([str(img) for img in self.files])
226
        self.logger.vdebug("Node %i files: %iMB %s" % (self.id, self.get_disk_usage(), images))
227

    
228
    def xmlrpc_marshall(self):
229
        # Convert to something we can send through XMLRPC
230
        h = {}
231
        h["id"] = self.id
232
        h["hostname"] = self.hostname
233
        h["cpu"] = self.capacity.get_quantity(constants.RES_CPU)
234
        h["mem"] = self.capacity.get_quantity(constants.RES_MEM)
235
                
236
        return h
237
        
238

    
239
        
240
class File(object):
241
    def __init__(self, filename, filesize):
242
        self.filename = filename
243
        self.filesize = filesize
244
        
245
class DiskImageFile(File):
246
    def __init__(self, filename, filesize, lease_id, vnode, diskimage_id):
247
        File.__init__(self, filename, filesize)
248
        self.lease_id = lease_id
249
        self.vnode = vnode
250
        self.diskimage_id = diskimage_id
251
                
252
    def __str__(self):
253
        return "(DISK L%iv%i %s %s)" % (self.lease_id, self.vnode, self.diskimage_id, self.filename)
254

    
255

    
256
class RAMImageFile(File):
257
    def __init__(self, filename, filesize, lease_id, vnode):
258
        File.__init__(self, filename, filesize)
259
        self.lease_id = lease_id
260
        self.vnode = vnode
261
                
262
    def __str__(self):
263
        return "(RAM L%iv%i %s)" % (self.lease_id, self.vnode, self.filename)
264
    
265
class ResourcePoolWithReusableImages(ResourcePool):
266
    def __init__(self, info_enact, vm_enact, deploy_enact):
267
        ResourcePool.__init__(self, info_enact, vm_enact, deploy_enact)
268
        
269
        self.nodes = dict([(id,ResourcePoolNodeWithReusableImages.from_node(node)) for id, node in self.nodes.items()])
270
    
271
    def add_reusable_image(self, pnode, diskimage_id, imagesize, mappings, timeout):
272
        self.logger.debug("Adding reusable image for %s in pnode=%i" % (mappings, pnode))
273
        
274
        self.logger.vdebug("Files BEFORE:")
275
        self.get_node(pnode).print_files()
276
        
277
        imagefile = "reusable-%s" % diskimage_id
278
        img = ReusableDiskImageFile(imagefile, imagesize, diskimage_id, timeout)
279
        for (lease_id, vnode) in mappings:
280
            img.add_mapping(lease_id, vnode)
281

    
282
        self.get_node(pnode).add_reusable_image(img)
283

    
284
        self.logger.vdebug("Files AFTER:")
285
        self.get_node(pnode).print_files()
286
        
287
        return img
288
    
289
    def add_mapping_to_existing_reusable_image(self, pnode_id, diskimage_id, lease_id, vnode, timeout):
290
        self.get_node(pnode_id).add_mapping_to_existing_reusable_image(diskimage_id, lease_id, vnode, timeout)
291
    
292
    def remove_diskimage(self, pnode_id, lease, vnode):
293
        ResourcePool.remove_diskimage(self, pnode_id, lease, vnode)
294
        self.logger.debug("Removing cached images for L%iV%i in node %i" % (lease, vnode, pnode_id))
295
        for img in self.get_node(pnode_id).get_reusable_images():
296
            if (lease, vnode) in img.mappings:
297
                img.mappings.remove((lease, vnode))
298
            self.get_node(pnode_id).print_files()
299
            # Keep image around, even if it isn't going to be used
300
            # by any VMs. It might be reused later on.
301
            # It will be purged if space has to be made available
302
            # for other images
303
        
304
    def get_nodes_with_reusable_image(self, diskimage_id, after = None):
305
        return [n.id for n in self.get_nodes() if n.exists_reusable_image(diskimage_id, after=after)]
306

    
307
    def exists_reusable_image(self, pnode_id, diskimage_id, after):
308
        return self.get_node(pnode_id).exists_reusable_image(diskimage_id, after = after)
309
    
310
    
311
class ResourcePoolNodeWithReusableImages(ResourcePoolNode):
312
    def __init__(self, node_id, hostname, capacity):
313
        ResourcePoolNode.__init__(self, node_id, hostname, capacity)
314
        self.reusable_images = []
315

    
316
    @classmethod
317
    def from_node(cls, n):
318
        node = cls(n.id, n.hostname, n.capacity)
319
        node.enactment_info = n.enactment_info
320
        return node
321
    
322
    def add_reusable_image(self, f):
323
        self.reusable_images.append(f)
324

    
325
    def add_mapping_to_existing_reusable_image(self, diskimage_id, lease_id, vnode, timeout):
326
        for f in self.reusable_images:
327
            if f.diskimage_id == diskimage_id:
328
                f.add_mapping(lease_id, vnode)
329
                f.update_timeout(timeout)
330
                break  # Ugh
331
        self.print_files()
332
            
333
    def get_reusable_image(self, diskimage_id, after = None, lease_id=None, vnode=None):
334
        images = [i for i in self.reusable_images if i.diskimage_id == diskimage_id]
335
        if after != None:
336
            images = [i for i in images if i.timeout >= after]
337
        if lease_id != None and vnode != None:
338
            images = [i for i in images if i.has_mapping(lease_id, vnode)]
339
        if len(images)>0:
340
            return images[0]
341
        else:
342
            return None
343
        
344
    def exists_reusable_image(self, imagefile, after = None, lease_id=None, vnode=None):
345
        entry = self.get_reusable_image(imagefile, after = after, lease_id=lease_id, vnode=vnode)
346
        if entry == None:
347
            return False
348
        else:
349
            return True
350

    
351
    def get_reusable_images(self):
352
        return self.reusable_images
353

    
354
    def get_reusable_images_size(self):
355
        return sum([f.filesize for f in self.reusable_images])
356
    
357
    def purge_oldest_unused_image(self):
358
        unused = [img for img in self.reusable_images if not img.has_mappings()]
359
        if len(unused) == 0:
360
            return 0
361
        else:
362
            i = iter(unused)
363
            oldest = i.next()
364
            for img in i:
365
                if img.timeout < oldest.timeout:
366
                    oldest = img
367
            self.reusable_images.remove(oldest)
368
            return 1
369
    
370
    def purge_downto(self, target):
371
        done = False
372
        while not done:
373
            removed = self.purge_oldest_unused_image()
374
            if removed==0:
375
                done = True
376
                success = False
377
            elif removed == 1:
378
                if self.get_reusable_images_size() <= target:
379
                    done = True
380
                    success = True
381
        return success
382

    
383
    def print_files(self):
384
        ResourcePoolNode.print_files(self)
385
        images = ""
386
        if len(self.reusable_images) > 0:
387
            images = ", ".join([str(img) for img in self.reusable_images])
388
        self.logger.vdebug("Node %i reusable images: %iMB %s" % (self.id, self.get_reusable_images_size(), images))
389

    
390
class ReusableDiskImageFile(File):
391
    def __init__(self, filename, filesize, diskimage_id, timeout):
392
        File.__init__(self, filename, filesize)
393
        self.diskimage_id = diskimage_id
394
        self.mappings = set([])
395
        self.timeout = timeout
396
        
397
    def add_mapping(self, lease_id, vnode):
398
        self.mappings.add((lease_id, vnode))
399
        
400
    def has_mapping(self, lease_id, vnode):
401
        return (lease_id, vnode) in self.mappings
402
    
403
    def has_mappings(self):
404
        return len(self.mappings) > 0
405
        
406
    def update_timeout(self, timeout):
407
        if timeout > self.timeout:
408
            self.timeout = timeout
409
        
410
    def is_expired(self, curTime):
411
        if self.timeout == None:
412
            return False
413
        elif self.timeout > curTime:
414
            return True
415
        else:
416
            return False
417
        
418
    def __str__(self):
419
        if self.timeout == None:
420
            timeout = "NOTIMEOUT"
421
        else:
422
            timeout = self.timeout
423
        return "(REUSABLE %s %s %s %s)" % (vnodemapstr(self.mappings), self.diskimage_id, str(timeout), self.filename)
424