Project

General

Profile

root / branches / 1.1 / src / haizea / core / scheduler / preparation_schedulers / imagetransfer.py @ 826

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2009, University of Chicago                                 #
3
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19
import haizea.common.constants as constants
20
from haizea.core.scheduler.preparation_schedulers import PreparationScheduler
21
from haizea.core.scheduler.slottable import ResourceReservation
22
from haizea.core.scheduler import MigrationResourceReservation, InconsistentLeaseStateError
23
from haizea.core.leases import Lease, Capacity, UnmanagedSoftwareEnvironment
24
from haizea.core.scheduler import ReservationEventHandler, NotSchedulableException, EarliestStartingTime
25
from haizea.common.utils import estimate_transfer_time, get_config
26
from mx.DateTime import TimeDelta
27

    
28
import bisect
29
import logging
30

    
31
class ImageTransferPreparationScheduler(PreparationScheduler):
32
    def __init__(self, slottable, resourcepool, deployment_enact):
33
        PreparationScheduler.__init__(self, slottable, resourcepool, deployment_enact)
34
        
35
        self.imagenode = self.deployment_enact.get_imagenode()
36
        
37
        self.transfers = []
38
        self.completed_transfers = []
39
        
40
        self.imagenode_bandwidth = self.deployment_enact.get_bandwidth()
41
        
42
        self.handlers ={}
43
        self.handlers[FileTransferResourceReservation] = ReservationEventHandler(
44
                                sched    = self,
45
                                on_start = ImageTransferPreparationScheduler._handle_start_filetransfer,
46
                                on_end   = ImageTransferPreparationScheduler._handle_end_filetransfer)
47

    
48
        self.handlers[DiskImageMigrationResourceReservation] = ReservationEventHandler(
49
                                sched    = self,
50
                                on_start = ImageTransferPreparationScheduler._handle_start_migrate,
51
                                on_end   = ImageTransferPreparationScheduler._handle_end_migrate)
52

    
53
    def schedule(self, lease, vmrr, earliest, nexttime):
54
        if type(lease.software) == UnmanagedSoftwareEnvironment:
55
            return [], True
56
        if lease.get_type() == Lease.ADVANCE_RESERVATION:
57
            return self.__schedule_deadline(lease, vmrr, earliest, nexttime)
58
        elif lease.get_type() in (Lease.BEST_EFFORT, Lease.IMMEDIATE):
59
            return self.__schedule_asap(lease, vmrr, earliest, nexttime)
60

    
61
    def schedule_migration(self, lease, vmrr, nexttime):
62
        if type(lease.software) == UnmanagedSoftwareEnvironment:
63
            return []
64
        
65
        # This code is the same as the one in vm_scheduler
66
        # Should be factored out
67
        last_vmrr = lease.get_last_vmrr()
68
        vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes])
69
        
70
        mustmigrate = False
71
        for vnode in vnode_migrations:
72
            if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
73
                mustmigrate = True
74
                break
75
            
76
        if not mustmigrate:
77
            return []
78

    
79
        if get_config().get("migration") == constants.MIGRATE_YES_NOTRANSFER:
80
            start = nexttime
81
            end = nexttime
82
            res = {}
83
            migr_rr = DiskImageMigrationResourceReservation(lease, start, end, res, vmrr, vnode_migrations)
84
            migr_rr.state = ResourceReservation.STATE_SCHEDULED
85
            return [migr_rr]
86

    
87
        # Figure out what migrations can be done simultaneously
88
        migrations = []
89
        while len(vnode_migrations) > 0:
90
            pnodes = set()
91
            migration = {}
92
            for vnode in vnode_migrations:
93
                origin = vnode_migrations[vnode][0]
94
                dest = vnode_migrations[vnode][1]
95
                if not origin in pnodes and not dest in pnodes:
96
                    migration[vnode] = vnode_migrations[vnode]
97
                    pnodes.add(origin)
98
                    pnodes.add(dest)
99
            for vnode in migration:
100
                del vnode_migrations[vnode]
101
            migrations.append(migration)
102
        
103
        # Create migration RRs
104
        start = max(last_vmrr.post_rrs[-1].end, nexttime)
105
        bandwidth = self.resourcepool.info.get_migration_bandwidth()
106
        migr_rrs = []
107
        for m in migrations:
108
            mb_per_physnode = {}
109
            for vnode, (pnode_from, pnode_to) in m.items():
110
                mb_per_physnode[pnode_from] = mb_per_physnode.setdefault(pnode_from, 0) + lease.software.image_size
111
            max_mb_to_migrate = max(mb_per_physnode.values())
112
            migr_time = estimate_transfer_time(max_mb_to_migrate, bandwidth)
113
            end = start + migr_time
114
            res = {}
115
            for (origin,dest) in m.values():
116
                resorigin = Capacity([constants.RES_NETOUT])
117
                resorigin.set_quantity(constants.RES_NETOUT, bandwidth)
118
                resdest = Capacity([constants.RES_NETIN])
119
                resdest.set_quantity(constants.RES_NETIN, bandwidth)
120
                res[origin] = self.slottable.create_resource_tuple_from_capacity(resorigin)
121
                res[dest] = self.slottable.create_resource_tuple_from_capacity(resdest)                
122
            migr_rr = DiskImageMigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
123
            migr_rr.state = ResourceReservation.STATE_SCHEDULED
124
            migr_rrs.append(migr_rr)
125
            start = end
126
        
127
        return migr_rrs
128

    
129
    def estimate_migration_time(self, lease):
130
        migration = get_config().get("migration")
131
        if migration == constants.MIGRATE_YES:
132
            bandwidth = self.resourcepool.info.get_migration_bandwidth()
133
            vmrr = lease.get_last_vmrr()
134
            #images_in_pnode = dict([(pnode,0) for pnode in set(vmrr.nodes.values())])
135
            transfer_time = estimate_transfer_time(lease.software.image_size, bandwidth) * len (vmrr.nodes)
136
            #for (vnode,pnode) in vmrr.nodes.items():
137
            #    images_in_pnode[pnode] += lease.software.image_size
138
            #max_to_transfer = max(images_in_pnode.values()) * 2 # Kludge
139
            return transfer_time
140
        elif migration == constants.MIGRATE_YES_NOTRANSFER:
141
            return TimeDelta(seconds=0)
142

    
143
    def find_earliest_starting_times(self, lease, nexttime):
144
        node_ids = [node.id for node in self.resourcepool.get_nodes()]  
145
        config = get_config()
146
        mechanism = config.get("transfer-mechanism")
147
        reusealg = config.get("diskimage-reuse")
148
        avoidredundant = config.get("avoid-redundant-transfers")
149
        
150
        if type(lease.software) == UnmanagedSoftwareEnvironment:
151
            earliest = {}
152
            for node in node_ids:
153
                earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
154
            return earliest
155
        
156
        # Figure out earliest times assuming we have to transfer the images
157
        transfer_duration = self.__estimate_image_transfer_time(lease, self.imagenode_bandwidth)
158
        if mechanism == constants.TRANSFER_UNICAST:
159
            transfer_duration *= lease.numnodes
160
        start = self.__get_next_transfer_slot(nexttime, transfer_duration)
161
        earliest = {}
162
        for node in node_ids:
163
            earliest[node] = ImageTransferEarliestStartingTime(start + transfer_duration, ImageTransferEarliestStartingTime.EARLIEST_IMAGETRANSFER)
164
            earliest[node].transfer_start = start
165
                
166
        # Check if we can reuse images
167
        if reusealg == constants.REUSE_IMAGECACHES:
168
            nodeswithimg = self.resourcepool.get_nodes_with_reusable_image(lease.software.image_id)
169
            for node in nodeswithimg:
170
                earliest[node].time = nexttime
171
                earliest[node].type = ImageTransferEarliestStartingTime.EARLIEST_REUSE
172
        
173
                
174
        # Check if we can avoid redundant transfers
175
        if avoidredundant:
176
            if mechanism == constants.TRANSFER_UNICAST:
177
                # Piggybacking not supported if unicasting 
178
                # each individual image
179
                pass
180
            if mechanism == constants.TRANSFER_MULTICAST:                
181
                # We can only piggyback on transfers that haven't started yet
182
                transfers = [t for t in self.transfers if t.state == ResourceReservation.STATE_SCHEDULED]
183
                for t in transfers:
184
                    if t.file == lease.software.image_id:
185
                        start = t.end
186
                        if start > nexttime:
187
                            for n in earliest:
188
                                if start < earliest[n].time:
189
                                    earliest[n].time = start
190
                                    earliest[n].type = ImageTransferEarliestStartingTime.EARLIEST_PIGGYBACK
191
                                    earliest[n].piggybacking_on = t
192

    
193
        return earliest
194
            
195
    def cancel_preparation(self, lease):
196
        toremove = self.__remove_transfers(lease)     
197
        for t in toremove:
198
            t.lease.remove_preparationrr(t)
199
            self.slottable.remove_reservation(t)
200
        self.__remove_files(lease)
201
        
202
    def cleanup(self, lease):                
203
        self.__remove_files(lease)
204
  
205
        
206
    def __schedule_deadline(self, lease, vmrr, earliest, nexttime):
207
        config = get_config()
208
        reusealg = config.get("diskimage-reuse")
209
        avoidredundant = config.get("avoid-redundant-transfers")
210
        is_ready = False
211
            
212
        musttransfer = {}
213
        mustpool = {}
214
        nodeassignment = vmrr.nodes
215
        start = lease.start.requested
216
        end = lease.start.requested + lease.duration.requested
217
        for (vnode, pnode) in nodeassignment.items():
218
            lease_id = lease.id
219
            self.logger.debug("Scheduling image transfer of '%s' for vnode %i to physnode %i" % (lease.software.image_id, vnode, pnode))
220

    
221
            if reusealg == constants.REUSE_IMAGECACHES:
222
                if self.resourcepool.exists_reusable_image(pnode, lease.software.image_id, start):
223
                    self.logger.debug("No need to schedule an image transfer (reusing an image in pool)")
224
                    mustpool[vnode] = pnode                            
225
                else:
226
                    self.logger.debug("Need to schedule a transfer.")
227
                    musttransfer[vnode] = pnode
228
            else:
229
                self.logger.debug("Need to schedule a transfer.")
230
                musttransfer[vnode] = pnode
231

    
232
        if len(musttransfer) == 0:
233
            is_ready = True
234
            transfer_rrs = []
235
        else:
236
            try:
237
                transfer_rrs = self.__schedule_imagetransfer_edf(lease, musttransfer, earliest, nexttime)
238
            except NotSchedulableException, exc:
239
                raise
240
 
241
        # No chance of scheduling exception at this point. It's safe
242
        # to add entries to the pools
243
        if reusealg == constants.REUSE_IMAGECACHES:
244
            for (vnode, pnode) in mustpool.items():
245
                self.resourcepool.add_mapping_to_existing_reusable_image(pnode, lease.software.image_id, lease.id, vnode, start)
246
                self.resourcepool.add_diskimage(pnode, lease.software.image_id, lease.software.image_size, lease.id, vnode)
247
                
248
        return transfer_rrs, is_ready
249

    
250

    
251
    def __schedule_asap(self, lease, vmrr, earliest, nexttime):
252
        config = get_config()
253
        reusealg = config.get("diskimage-reuse")
254
        avoidredundant = config.get("avoid-redundant-transfers")
255

    
256
        is_ready = False
257

    
258
        transfer_rrs = []
259
        musttransfer = {}
260
        piggybacking = []
261
        for (vnode, pnode) in vmrr.nodes.items():
262
            earliest_type = earliest[pnode].type
263
            if earliest_type == ImageTransferEarliestStartingTime.EARLIEST_REUSE:
264
                # Add to pool
265
                self.logger.debug("Reusing image for V%i->P%i." % (vnode, pnode))
266
                self.resourcepool.add_mapping_to_existing_reusable_image(pnode, lease.software.image_id, lease.id, vnode, vmrr.end)
267
                self.resourcepool.add_diskimage(pnode, lease.software.image_id, lease.software.image_size, lease.id, vnode)
268
            elif earliest_type == ImageTransferEarliestStartingTime.EARLIEST_PIGGYBACK:
269
                # We can piggyback on an existing transfer
270
                transfer_rr = earliest[pnode].piggybacking_on
271
                transfer_rr.piggyback(lease.id, vnode, pnode)
272
                self.logger.debug("Piggybacking transfer for V%i->P%i on existing transfer in lease %i." % (vnode, pnode, transfer_rr.lease.id))
273
                piggybacking.append(transfer_rr)
274
            else:
275
                # Transfer
276
                musttransfer[vnode] = pnode
277
                self.logger.debug("Must transfer V%i->P%i." % (vnode, pnode))
278

    
279
        if len(musttransfer)>0:
280
            transfer_rrs = self.__schedule_imagetransfer_fifo(lease, musttransfer, earliest)
281
            
282
        if len(musttransfer)==0 and len(piggybacking)==0:
283
            is_ready = True
284
            
285
        return transfer_rrs, is_ready
286

    
287

    
288
    def __schedule_imagetransfer_edf(self, lease, musttransfer, earliest, nexttime):
289
        # Estimate image transfer time 
290
        bandwidth = self.deployment_enact.get_bandwidth()
291
        config = get_config()
292
        mechanism = config.get("transfer-mechanism")
293
        transfer_duration = self.__estimate_image_transfer_time(lease, bandwidth)
294
        if mechanism == constants.TRANSFER_UNICAST:
295
            transfer_duration *= len(musttransfer)
296

    
297
        # Determine start time
298
        start = self.__get_last_transfer_slot(lease.start.requested, transfer_duration)
299
        if start < nexttime:
300
            raise NotSchedulableException("Could not schedule the file transfer to complete in time.")
301
        
302
        res = {}
303
        resimgnode = Capacity([constants.RES_NETOUT])
304
        resimgnode.set_quantity(constants.RES_NETOUT, bandwidth)
305
        resnode = Capacity([constants.RES_NETIN])
306
        resnode.set_quantity(constants.RES_NETIN, bandwidth)
307
        res[self.imagenode.id] = self.slottable.create_resource_tuple_from_capacity(resimgnode)
308
        for pnode in musttransfer.values():
309
            res[pnode] = self.slottable.create_resource_tuple_from_capacity(resnode)
310
        
311
        newtransfer = FileTransferResourceReservation(lease, res)
312
        newtransfer.deadline = lease.start.requested
313
        newtransfer.state = ResourceReservation.STATE_SCHEDULED
314
        newtransfer.file = lease.software.image_id
315
        newtransfer.start = start
316
        newtransfer.end = start + transfer_duration
317
        for vnode, pnode in musttransfer.items():
318
            newtransfer.piggyback(lease.id, vnode, pnode)
319
        
320
        bisect.insort(self.transfers, newtransfer)
321
        
322
        return [newtransfer]
323
    
324
    def __schedule_imagetransfer_fifo(self, lease, musttransfer, earliest):
325
        # Estimate image transfer time 
326
        bandwidth = self.imagenode_bandwidth
327
        config = get_config()
328
        mechanism = config.get("transfer-mechanism")
329
        
330
        # The starting time is the first available slot, which was
331
        # included in the "earliest" dictionary.
332
        pnodes = musttransfer.values()
333
        start = earliest[pnodes[0]].transfer_start
334
        transfer_duration = self.__estimate_image_transfer_time(lease, bandwidth)
335
        
336
        res = {}
337
        resimgnode = Capacity([constants.RES_NETOUT])
338
        resimgnode.set_quantity(constants.RES_NETOUT, bandwidth)
339
        resnode = Capacity([constants.RES_NETIN])
340
        resnode.set_quantity(constants.RES_NETIN, bandwidth)
341
        res[self.imagenode.id] = self.slottable.create_resource_tuple_from_capacity(resimgnode)
342
        for n in musttransfer.values():
343
            res[n] = self.slottable.create_resource_tuple_from_capacity(resnode)
344
         
345
        newtransfer = FileTransferResourceReservation(lease, res)
346
        newtransfer.start = start
347
        if mechanism == constants.TRANSFER_UNICAST:
348
            newtransfer.end = start + (len(musttransfer) * transfer_duration)
349
        if mechanism == constants.TRANSFER_MULTICAST:
350
            newtransfer.end = start + transfer_duration
351
        
352
        newtransfer.deadline = None
353
        newtransfer.state = ResourceReservation.STATE_SCHEDULED
354
        newtransfer.file = lease.software.image_id
355
        for vnode, pnode in musttransfer.items():
356
            newtransfer.piggyback(lease.id, vnode, pnode)
357
            
358
        bisect.insort(self.transfers, newtransfer)
359
        
360
        return [newtransfer]
361
    
362
    
363
    def __estimate_image_transfer_time(self, lease, bandwidth):
364
        config = get_config()
365
        force_transfer_time = config.get("force-imagetransfer-time")
366
        if force_transfer_time != None:
367
            return force_transfer_time
368
        else:      
369
            return estimate_transfer_time(lease.software.image_size, bandwidth)    
370
    
371
    
372
    def __get_next_transfer_slot(self, nexttime, required_duration):
373
        # This can probably be optimized by using one of the many
374
        # "list of holes" algorithms out there
375
        if len(self.transfers) == 0:
376
            return nexttime
377
        elif nexttime + required_duration <= self.transfers[0].start:
378
            return nexttime
379
        else:
380
            for i in xrange(len(self.transfers) - 1):
381
                if self.transfers[i].end != self.transfers[i+1].start:
382
                    hole_duration = self.transfers[i+1].start - self.transfers[i].end
383
                    if hole_duration >= required_duration:
384
                        return self.transfers[i].end
385
            return self.transfers[-1].end
386
        
387
        
388
    def __get_last_transfer_slot(self, deadline, required_duration):
389
        # This can probably be optimized by using one of the many
390
        # "list of holes" algorithms out there
391
        if len(self.transfers) == 0:
392
            return deadline - required_duration
393
        elif self.transfers[-1].end + required_duration <= deadline:
394
            return deadline - required_duration
395
        else:
396
            for i in xrange(len(self.transfers) - 1, 0, -1):
397
                hole_start =  self.transfers[i-1].end
398
                hole_end = self.transfers[i].start
399
                if deadline > hole_start and deadline <= hole_end:
400
                    hole_duration = deadline - hole_start
401
                else:
402
                    hole_duration = hole_end - hole_start
403
                if hole_duration < required_duration or hole_start >= deadline:
404
                    # We're not interested in gaps after the deadline
405
                    # or gaps insufficient to support the transfer
406
                    pass
407
                else:
408
                    if deadline > hole_start and deadline <= hole_end:
409
                        return deadline - required_duration
410
                    else:
411
                        return hole_end - required_duration                    
412
            return self.transfers[0].start - required_duration
413

    
414
    def __remove_transfers(self, lease):
415
        toremove = []
416
        for t in self.transfers:
417
            for pnode in t.transfers:
418
                leases = [l for l, v in t.transfers[pnode]]
419
                if lease.id in leases:
420
                    newtransfers = [(l, v) for l, v in t.transfers[pnode] if l!=lease.id]
421
                    t.transfers[pnode] = newtransfers
422
            # Check if the transfer has to be cancelled
423
            a = sum([len(l) for l in t.transfers.values()])
424
            if a == 0:
425
                toremove.append(t)
426
        for t in toremove:
427
            self.transfers.remove(t)
428
            
429
        return toremove
430
    
431
    def __remove_files(self, lease):
432
        for vnode, pnode in lease.get_last_vmrr().nodes.items():
433
            self.resourcepool.remove_diskimage(pnode, lease.id, vnode)         
434

    
435
    @staticmethod
436
    def _handle_start_filetransfer(sched, lease, rr):
437
        sched.logger.debug("LEASE-%i Start of handleStartFileTransfer" % lease.id)
438
        lease.print_contents()
439
        lease_state = lease.get_state()
440
        if lease_state == Lease.STATE_SCHEDULED or lease_state == Lease.STATE_READY:
441
            lease.set_state(Lease.STATE_PREPARING)
442
            rr.state = ResourceReservation.STATE_ACTIVE
443
            # TODO: Enactment
444
        else:
445
            raise InconsistentLeaseStateError(lease, doing = "starting a file transfer")
446
            
447
        # TODO: Check for piggybacking
448
        
449
        lease.print_contents()
450
        sched.logger.debug("LEASE-%i End of handleStartFileTransfer" % lease.id)
451
        sched.logger.info("Starting image transfer for lease %i" % (lease.id))
452

    
453
    @staticmethod
454
    def _handle_end_filetransfer(sched, lease, rr):
455
        sched.logger.debug("LEASE-%i Start of handleEndFileTransfer" % lease.id)
456
        lease.print_contents()
457
        lease_state = lease.get_state()
458
        if lease_state == Lease.STATE_PREPARING:
459
            lease.set_state(Lease.STATE_READY)
460
            rr.state = ResourceReservation.STATE_DONE
461
            for physnode in rr.transfers:
462
                vnodes = rr.transfers[physnode]
463
 
464
#                # Find out timeout of image. It will be the latest end time of all the
465
#                # leases being used by that image.
466
#                leases = [l for (l, v) in vnodes]
467
#                maxend=None
468
#                for lease_id in leases:
469
#                    l = sched.leases.get_lease(lease_id)
470
#                    end = lease.get_endtime()
471
#                    if maxend==None or end>maxend:
472
#                        maxend=end
473
                maxend = None
474
                # TODO: ENACTMENT: Verify the image was transferred correctly
475
                sched._add_diskimages(physnode, rr.file, lease.software.image_size, vnodes, timeout=maxend)
476
        else:
477
            raise InconsistentLeaseStateError(lease, doing = "ending a file transfer")
478

    
479
        sched.transfers.remove(rr)
480
        lease.print_contents()
481
        sched.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.id)
482
        sched.logger.info("Completed image transfer for lease %i" % (lease.id))
483

    
484
    def _handle_start_migrate(self, l, rr):
485
        self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
486
        l.print_contents()
487
        rr.state = ResourceReservation.STATE_ACTIVE
488
        l.print_contents()
489
        self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
490
        self.logger.info("Migrating lease %i..." % (l.id))
491

    
492
    def _handle_end_migrate(self, l, rr):
493
        self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
494
        l.print_contents()
495

    
496
        for vnode in rr.transfers:
497
            origin = rr.transfers[vnode][0]
498
            dest = rr.transfers[vnode][1]
499
            
500
            self.resourcepool.remove_diskimage(origin, l.id, vnode)
501
            self.resourcepool.add_diskimage(dest, l.software.image_id, l.software.image_size, l.id, vnode)
502
        
503
        rr.state = ResourceReservation.STATE_DONE
504
        l.print_contents()
505
        self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
506
        self.logger.info("Migrated lease %i..." % (l.id))
507
        
508
    def _add_diskimages(self, pnode_id, diskimage_id, diskimage_size, vnodes, timeout):
509
        self.logger.debug("Adding image for leases=%s in nod_id=%i" % (vnodes, pnode_id))
510

    
511
        config = get_config()
512
        reusealg = config.get("diskimage-reuse")
513
        if reusealg == constants.REUSE_IMAGECACHES:
514
            maxcachesize = config.get("diskimage-cache-size")
515
        else:
516
            maxcachesize = None
517
            
518
        pnode = self.resourcepool.get_node(pnode_id)
519

    
520
        if reusealg == constants.REUSE_NONE:
521
            for (lease_id, vnode) in vnodes:
522
                self.resourcepool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease_id, vnode)
523
        elif reusealg == constants.REUSE_IMAGECACHES:
524
            # Sometimes we might find that the image is already deployed
525
            # (although unused). In that case, don't add another copy to
526
            # the pool. Just "reactivate" it.
527
            if pnode.exists_reusable_image(diskimage_id):
528
                for (lease_id, vnode) in vnodes:
529
                    pnode.add_mapping_to_existing_reusable_image(diskimage_id, lease_id, vnode, timeout)
530
            else:
531
                if maxcachesize == constants.CACHESIZE_UNLIMITED:
532
                    can_add_to_cache = True
533
                else:
534
                    # We may have to remove images from the cache
535
                    cachesize = pnode.get_reusable_images_size()
536
                    reqsize = cachesize + diskimage_size
537
                    if reqsize > maxcachesize:
538
                        # Have to shrink cache
539
                        desiredsize = maxcachesize - diskimage_size
540
                        self.logger.debug("Adding the image would make the size of pool in node %i = %iMB. Will try to bring it down to %i" % (pnode_id, reqsize, desiredsize))
541
                        pnode.print_files()
542
                        success = pnode.purge_downto(maxcachesize)
543
                        if not success:
544
                            can_add_to_cache = False
545
                        else:
546
                            can_add_to_cache = True
547
                    else:
548
                        can_add_to_cache = True
549
                        
550
                if can_add_to_cache:
551
                    self.resourcepool.add_reusable_image(pnode_id, diskimage_id, diskimage_size, vnodes, timeout)
552
                else:
553
                    # This just means we couldn't add the image
554
                    # to the pool. We will have to make do with just adding the tainted images.
555
                    self.logger.debug("Unable to add to pool. Must create individual disk images directly instead.")
556
                    
557
            # Besides adding the image to the cache, we need to create a separate image for
558
            # this specific lease
559
            for (lease_id, vnode) in vnodes:
560
                self.resourcepool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease_id, vnode)
561
                    
562
        pnode.print_files()
563

    
564
class FileTransferResourceReservation(ResourceReservation):
565
    def __init__(self, lease, res, start=None, end=None):
566
        ResourceReservation.__init__(self, lease, start, end, res)
567
        self.deadline = None
568
        self.file = None
569
        # Dictionary of  physnode -> [ (lease_id, vnode)* ]
570
        self.transfers = {}
571

    
572
    def print_contents(self, loglevel="VDEBUG"):
573
        ResourceReservation.print_contents(self, loglevel)
574
        logger = logging.getLogger("LEASES")
575
        logger.log(loglevel, "Type           : FILE TRANSFER")
576
        logger.log(loglevel, "Deadline       : %s" % self.deadline)
577
        logger.log(loglevel, "File           : %s" % self.file)
578
        logger.log(loglevel, "Transfers      : %s" % self.transfers)
579
        
580
    def piggyback(self, lease_id, vnode, physnode):
581
        if self.transfers.has_key(physnode):
582
            self.transfers[physnode].append((lease_id, vnode))
583
        else:
584
            self.transfers[physnode] = [(lease_id, vnode)]
585
            
586
    def is_preemptible(self):
587
        return False       
588
    
589
    def __cmp__(self, rr):
590
        return cmp(self.start, rr.start)
591
    
592
class ImageTransferEarliestStartingTime(EarliestStartingTime):
593
    EARLIEST_IMAGETRANSFER = 2
594
    EARLIEST_REUSE = 3
595
    EARLIEST_PIGGYBACK = 4
596
    
597
    def __init__(self, time, type):
598
        EarliestStartingTime.__init__(self, time, type)
599
        self.transfer_start = None
600
        self.piggybacking_on = None
601

    
602
class DiskImageMigrationResourceReservation(MigrationResourceReservation):
603
    def __init__(self, lease, start, end, res, vmrr, transfers):
604
        MigrationResourceReservation.__init__(self, lease, start, end, res, vmrr, transfers)
605

    
606
    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
607
        logger = logging.getLogger("LEASES")
608
        logger.log(loglevel, "Type           : DISK IMAGE MIGRATION")
609
        logger.log(loglevel, "Transfers      : %s" % self.transfers)
610
        ResourceReservation.print_contents(self, loglevel)