Project

General

Profile

root / trunk / src / haizea / core / scheduler / preparation_schedulers / imagetransfer.py @ 632

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2008, University of Chicago                                 #
3
# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19
import haizea.common.constants as constants
20
from haizea.core.scheduler.preparation_schedulers import PreparationScheduler
21
from haizea.core.scheduler.slottable import ResourceReservation
22
from haizea.core.scheduler import MigrationResourceReservation
23
from haizea.core.leases import Lease, Capacity, UnmanagedSoftwareEnvironment
24
from haizea.core.scheduler import ReservationEventHandler, NotSchedulableException, EarliestStartingTime
25
from haizea.common.utils import estimate_transfer_time, get_config
26
from haizea.core.scheduler.slottable import ResourceTuple
27
from mx.DateTime import TimeDelta
28

    
29
import copy
30
import bisect
31

    
32

    
33
class ImageTransferPreparationScheduler(PreparationScheduler):
34
    def __init__(self, slottable, resourcepool, deployment_enact):
35
        PreparationScheduler.__init__(self, slottable, resourcepool, deployment_enact)
36
        
37
        self.imagenode = self.deployment_enact.get_imagenode()
38
        
39
        self.transfers = []
40
        self.completed_transfers = []
41

    
42
        config = get_config()
43
        self.reusealg = config.get("diskimage-reuse")
44
        if self.reusealg == constants.REUSE_IMAGECACHES:
45
            self.maxcachesize = config.get("diskimage-cache-size")
46
        else:
47
            self.maxcachesize = None
48
        
49
        self.imagenode_bandwidth = self.deployment_enact.get_bandwidth()
50
        
51
        self.handlers ={}
52
        self.handlers[FileTransferResourceReservation] = ReservationEventHandler(
53
                                sched    = self,
54
                                on_start = ImageTransferPreparationScheduler._handle_start_filetransfer,
55
                                on_end   = ImageTransferPreparationScheduler._handle_end_filetransfer)
56

    
57
        self.handlers[DiskImageMigrationResourceReservation] = ReservationEventHandler(
58
                                sched    = self,
59
                                on_start = ImageTransferPreparationScheduler._handle_start_migrate,
60
                                on_end   = ImageTransferPreparationScheduler._handle_end_migrate)
61

    
62
    def schedule(self, lease, vmrr, earliest):
63
        if type(lease.software) == UnmanagedSoftwareEnvironment:
64
            return [], True
65
        if lease.get_type() == Lease.ADVANCE_RESERVATION:
66
            return self.__schedule_deadline(lease, vmrr, earliest)
67
        elif lease.get_type() in (Lease.BEST_EFFORT, Lease.IMMEDIATE):
68
            return self.__schedule_asap(lease, vmrr, earliest)
69

    
70
    def schedule_migration(self, lease, vmrr, nexttime):
71
        if type(lease.software) == UnmanagedSoftwareEnvironment:
72
            return []
73
        
74
        # This code is the same as the one in vm_scheduler
75
        # Should be factored out
76
        last_vmrr = lease.get_last_vmrr()
77
        vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes])
78
        
79
        mustmigrate = False
80
        for vnode in vnode_migrations:
81
            if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
82
                mustmigrate = True
83
                break
84
            
85
        if not mustmigrate:
86
            return []
87

    
88
        if get_config().get("migration") == constants.MIGRATE_YES_NOTRANSFER:
89
            start = nexttime
90
            end = nexttime
91
            res = {}
92
            migr_rr = DiskImageMigrationResourceReservation(lease, start, end, res, vmrr, vnode_migrations)
93
            migr_rr.state = ResourceReservation.STATE_SCHEDULED
94
            return [migr_rr]
95

    
96
        # Figure out what migrations can be done simultaneously
97
        migrations = []
98
        while len(vnode_migrations) > 0:
99
            pnodes = set()
100
            migration = {}
101
            for vnode in vnode_migrations:
102
                origin = vnode_migrations[vnode][0]
103
                dest = vnode_migrations[vnode][1]
104
                if not origin in pnodes and not dest in pnodes:
105
                    migration[vnode] = vnode_migrations[vnode]
106
                    pnodes.add(origin)
107
                    pnodes.add(dest)
108
            for vnode in migration:
109
                del vnode_migrations[vnode]
110
            migrations.append(migration)
111
        
112
        # Create migration RRs
113
        start = max(last_vmrr.post_rrs[-1].end, nexttime)
114
        bandwidth = self.resourcepool.info.get_migration_bandwidth()
115
        migr_rrs = []
116
        for m in migrations:
117
            mb_to_migrate = lease.software.image_size * len(m.keys())
118
            migr_time = estimate_transfer_time(mb_to_migrate, bandwidth)
119
            end = start + migr_time
120
            res = {}
121
            for (origin,dest) in m.values():
122
                resorigin = Capacity([constants.RES_NETOUT])
123
                resorigin.set_quantity(constants.RES_NETOUT, bandwidth)
124
                resdest = Capacity([constants.RES_NETIN])
125
                resdest.set_quantity(constants.RES_NETIN, bandwidth)
126
                res[origin] = self.slottable.create_resource_tuple_from_capacity(resorigin)
127
                res[dest] = self.slottable.create_resource_tuple_from_capacity(resdest)                
128
            migr_rr = DiskImageMigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
129
            migr_rr.state = ResourceReservation.STATE_SCHEDULED
130
            migr_rrs.append(migr_rr)
131
            start = end
132
        
133
        return migr_rrs
134

    
135
    def estimate_migration_time(self, lease):
136
        migration = get_config().get("migration")
137
        if migration == constants.MIGRATE_YES:
138
            vmrr = lease.get_last_vmrr()
139
            images_in_pnode = dict([(pnode,0) for pnode in set(vmrr.nodes.values())])
140
            for (vnode,pnode) in vmrr.nodes.items():
141
                images_in_pnode[pnode] += lease.software.image_size
142
            max_to_transfer = max(images_in_pnode.values())
143
            bandwidth = self.resourcepool.info.get_migration_bandwidth()
144
            return estimate_transfer_time(max_to_transfer, bandwidth)
145
        elif migration == constants.MIGRATE_YES_NOTRANSFER:
146
            return TimeDelta(seconds=0)
147

    
148
    def find_earliest_starting_times(self, lease, nexttime):
149
        node_ids = [node.id for node in self.resourcepool.get_nodes()]  
150
        config = get_config()
151
        mechanism = config.get("transfer-mechanism")
152
        reusealg = config.get("diskimage-reuse")
153
        avoidredundant = config.get("avoid-redundant-transfers")
154
        
155
        if type(lease.software) == UnmanagedSoftwareEnvironment:
156
            earliest = {}
157
            for node in node_ids:
158
                earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
159
            return earliest
160
        
161
        # Figure out earliest times assuming we have to transfer the images
162
        transfer_duration = self.__estimate_image_transfer_time(lease, self.imagenode_bandwidth)
163
        if mechanism == constants.TRANSFER_UNICAST:
164
            transfer_duration *= lease.numnodes
165
        start = self.__get_next_transfer_slot(nexttime, transfer_duration)
166
        earliest = {}
167
        for node in node_ids:
168
            earliest[node] = ImageTransferEarliestStartingTime(start + transfer_duration, ImageTransferEarliestStartingTime.EARLIEST_IMAGETRANSFER)
169
            earliest[node].transfer_start = start
170
                
171
        # Check if we can reuse images
172
        if reusealg == constants.REUSE_IMAGECACHES:
173
            nodeswithimg = self.resourcepool.get_nodes_with_reusable_image(lease.software.image_id)
174
            for node in nodeswithimg:
175
                earliest[node].time = nexttime
176
                earliest[node].type = ImageTransferEarliestStartingTime.EARLIEST_REUSE
177
        
178
                
179
        # Check if we can avoid redundant transfers
180
        if avoidredundant:
181
            if mechanism == constants.TRANSFER_UNICAST:
182
                # Piggybacking not supported if unicasting 
183
                # each individual image
184
                pass
185
            if mechanism == constants.TRANSFER_MULTICAST:                
186
                # We can only piggyback on transfers that haven't started yet
187
                transfers = [t for t in self.transfers if t.state == ResourceReservation.STATE_SCHEDULED]
188
                for t in transfers:
189
                    if t.file == lease.software.image_id:
190
                        start = t.end
191
                        if start > nexttime:
192
                            for n in earliest:
193
                                if start < earliest[n].time:
194
                                    earliest[n].time = start
195
                                    earliest[n].type = ImageTransferEarliestStartingTime.EARLIEST_PIGGYBACK
196
                                    earliest[n].piggybacking_on = t
197

    
198
        return earliest
199
            
200
    def cancel_preparation(self, lease):
201
        toremove = self.__remove_transfers(lease)     
202
        for t in toremove:
203
            t.lease.remove_preparationrr(t)
204
            self.slottable.remove_reservation(t)
205
        self.__remove_files(lease)
206
        
207
    def cleanup(self, lease):                
208
        self.__remove_files(lease)
209
  
210
        
211
    def __schedule_deadline(self, lease, vmrr, earliest):
212
        config = get_config()
213
        reusealg = config.get("diskimage-reuse")
214
        avoidredundant = config.get("avoid-redundant-transfers")
215
        is_ready = False
216
            
217
        musttransfer = {}
218
        mustpool = {}
219
        nodeassignment = vmrr.nodes
220
        start = lease.start.requested
221
        end = lease.start.requested + lease.duration.requested
222
        for (vnode, pnode) in nodeassignment.items():
223
            lease_id = lease.id
224
            self.logger.debug("Scheduling image transfer of '%s' for vnode %i to physnode %i" % (lease.software.image_id, vnode, pnode))
225

    
226
            if reusealg == constants.REUSE_IMAGECACHES:
227
                if self.resourcepool.exists_reusable_image(pnode, lease.software.image_id, start):
228
                    self.logger.debug("No need to schedule an image transfer (reusing an image in pool)")
229
                    mustpool[vnode] = pnode                            
230
                else:
231
                    self.logger.debug("Need to schedule a transfer.")
232
                    musttransfer[vnode] = pnode
233
            else:
234
                self.logger.debug("Need to schedule a transfer.")
235
                musttransfer[vnode] = pnode
236

    
237
        if len(musttransfer) == 0:
238
            is_ready = True
239
        else:
240
            try:
241
                transfer_rrs = self.__schedule_imagetransfer_edf(lease, musttransfer, earliest)
242
            except NotSchedulableException, exc:
243
                raise
244
 
245
        # No chance of scheduling exception at this point. It's safe
246
        # to add entries to the pools
247
        if reusealg == constants.REUSE_IMAGECACHES:
248
            for (vnode, pnode) in mustpool.items():
249
                self.resourcepool.add_mapping_to_existing_reusable_image(pnode, lease.diskimage_id, lease.id, vnode, start)
250
                self.resourcepool.add_diskimage(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
251
                
252
        return transfer_rrs, is_ready
253

    
254

    
255
    def __schedule_asap(self, lease, vmrr, earliest):
256
        config = get_config()
257
        reusealg = config.get("diskimage-reuse")
258
        avoidredundant = config.get("avoid-redundant-transfers")
259

    
260
        is_ready = False
261

    
262
        transfer_rrs = []
263
        musttransfer = {}
264
        piggybacking = []
265
        for (vnode, pnode) in vmrr.nodes.items():
266
            earliest_type = earliest[pnode].type
267
            if earliest_type == ImageTransferEarliestStartingTime.EARLIEST_REUSE:
268
                # Add to pool
269
                self.logger.debug("Reusing image for V%i->P%i." % (vnode, pnode))
270
                self.resourcepool.add_mapping_to_existing_reusable_image(pnode, lease.software.image_id, lease.id, vnode, vmrr.end)
271
                self.resourcepool.add_diskimage(pnode, lease.software.image_id, lease.software.image_size, lease.id, vnode)
272
            elif earliest_type == ImageTransferEarliestStartingTime.EARLIEST_PIGGYBACK:
273
                # We can piggyback on an existing transfer
274
                transfer_rr = earliest[pnode].piggybacking_on
275
                transfer_rr.piggyback(lease.id, vnode, pnode)
276
                self.logger.debug("Piggybacking transfer for V%i->P%i on existing transfer in lease %i." % (vnode, pnode, transfer_rr.lease.id))
277
                piggybacking.append(transfer_rr)
278
            else:
279
                # Transfer
280
                musttransfer[vnode] = pnode
281
                self.logger.debug("Must transfer V%i->P%i." % (vnode, pnode))
282

    
283
        if len(musttransfer)>0:
284
            transfer_rrs = self.__schedule_imagetransfer_fifo(lease, musttransfer, earliest)
285
            
286
        if len(musttransfer)==0 and len(piggybacking)==0:
287
            is_ready = True
288
            
289
        return transfer_rrs, is_ready
290

    
291

    
292
    def __schedule_imagetransfer_edf(self, lease, musttransfer, earliest):
293
        # Estimate image transfer time 
294
        bandwidth = self.deployment_enact.get_bandwidth()
295
        config = get_config()
296
        mechanism = config.get("transfer-mechanism")
297
        transfer_duration = self.__estimate_image_transfer_time(lease, bandwidth)
298
        if mechanism == constants.TRANSFER_UNICAST:
299
            transfer_duration *= len(musttransfer)
300

    
301
        # Determine start time
302
        start = self.__get_last_transfer_slot(lease.start.requested, transfer_duration)
303

    
304
        res = {}
305
        resimgnode = Capacity([constants.RES_NETOUT])
306
        resimgnode.set_quantity(constants.RES_NETOUT, bandwidth)
307
        resnode = Capacity([constants.RES_NETIN])
308
        resnode.set_quantity(constants.RES_NETIN, bandwidth)
309
        res[self.imagenode.id] = self.slottable.create_resource_tuple_from_capacity(resimgnode)
310
        for pnode in musttransfer.values():
311
            res[pnode] = self.slottable.create_resource_tuple_from_capacity(resnode)
312
        
313
        newtransfer = FileTransferResourceReservation(lease, res)
314
        newtransfer.deadline = lease.start.requested
315
        newtransfer.state = ResourceReservation.STATE_SCHEDULED
316
        newtransfer.file = lease.software.image_id
317
        newtransfer.start = start
318
        newtransfer.end = start + transfer_duration
319
        for vnode, pnode in musttransfer.items():
320
            newtransfer.piggyback(lease.id, vnode, pnode)
321
        
322
        bisect.insort(self.transfers, newtransfer)
323
        
324
        return [newtransfer]
325
    
326
    def __schedule_imagetransfer_fifo(self, lease, musttransfer, earliest):
327
        # Estimate image transfer time 
328
        bandwidth = self.imagenode_bandwidth
329
        config = get_config()
330
        mechanism = config.get("transfer-mechanism")
331
        
332
        # The starting time is the first available slot, which was
333
        # included in the "earliest" dictionary.
334
        pnodes = musttransfer.values()
335
        start = earliest[pnodes[0]].transfer_start
336
        transfer_duration = self.__estimate_image_transfer_time(lease, bandwidth)
337
        
338
        res = {}
339
        resimgnode = Capacity([constants.RES_NETOUT])
340
        resimgnode.set_quantity(constants.RES_NETOUT, bandwidth)
341
        resnode = Capacity([constants.RES_NETIN])
342
        resnode.set_quantity(constants.RES_NETIN, bandwidth)
343
        res[self.imagenode.id] = self.slottable.create_resource_tuple_from_capacity(resimgnode)
344
        for n in musttransfer.values():
345
            res[n] = self.slottable.create_resource_tuple_from_capacity(resnode)
346
         
347
        newtransfer = FileTransferResourceReservation(lease, res)
348
        newtransfer.start = start
349
        if mechanism == constants.TRANSFER_UNICAST:
350
            newtransfer.end = start + (len(musttransfer) * transfer_duration)
351
        if mechanism == constants.TRANSFER_MULTICAST:
352
            newtransfer.end = start + transfer_duration
353
        
354
        newtransfer.deadline = None
355
        newtransfer.state = ResourceReservation.STATE_SCHEDULED
356
        newtransfer.file = lease.software.image_id
357
        for vnode, pnode in musttransfer.items():
358
            newtransfer.piggyback(lease.id, vnode, pnode)
359
            
360
        bisect.insort(self.transfers, newtransfer)
361
        
362
        return [newtransfer]
363
    
364
    
365
    def __estimate_image_transfer_time(self, lease, bandwidth):
366
        config = get_config()
367
        force_transfer_time = config.get("force-imagetransfer-time")
368
        if force_transfer_time != None:
369
            return force_transfer_time
370
        else:      
371
            return estimate_transfer_time(lease.software.image_size, bandwidth)    
372
    
373
    
374
    def __get_next_transfer_slot(self, nexttime, required_duration):
375
        # This can probably be optimized by using one of the many
376
        # "list of holes" algorithms out there
377
        if len(self.transfers) == 0:
378
            return nexttime
379
        elif nexttime + required_duration <= self.transfers[0].start:
380
            return nexttime
381
        else:
382
            for i in xrange(len(self.transfers) - 1):
383
                if self.transfers[i].end != self.transfers[i+1].start:
384
                    hole_duration = self.transfers[i+1].start - self.transfers[i].end
385
                    if hole_duration >= required_duration:
386
                        return self.transfers[i].end
387
            return self.transfers[-1].end
388
        
389
        
390
    def __get_last_transfer_slot(self, deadline, required_duration):
391
        # This can probably be optimized by using one of the many
392
        # "list of holes" algorithms out there
393
        if len(self.transfers) == 0:
394
            return deadline - required_duration
395
        elif self.transfers[-1].end + required_duration <= deadline:
396
            return deadline - required_duration
397
        else:
398
            for i in xrange(len(self.transfers) - 1, 0, -1):
399
                if self.transfers[i].start != self.transfer[i-1].end:
400
                    hole_duration = self.transfer[i].start - self.transfers[i-1].end
401
                    if hole_duration >= required_duration:
402
                        return self.transfer[i].start - required_duration
403
            return self.transfers[0].start - required_duration
404

    
405
    def __remove_transfers(self, lease):
406
        toremove = []
407
        for t in self.transfers:
408
            for pnode in t.transfers:
409
                leases = [l for l, v in t.transfers[pnode]]
410
                if lease in leases:
411
                    newtransfers = [(l, v) for l, v in t.transfers[pnode] if l!=lease]
412
                    t.transfers[pnode] = newtransfers
413
            # Check if the transfer has to be cancelled
414
            a = sum([len(l) for l in t.transfers.values()])
415
            if a == 0:
416
                toremove.append(t)
417
        for t in toremove:
418
            self.transfers.remove(t)
419
            
420
        return toremove
421
    
422
    def __remove_files(self, lease):
423
        for vnode, pnode in lease.get_last_vmrr().nodes.items():
424
            self.resourcepool.remove_diskimage(pnode, lease.id, vnode)         
425

    
426
    @staticmethod
427
    def _handle_start_filetransfer(sched, lease, rr):
428
        sched.logger.debug("LEASE-%i Start of handleStartFileTransfer" % lease.id)
429
        lease.print_contents()
430
        lease_state = lease.get_state()
431
        if lease_state == Lease.STATE_SCHEDULED or lease_state == Lease.STATE_READY:
432
            lease.set_state(Lease.STATE_PREPARING)
433
            rr.state = ResourceReservation.STATE_ACTIVE
434
            # TODO: Enactment
435
        else:
436
            raise InconsistentLeaseStateError(l, doing = "starting a file transfer")
437
            
438
        # TODO: Check for piggybacking
439
        
440
        lease.print_contents()
441
        sched.logger.debug("LEASE-%i End of handleStartFileTransfer" % lease.id)
442
        sched.logger.info("Starting image transfer for lease %i" % (lease.id))
443

    
444
    @staticmethod
445
    def _handle_end_filetransfer(sched, lease, rr):
446
        sched.logger.debug("LEASE-%i Start of handleEndFileTransfer" % lease.id)
447
        lease.print_contents()
448
        lease_state = lease.get_state()
449
        if lease_state == Lease.STATE_PREPARING:
450
            lease.set_state(Lease.STATE_READY)
451
            rr.state = ResourceReservation.STATE_DONE
452
            for physnode in rr.transfers:
453
                vnodes = rr.transfers[physnode]
454
 
455
#                # Find out timeout of image. It will be the latest end time of all the
456
#                # leases being used by that image.
457
#                leases = [l for (l, v) in vnodes]
458
#                maxend=None
459
#                for lease_id in leases:
460
#                    l = sched.leases.get_lease(lease_id)
461
#                    end = lease.get_endtime()
462
#                    if maxend==None or end>maxend:
463
#                        maxend=end
464
                maxend = None
465
                # TODO: ENACTMENT: Verify the image was transferred correctly
466
                sched._add_diskimages(physnode, rr.file, lease.software.image_size, vnodes, timeout=maxend)
467
        else:
468
            raise InconsistentLeaseStateError(l, doing = "ending a file transfer")
469

    
470
        sched.transfers.remove(rr)
471
        lease.print_contents()
472
        sched.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.id)
473
        sched.logger.info("Completed image transfer for lease %i" % (lease.id))
474

    
475
    def _handle_start_migrate(self, l, rr):
476
        self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
477
        l.print_contents()
478
        rr.state = ResourceReservation.STATE_ACTIVE
479
        l.print_contents()
480
        self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
481
        self.logger.info("Migrating lease %i..." % (l.id))
482

    
483
    def _handle_end_migrate(self, l, rr):
484
        self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
485
        l.print_contents()
486

    
487
        for vnode in rr.transfers:
488
            origin = rr.transfers[vnode][0]
489
            dest = rr.transfers[vnode][1]
490
            
491
            self.resourcepool.remove_diskimage(origin, l.id, vnode)
492
            self.resourcepool.add_diskimage(dest, l.software.image_id, l.software.image_size, l.id, vnode)
493
        
494
        rr.state = ResourceReservation.STATE_DONE
495
        l.print_contents()
496
        self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
497
        self.logger.info("Migrated lease %i..." % (l.id))
498
        
499
    def _add_diskimages(self, pnode_id, diskimage_id, diskimage_size, vnodes, timeout):
500
        self.logger.debug("Adding image for leases=%s in nod_id=%i" % (vnodes, pnode_id))
501

    
502
        pnode = self.resourcepool.get_node(pnode_id)
503

    
504
        if self.reusealg == constants.REUSE_NONE:
505
            for (lease_id, vnode) in vnodes:
506
                self.resourcepool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease_id, vnode)
507
        elif self.reusealg == constants.REUSE_IMAGECACHES:
508
            # Sometimes we might find that the image is already deployed
509
            # (although unused). In that case, don't add another copy to
510
            # the pool. Just "reactivate" it.
511
            if pnode.exists_reusable_image(diskimage_id):
512
                for (lease_id, vnode) in vnodes:
513
                    pnode.add_mapping_to_existing_reusable_image(diskimage_id, lease_id, vnode, timeout)
514
            else:
515
                if self.maxcachesize == constants.CACHESIZE_UNLIMITED:
516
                    can_add_to_cache = True
517
                else:
518
                    # We may have to remove images from the cache
519
                    cachesize = pnode.get_reusable_images_size()
520
                    reqsize = cachesize + diskimage_size
521
                    if reqsize > self.maxcachesize:
522
                        # Have to shrink cache
523
                        desiredsize = self.maxcachesize - diskimage_size
524
                        self.logger.debug("Adding the image would make the size of pool in node %i = %iMB. Will try to bring it down to %i" % (pnode_id, reqsize, desiredsize))
525
                        pnode.print_files()
526
                        success = pnode.purge_downto(self.maxcachesize)
527
                        if not success:
528
                            can_add_to_cache = False
529
                        else:
530
                            can_add_to_cache = True
531
                    else:
532
                        can_add_to_cache = True
533
                        
534
                if can_add_to_cache:
535
                    self.resourcepool.add_reusable_image(pnode_id, diskimage_id, diskimage_size, vnodes, timeout)
536
                else:
537
                    # This just means we couldn't add the image
538
                    # to the pool. We will have to make do with just adding the tainted images.
539
                    self.logger.debug("Unable to add to pool. Must create individual disk images directly instead.")
540
                    
541
            # Besides adding the image to the cache, we need to create a separate image for
542
            # this specific lease
543
            for (lease_id, vnode) in vnodes:
544
                self.resourcepool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease_id, vnode)
545
                    
546
        pnode.print_files()
547

    
548
class FileTransferResourceReservation(ResourceReservation):
549
    def __init__(self, lease, res, start=None, end=None):
550
        ResourceReservation.__init__(self, lease, start, end, res)
551
        self.deadline = None
552
        self.file = None
553
        # Dictionary of  physnode -> [ (lease_id, vnode)* ]
554
        self.transfers = {}
555

    
556
    def print_contents(self, loglevel="VDEBUG"):
557
        ResourceReservation.print_contents(self, loglevel)
558
        self.logger.log(loglevel, "Type           : FILE TRANSFER")
559
        self.logger.log(loglevel, "Deadline       : %s" % self.deadline)
560
        self.logger.log(loglevel, "File           : %s" % self.file)
561
        self.logger.log(loglevel, "Transfers      : %s" % self.transfers)
562
        
563
    def piggyback(self, lease_id, vnode, physnode):
564
        if self.transfers.has_key(physnode):
565
            self.transfers[physnode].append((lease_id, vnode))
566
        else:
567
            self.transfers[physnode] = [(lease_id, vnode)]
568
            
569
    def is_preemptible(self):
570
        return False       
571
    
572
    def __cmp__(self, rr):
573
        return cmp(self.start, rr.start)
574
    
575
class ImageTransferEarliestStartingTime(EarliestStartingTime):
576
    EARLIEST_IMAGETRANSFER = 2
577
    EARLIEST_REUSE = 3
578
    EARLIEST_PIGGYBACK = 4
579
    
580
    def __init__(self, time, type):
581
        EarliestStartingTime.__init__(self, time, type)
582
        self.transfer_start = None
583
        self.piggybacking_on = None
584

    
585
class DiskImageMigrationResourceReservation(MigrationResourceReservation):
586
    def __init__(self, lease, start, end, res, vmrr, transfers):
587
        MigrationResourceReservation.__init__(self, lease, start, end, res, vmrr, transfers)
588

    
589
    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
590
        self.logger.log(loglevel, "Type           : DISK IMAGE MIGRATION")
591
        self.logger.log(loglevel, "Transfers      : %s" % self.transfers)
592
        ResourceReservation.print_contents(self, loglevel)