1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2009, University of Chicago                                 #
3
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19
"""This module provides the main classes for Haizea's VM Scheduler. All the
20
scheduling code that decides when and where a lease is scheduled is contained
21
in the VMScheduler class (except for the code that specifically decides
22
what physical machines each virtual machine is mapped to, which is factored out
23
into the "mapper" module). This module also provides the classes for the
24
reservations that will be placed in the slot table and correspond to VMs. 
25
"""
26

    
27
import haizea.common.constants as constants
28
from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, pretty_nodemap, get_config, get_clock, get_persistence, compute_suspend_resume_time
29
from haizea.core.leases import Lease, Capacity
30
from haizea.core.scheduler.slottable import ResourceReservation, ResourceTuple
31
from haizea.core.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, EnactmentError, NotSchedulableException, InconsistentScheduleError, InconsistentLeaseStateError, MigrationResourceReservation
32
from operator import attrgetter, itemgetter
33
from mx.DateTime import TimeDelta
34

    
35
import logging
36
import operator
37

    
38

    
39
class VMScheduler(object):
40
    """The Haizea VM Scheduler
41
    
42
    This class is responsible for taking a lease and scheduling VMs to satisfy
43
    the requirements of that lease.
44
    """
45
    
46
    def __init__(self, slottable, resourcepool, mapper, max_in_future):
47
        """Constructor
48
        
49
        The constructor does little more than create the VM scheduler's
50
        attributes. However, it does expect (in the arguments) a fully-constructed 
51
        SlotTable, ResourcePool, and Mapper (these are constructed in the 
52
        Manager's constructor). 
53
        
54
        Arguments:
55
        slottable -- Slot table
56
        resourcepool -- Resource pool where enactment commands will be sent to
57
        mapper -- Mapper
        max_in_future -- Maximum number of leases that can be scheduled in
        the future by the backfilling algorithm (-1 for no limit)
58
        """        
59
        self.slottable = slottable
60
        self.resourcepool = resourcepool
61
        self.mapper = mapper
62
        self.logger = logging.getLogger("VMSCHED")
63
        
64
        # Register the handlers for the types of reservations used by
65
        # the VM scheduler
66
        self.handlers = {}
67
        self.handlers[VMResourceReservation] = ReservationEventHandler(
68
                                sched    = self,
69
                                on_start = VMScheduler._handle_start_vm,
70
                                on_end   = VMScheduler._handle_end_vm)
71

    
72
        self.handlers[ShutdownResourceReservation] = ReservationEventHandler(
73
                                sched    = self,
74
                                on_start = VMScheduler._handle_start_shutdown,
75
                                on_end   = VMScheduler._handle_end_shutdown)
76

    
77
        self.handlers[SuspensionResourceReservation] = ReservationEventHandler(
78
                                sched    = self,
79
                                on_start = VMScheduler._handle_start_suspend,
80
                                on_end   = VMScheduler._handle_end_suspend)
81

    
82
        self.handlers[ResumptionResourceReservation] = ReservationEventHandler(
83
                                sched    = self,
84
                                on_start = VMScheduler._handle_start_resume,
85
                                on_end   = VMScheduler._handle_end_resume)
86

    
87
        self.handlers[MemImageMigrationResourceReservation] = ReservationEventHandler(
88
                                sched    = self,
89
                                on_start = VMScheduler._handle_start_migrate,
90
                                on_end   = VMScheduler._handle_end_migrate)
91
        
92
        self.max_in_future = max_in_future
93
        
94
        self.future_leases = set()
95

    
96

    
97
    def schedule(self, lease, duration, nexttime, earliest, override_state = None):
98
        """ The scheduling function
99
        
100
        This particular function doesn't do much except dispatch, depending on
        the lease type, to __schedule_asap, __schedule_exact or
        __schedule_deadline (which do all the work).
102
        
103
        Arguments:
104
        lease -- Lease to schedule
        duration -- Amount of time that should be scheduled for the lease's VMs
105
        nexttime -- The next time at which the scheduler can allocate resources.
106
        earliest -- The earliest possible starting times on each physical node
        override_state -- If not None, treat the lease as if it were in this
        state instead of its current state
107
        """        
108
        if lease.get_type() == Lease.BEST_EFFORT:
109
            return self.__schedule_asap(lease, duration, nexttime, earliest, allow_in_future = self.can_schedule_in_future())
110
        elif lease.get_type() == Lease.ADVANCE_RESERVATION:
111
            return self.__schedule_exact(lease, duration, nexttime, earliest)
112
        elif lease.get_type() == Lease.IMMEDIATE:
113
            return self.__schedule_asap(lease, duration, nexttime, earliest, allow_in_future = False)
114
        elif lease.get_type() == Lease.DEADLINE:
115
            return self.__schedule_deadline(lease, duration, nexttime, earliest, override_state)
116

    
117

    
118
    def estimate_migration_time(self, lease):
119
        """ Estimates the time required to migrate a lease's VMs
120

121
        This function conservatively estimates that all the VMs are going to
        be migrated to other nodes, and that the memory state transfers happen
        one after another, so the estimated time is the sum of the individual
        memory transfer times.
125
        
126
        Note that this method only estimates the time to migrate the memory
127
        state files for the VMs. Migrating the software environment (which may
128
        or may not be a disk image) is the responsibility of the preparation
129
        scheduler, which has its own set of migration scheduling methods.
130

131
        Arguments:
132
        lease -- Lease that might be migrated
133
        """                
134
        migration = get_config().get("migration")
135
        if migration == constants.MIGRATE_YES:
136
            bandwidth = self.resourcepool.info.get_migration_bandwidth()            
137
            vmrr = lease.get_last_vmrr()
138
            #mem_in_pnode = dict([(pnode,0) for pnode in set(vmrr.nodes.values())])
139
            transfer_time = 0
140
            for pnode in vmrr.nodes.values():
141
                mem = vmrr.resources_in_pnode[pnode].get_by_type(constants.RES_MEM)
142
                transfer_time += estimate_transfer_time(mem, bandwidth)
143
            return transfer_time
144
        elif migration == constants.MIGRATE_YES_NOTRANSFER:
145
            return TimeDelta(seconds=0)        
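    # Illustrative sketch (invented values, not part of the scheduler): if the
    # lease's last VMRR places 1024 MB of lease memory on each of two physical
    # nodes, the loop above simply adds up the per-node estimates, roughly
    #
    #   estimate_transfer_time(1024, bandwidth) * 2
    #
    # i.e., the memory transfers are assumed to happen one after another.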
146

    
147
    def schedule_migration(self, lease, vmrr, nexttime):
148
        """ Schedules migrations for a lease
149

150
        Arguments:
151
        lease -- Lease being migrated
152
        vmrr -- The VM reservation before which the migration will take place
153
        nexttime -- The next time at which the scheduler can allocate resources.
154
        """
155
        
156
        # Determine what migrations have to be done.
157
        last_vmrr = lease.get_last_vmrr()
158
        
159
        vnode_mappings = self.resourcepool.get_ram_image_mappings(lease)
160
        vnode_migrations = dict([(vnode, (vnode_mappings[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes if vnode_mappings[vnode] != vmrr.nodes[vnode]])
161
        
162
        # Determine if we actually have to migrate
163
        mustmigrate = False
164
        for vnode in vnode_migrations:
165
            if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
166
                mustmigrate = True
167
                break
168
            
169
        if not mustmigrate:
170
            return []
171

    
172
        # If Haizea is configured to migrate without doing any transfers,
173
        # then we just return a nil-duration migration RR
174
        if get_config().get("migration") == constants.MIGRATE_YES_NOTRANSFER:
175
            start = nexttime
176
            end = nexttime
177
            res = {}
178
            migr_rr = MemImageMigrationResourceReservation(lease, start, end, res, vmrr, vnode_migrations)
179
            migr_rr.state = ResourceReservation.STATE_SCHEDULED
180
            return [migr_rr]
181

    
182
        # Figure out what migrations can be done simultaneously
183
        migrations = []
184
        while len(vnode_migrations) > 0:
185
            pnodes = set()
186
            migration = {}
187
            for vnode in vnode_migrations:
188
                origin = vnode_migrations[vnode][0]
189
                dest = vnode_migrations[vnode][1]
190
                if not origin in pnodes and not dest in pnodes:
191
                    migration[vnode] = vnode_migrations[vnode]
192
                    pnodes.add(origin)
193
                    pnodes.add(dest)
194
            for vnode in migration:
195
                del vnode_migrations[vnode]
196
            migrations.append(migration)
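        # Illustrative sketch (invented vnode/pnode ids): with
        #
        #   vnode_migrations = {1: (10, 20), 2: (10, 30), 3: (40, 50)}
        #
        # and that iteration order, the grouping above produces two rounds,
        # because vnodes 1 and 2 share origin pnode 10 and so cannot be
        # migrated simultaneously:
        #
        #   migrations = [{1: (10, 20), 3: (40, 50)}, {2: (10, 30)}]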
197
        
198
        # Create migration RRs
199
        start = max(last_vmrr.post_rrs[-1].end, nexttime)
200
        bandwidth = self.resourcepool.info.get_migration_bandwidth()
201
        migr_rrs = []
202
        for m in migrations:
203
            vnodes_to_migrate = m.keys()
204
            max_mem_to_migrate = max([lease.requested_resources[vnode].get_quantity(constants.RES_MEM) for vnode in vnodes_to_migrate])
205
            migr_time = estimate_transfer_time(max_mem_to_migrate, bandwidth)
206
            end = start + migr_time
207
            res = {}
208
            for (origin,dest) in m.values():
209
                resorigin = Capacity([constants.RES_NETOUT])
210
                resorigin.set_quantity(constants.RES_NETOUT, bandwidth)
211
                resdest = Capacity([constants.RES_NETIN])
212
                resdest.set_quantity(constants.RES_NETIN, bandwidth)
213
                res[origin] = self.slottable.create_resource_tuple_from_capacity(resorigin)
214
                res[dest] = self.slottable.create_resource_tuple_from_capacity(resdest)                
215
            migr_rr = MemImageMigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
216
            migr_rr.state = ResourceReservation.STATE_SCHEDULED
217
            migr_rrs.append(migr_rr)
218
            start = end
219
            
220
        return migr_rrs
221

    
222
    def cancel_vm(self, vmrr):
223
        """ Cancels a VM resource reservation
224

225
        Arguments:
226
        vmrr -- VM RR to be cancelled
227
        """         
228
        
229
        # If this VM RR is part of a lease that was scheduled in the future,
230
        # remove that lease from the set of future leases.
231
        if vmrr.lease in self.future_leases:
232
            self.future_leases.remove(vmrr.lease)
233
            get_persistence().persist_future_leases(self.future_leases)
234

    
235
        # If there are any pre-RRs that are scheduled or active, remove them
236
        for rr in vmrr.pre_rrs:
237
            if rr.state != ResourceReservation.STATE_DONE:
238
                self.slottable.remove_reservation(rr)
239

    
240
        # If there are any post RRs, remove them
241
        for rr in vmrr.post_rrs:
242
            self.slottable.remove_reservation(rr)
243
        
244
        # Remove the reservation itself
245
        self.slottable.remove_reservation(vmrr)
246

    
247

    
248
    def can_suspend_at(self, lease, t, nexttime=None):
249
        """ Determines if it is possible to suspend a lease before a given time
250

251
        Arguments:
252
        lease -- Lease that would have to be suspended
        t -- Time by which the lease's VMs must be suspended
        nexttime -- The next time at which the scheduler can allocate
        resources (optional)
254
        """
255
        vmrr = lease.get_vmrr_at(t)
256
        if t < vmrr.start:
257
            return False # t could be during a resume
258
        by_t = min(vmrr.end, t) # t could be during a suspend
259
        if nexttime == None:
260
            time_until_suspend = t - vmrr.start
261
        else:
262
            time_until_suspend = min( (t - vmrr.start, t - nexttime))
263
        min_duration = self.__compute_scheduling_threshold(lease)
264
        can_suspend = time_until_suspend >= min_duration       
265
        return can_suspend
266
 
267
    
268
    def preempt_vm(self, vmrr, t):
269
        """ Preempts a VM reservation at a given time
270

271
        This method assumes that the lease is, in fact, preemptable,
272
        that the VMs are running at the given time, and that there is 
273
        enough time to suspend the VMs before the given time (all these
274
        checks are done in the lease scheduler).
275
        
276
        Arguments:
277
        vmrr -- VM RR to be preempted
278
        t -- Time by which the VM must be preempted
279
        """             
280
        
281
        # Save original start and end time of the vmrr
282
        old_start = vmrr.start
283
        old_end = vmrr.end
284
        
285
        # Schedule the VM suspension
286
        self.__schedule_suspension(vmrr, t)
287
        
288
        # Update the VMRR in the slot table
289
        self.slottable.update_reservation(vmrr, old_start, old_end)
290
        
291
        # Add the suspension RRs to the VM's post-RRs
292
        for susprr in vmrr.post_rrs:
293
            self.slottable.add_reservation(susprr)
294
            
295
            
296
    def get_future_reschedulable_leases(self):
297
        """ Returns a list of future leases that are reschedulable.
298

299
        Currently, this list is just the best-effort leases scheduled
300
        in the future as determined by the backfilling algorithm.
301
        Advance reservation leases, by their nature, cannot be 
302
        rescheduled to find a "better" starting time.
303
        """             
304
        return list(self.future_leases)
305
    
306

    
307
    def can_schedule_in_future(self):
308
        """ Returns True if the backfilling algorithm would allow a lease
309
        to be scheduled in the future.
310

311
        """             
312
        if self.max_in_future == -1: # Unlimited
313
            return True
314
        else:
315
            return len(self.future_leases) < self.max_in_future
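    # Illustrative example (hypothetical scheduler instance "sched", invented
    # values): with max_in_future = 2 and two best-effort leases already
    # backfilled into the future,
    #
    #   len(sched.future_leases)         # 2
    #   sched.can_schedule_in_future()   # False
    #
    # whereas max_in_future = -1 makes this always return True.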
316

    
317
        
318
    def get_utilization(self, time):
319
        """ Computes resource utilization (currently just CPU-based)
320
        
321
        This utilization information shows what 
322
        portion of the physical resources is used by each type of reservation 
323
        (e.g., 70% are running a VM, 5% are doing suspensions, etc.)
324

325
        Arguments:
326
        time -- Time at which to determine utilization
327
        """         
328
        total = self.slottable.get_total_capacity(restype = constants.RES_CPU)
329
        util = {}
330
        reservations = self.slottable.get_reservations_at(time)
331
        for r in reservations:
332
            for node in r.resources_in_pnode:
333
                if isinstance(r, VMResourceReservation):
334
                    use = r.resources_in_pnode[node].get_by_type(constants.RES_CPU)
335
                    util[type(r)] = use + util.setdefault(type(r),0.0)
336
                elif isinstance(r, SuspensionResourceReservation) or isinstance(r, ResumptionResourceReservation) or isinstance(r, ShutdownResourceReservation):
337
                    use = r.vmrr.resources_in_pnode[node].get_by_type(constants.RES_CPU)
338
                    util[type(r)] = 0.0
339
                    #util[type(r)] = use + util.setdefault(type(r),0.0)
340
        util[None] = total - sum(util.values())
341

    
342
        if total != 0:
343
            for k in util:
344
                util[k] /= total
345

    
346
        return util
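    # Illustrative sketch of the dictionary returned above (keys are
    # reservation classes, plus None for unused capacity; values are fractions
    # of total CPU capacity; numbers invented):
    #
    #   {VMResourceReservation: 0.70,
    #    SuspensionResourceReservation: 0.0,
    #    None: 0.30}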
347
        
348

    
349
    def __schedule_exact(self, lease, duration, nexttime, earliest):
350
        """ Schedules VMs that must start at an exact time
351
        
352
        This type of lease is "easy" to schedule because we know the exact
353
        start time, which means that's the only starting time we have to
354
        check. So, this method does little more than call the mapper.
355
        
356
        Arguments:
357
        lease -- Lease to schedule
358
        nexttime -- The next time at which the scheduler can allocate resources.
359
        earliest -- The earliest possible starting times on each physical node
360
        """             
361

    
362
        # Determine the start and end time
363
        shutdown_time = lease.estimate_shutdown_time()
364
        start = lease.start.requested
365
        end = start + lease.duration.requested + shutdown_time
366
        
367
        # Convert Capacity objects in lease object into ResourceTuples that
368
        # we can hand over to the mapper.
369
        requested_resources = dict([(k,self.slottable.create_resource_tuple_from_capacity(v)) for k,v in lease.requested_resources.items()])
370

    
371
        # Let the mapper do its magic
372
        mapping, actualend, preemptions = self.mapper.map(lease, 
373
                                                          requested_resources,
374
                                                          start, 
375
                                                          end, 
376
                                                          strictend = True,
377
                                                          allow_preemption = True)
378

    
379
        # If no mapping was found, tell the lease scheduler about it
380
        if mapping == None:
381
            raise NotSchedulableException, "Not enough resources in specified interval"
382
        
383
        # Create VM resource reservations
384
        res = {}
385
        
386
        for (vnode,pnode) in mapping.items():
387
            vnode_res = requested_resources[vnode]
388
            if res.has_key(pnode):
389
                res[pnode].incr(vnode_res)
390
            else:
391
                res[pnode] = ResourceTuple.copy(vnode_res)
392
        
393
        vmrr = VMResourceReservation(lease, start, end, mapping, res)
394
        vmrr.state = ResourceReservation.STATE_SCHEDULED
395

    
396
        # Schedule shutdown for the VM
397
        self.__schedule_shutdown(vmrr)
398

    
399
        return vmrr, preemptions
400

    
401

    
402
    def __schedule_asap(self, lease, duration, nexttime, earliest, allow_in_future = None, override_state = None):
403
        """ Schedules VMs as soon as possible
404
        
405
        This method is a bit more complex than __schedule_exact because
406
        we need to figure out what "as soon as possible" actually is.
407
        This involves attempting several mappings, at different points
408
        in time, before we can schedule the lease.
409
        
410
        This method will always check, at least, if the lease can be scheduled
411
        at the earliest possible moment at which the lease could be prepared
412
        (e.g., if the lease can't start until 1 hour in the future because that's
413
        the earliest possible time at which the disk images it requires can
414
        be transferred, then that's when the scheduler will check). Note, however,
415
        that this "earliest possible moment" is determined by the preparation
416
        scheduler.
417
        
418
        Additionally, if the lease can't be scheduled at the earliest
419
        possible moment, it can also check if the lease can be scheduled
420
        in the future. This partially implements a backfilling algorithm
421
        (the maximum number of future leases is stored in the max_in_future
422
        attribute of VMScheduler), the other part being implemented in the
423
        __process_queue method of LeaseScheduler.
424
        
425
        Note that, if the method is allowed to schedule in the future,
        and assuming that the lease doesn't request more resources than
        the site itself provides, this method will always schedule the VMs
        successfully (since there's always an empty spot somewhere in the future).
429
        
430
        
431
        Arguments:
432
        lease -- Lease to schedule
        duration -- Amount of time that should be scheduled for the lease's VMs
433
        nexttime -- The next time at which the scheduler can allocate resources.
434
        earliest -- The earliest possible starting times on each physical node
435
        allow_in_future -- Boolean indicating whether the scheduler is
        allowed to schedule the VMs in the future.
        override_state -- If not None, treat the lease as if it were in this
        state instead of its current state.
437
        """                
438
        
439
        #
440
        # STEP 1: PROLEGOMENA
441
        #
442
        
443
        lease_id = lease.id
444
        remaining_duration = duration
445
        shutdown_time = lease.estimate_shutdown_time()
446
        
447
        if override_state != None:
448
            state = override_state
449
        else:
450
            state = lease.get_state()
451
        
452
        # We might be scheduling a suspended lease. If so, we will
453
        # also have to schedule its resumption. Right now, just 
454
        # figure out if this is such a lease.
455
        mustresume = (state in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED, Lease.STATE_SUSPENDED_SCHEDULED))
456

    
457
        # This is the minimum duration that we must be able to schedule.
458
        # See __compute_scheduling_threshold for more details.
459
        min_duration = self.__compute_scheduling_threshold(lease)
460
        
461

    
462
        #
463
        # STEP 2: FIND THE CHANGEPOINTS
464
        #
465

    
466
        # Find the changepoints, and the available nodes at each changepoint
467
        # We need to do this because the preparation scheduler may have
468
        # determined that some nodes might require more time to prepare
469
        # than others (e.g., if using disk image caching, some nodes
470
        # might have the required disk image predeployed, while others
471
        # may require transferring the image to that node).
472
        # 
473
        # The end result of this step is a list (cps) where each entry
474
        # is a (t,nodes) pair, where "t" is the time of the changepoint
475
        # and "nodes" is the set of nodes that are available at that time.
476
        
477
        if not mustresume:
478
            # If this is not a suspended lease, then the changepoints
479
            # are determined based on the "earliest" parameter.
480
            cps = [(node, e.time) for node, e in earliest.items()]
481
            cps.sort(key=itemgetter(1))
482
            curcp = None
483
            changepoints = []
484
            nodes = []
485
            for node, time in cps:
486
                nodes.append(node)
487
                if time != curcp:
488
                    changepoints.append([time, set(nodes)])
489
                    curcp = time
490
                else:
491
                    changepoints[-1][1] = set(nodes)
492
        else:
493
            # If the lease is suspended, we take into account that, if
494
            # migration is disabled, we can only schedule the lease
495
            # on the nodes it is currently scheduled on.
496
            if get_config().get("migration") == constants.MIGRATE_NO:
497
                vmrr = lease.get_last_vmrr()
498
                onlynodes = set(vmrr.nodes.values())
499
            else:
500
                onlynodes = None               
501
            changepoints = list(set([x.time for x in earliest.values()]))
502
            changepoints.sort()
503
            changepoints = [(x, onlynodes) for x in changepoints]
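        # Illustrative sketch (invented times and node ids): in the
        # non-suspended case above, if the preparation scheduler reported
        # earliest start times t0 for pnodes 1 and 2 and a later time t1 for
        # pnode 3, the resulting list would be
        #
        #   changepoints = [[t0, set([1, 2])], [t1, set([1, 2, 3])]]
        #
        # In the suspended case, each changepoint is instead paired with
        # 'onlynodes' (or None when migration is allowed).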
504

    
505
        # If we can schedule VMs in the future,
506
        # we also consider future changepoints
507
        if allow_in_future:
508
            res = self.slottable.get_reservations_ending_after(changepoints[-1][0])
509
            # We really only care about changepoints where VMs end (which is
510
            # when resources become available)
511
            futurecp = [r.get_final_end() for r in res if isinstance(r, VMResourceReservation)]
512
            # Corner case: Sometimes we're right in the middle of a ShutdownReservation, so it won't be
513
            # included in futurecp.
514
            futurecp += [r.end for r in res if isinstance(r, ShutdownResourceReservation) and not r.vmrr in res]
515
            if not mustresume:
516
                futurecp = [(p,None) for p in futurecp]
517
            else:
518
                futurecp = [(p,onlynodes) for p in futurecp]                
519
        else:
520
            futurecp = []
521
            
522
        futurecp.sort()
523

    
524
        if lease.deadline != None:
525
            changepoints = [cp for cp in changepoints if cp[0] <= lease.deadline - duration]
526
            futurecp = [cp for cp in futurecp if cp[0] <= lease.deadline - duration]
527

    
528
        #
529
        # STEP 3: FIND A MAPPING
530
        #
531
        
532
        # In this step we find a starting time and a mapping for the VMs,
533
        # which involves going through the changepoints in order and seeing
534
        # if we can find a mapping.
535
        # Most of the work is done in the __find_fit_at_points
536
        
537
        # If resuming, we also have to allocate enough time for the resumption
538
        if mustresume:
539
            duration = remaining_duration + lease.estimate_resume_time()
540
        else:
541
            duration = remaining_duration
542

    
543
        duration += shutdown_time
544

    
545
        in_future = False
546

    
547
        # Convert Capacity objects in lease object into ResourceTuples that
548
        # we can hand over to the mapper.
549
        requested_resources = dict([(k,self.slottable.create_resource_tuple_from_capacity(v)) for k,v in lease.requested_resources.items()])
550

    
551
        # First, try to find a mapping assuming we can't schedule in the future
552
        start, end, mapping, preemptions = self.__find_fit_at_points(lease,
553
                                                                     requested_resources,
554
                                                                     changepoints, 
555
                                                                     duration, 
556
                                                                     min_duration,
557
                                                                     shutdown_time)
558
        
559
        if start == None and not allow_in_future:
560
            # We did not find a suitable starting time. This can happen
561
            # if we're unable to schedule in the future
562
            raise NotSchedulableException, "Could not find enough resources for this request"
563

    
564
        # If we haven't been able to fit the lease, check if we can
565
        # reserve it in the future
566
        if start == None and allow_in_future:
567
            start, end, mapping, preemptions = self.__find_fit_at_points(lease,
568
                                                                         requested_resources,
569
                                                                         futurecp, 
570
                                                                         duration, 
571
                                                                         min_duration,
572
                                                                         shutdown_time
573
                                                                         )
574
            # TODO: The following will also raise an exception if a lease
575
            # makes a request that could *never* be satisfied with the
576
            # current resources.
577
            if start == None:
578
                if lease.deadline != None:
579
                    raise NotSchedulableException, "Could not find enough resources for this request before deadline"
580
                else:
581
                    raise InconsistentScheduleError, "Could not find a mapping in the future (this should not happen)"
582

    
583
            in_future = True
584

    
585
        #
586
        # STEP 4: CREATE RESERVATIONS
587
        #
588
        
589
        # At this point, the lease is feasible. We just need to create
590
        # the reservations for the VMs and, possibly, for the VM resumption,
591
        # suspension, and shutdown.    
592
        
593
        # VM resource reservation
594
        res = {}
595
        
596
        for (vnode,pnode) in mapping.items():
597
            vnode_res = requested_resources[vnode]
598
            if res.has_key(pnode):
599
                res[pnode].incr(vnode_res)
600
            else:
601
                res[pnode] = ResourceTuple.copy(vnode_res)
602

    
603
        vmrr = VMResourceReservation(lease, start, end, mapping, res)
604
        vmrr.state = ResourceReservation.STATE_SCHEDULED
605

    
606
        # VM resumption resource reservation
607
        if mustresume:
608
            self.__schedule_resumption(vmrr, start)
609

    
610
        # If the mapper couldn't find a mapping for the full duration
611
        # of the lease, then we need to schedule a suspension.
612
        mustsuspend = (vmrr.end - vmrr.start) - shutdown_time < remaining_duration
613
        if mustsuspend:
614
            self.__schedule_suspension(vmrr, end)
615
        else:
616
            # Compensate for any overestimation
617
            if (vmrr.end - vmrr.start) > remaining_duration + shutdown_time:
618
                vmrr.end = vmrr.start + remaining_duration + shutdown_time
619
            self.__schedule_shutdown(vmrr)
620
        
621
        if in_future:
622
            self.future_leases.add(lease)
623
            get_persistence().persist_future_leases(self.future_leases)
624

    
625
        susp_str = res_str = ""
626
        if mustresume:
627
            res_str = " (resuming)"
628
        if mustsuspend:
629
            susp_str = " (suspending)"
630
        self.logger.info("Lease #%i can be scheduled on nodes %s from %s%s to %s%s" % (lease.id, mapping.values(), start, res_str, end, susp_str))
631

    
632
        return vmrr, preemptions
633

    
634

    
635
    def __schedule_deadline(self, lease, duration, nexttime, earliest, override_state):   
636
        
637
        earliest_time = nexttime
638
        for n in earliest:
639
            earliest[n].time = max(lease.start.requested, earliest[n].time)
640
            earliest_time = max(earliest_time, earliest[n].time)
641

    
642
        slack = (lease.deadline - lease.start.requested) / lease.duration.requested
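        # Illustrative example (invented values): a lease requesting one hour
        # of VM time whose deadline is two hours after its requested start has
        # slack == 2.0, so it is first attempted as an advance reservation
        # below; leases with more slack go straight to best-effort scheduling.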
643
        if slack <= 2.0:
644
            try:
645
                self.logger.debug("Trying to schedule lease #%i as an advance reservation..." % lease.id)
646
                vmrr, preemptions = self.__schedule_exact(lease, duration, nexttime, earliest)
647
                # Don't return preemptions. They have already been preempted by the deadline mapper
648
                return vmrr, []
649
            except NotSchedulableException:
650
                self.logger.debug("Lease #%i cannot be scheduled as an advance reservation, trying as best-effort..." % lease.id)
651
                try:
652
                    vmrr, preemptions = self.__schedule_asap(lease, duration, nexttime, earliest, allow_in_future = True, override_state=override_state)
653
                except NotSchedulableException:
654
                    vmrr = None
655
                    preemptions = []
656
                if vmrr == None or vmrr.end - vmrr.start != duration or vmrr.end > lease.deadline or len(preemptions)>0:
657
                    self.logger.debug("Lease #%i cannot be scheduled before deadline using best-effort." % lease.id)
658
                    #raise NotSchedulableException, "Could not schedule before deadline without making other leases miss deadline"
659
                else:
660
                    return vmrr, preemptions
661
        else:
662
            self.logger.debug("Trying to schedule lease #%i as best-effort..." % lease.id)
663
            try:
664
                vmrr, preemptions = self.__schedule_asap(lease, duration, nexttime, earliest, allow_in_future = True, override_state=override_state)
665
            except NotSchedulableException:
666
                vmrr = None
667
                preemptions = []
668

    
669
        if vmrr == None or vmrr.end - vmrr.start != duration or vmrr.end > lease.deadline or len(preemptions)>0:
670
            self.logger.debug("Trying to schedule lease #%i by rescheduling other leases..." % lease.id)
671
            dirtynodes = set()
672
            dirtytime = earliest_time
673

    
674
            future_vmrrs = self.slottable.get_reservations_on_or_after(earliest_time)
675
            future_vmrrs.sort(key=operator.attrgetter("start"))
676
            future_vmrrs = [rr for rr in future_vmrrs 
677
                            if isinstance(rr, VMResourceReservation) 
678
                            and rr.state == ResourceReservation.STATE_SCHEDULED
679
                            and reduce(operator.and_, [(prerr.state == ResourceReservation.STATE_SCHEDULED) for prerr in rr.pre_rrs], True)]
680

    
681
            leases = list(set([future_vmrr.lease for future_vmrr in future_vmrrs]))
682

    
683
            self.slottable.push_state(leases)
684
            
685
            for future_vmrr in future_vmrrs:
686
                #print "REMOVE", future_vmrr.lease.id, future_vmrr.start, future_vmrr.end
687
                future_vmrr.lease.remove_vmrr(future_vmrr)
688
                self.cancel_vm(future_vmrr)
689
            
690
            orig_vmrrs = dict([(l,[rr for rr in future_vmrrs if rr.lease == l]) for l in leases])
691
            
692
            leases.append(lease)
693
            leases.sort(key= lambda l: (l.deadline - earliest_time) / l.get_remaining_duration_at(nexttime))
694

    
695
            new_vmrrs = {}
696

    
697
            self.logger.debug("Attempting to reschedule leases %s" % [l.id for l in leases])
698

    
699
            # First pass
700
            scheduled = set()
701
            for lease2 in leases:
702
                
703
                last_vmrr = lease2.get_last_vmrr()
704
                if last_vmrr != None and last_vmrr.is_suspending():
705
                    override_state = Lease.STATE_SUSPENDED_PENDING
706
                    l_earliest_time = max(last_vmrr.post_rrs[-1].end, earliest_time)
707
                else:
708
                    override_state = None
709
                    l_earliest_time = earliest_time
710
                    
711
                for n in earliest:
712
                    earliest[n].time = max(lease2.start.requested, l_earliest_time)
713
                    
714
                self.logger.debug("Rescheduling lease %s" % lease2.id)
715
                dur = lease2.get_remaining_duration_at(l_earliest_time)                
716

    
717
                try:
718
                    vmrr, preemptions = self.__schedule_asap(lease2, dur, nexttime, earliest, allow_in_future = True, override_state=override_state)
719
                except NotSchedulableException:
720
                    vmrr = None
721
                    preemptions = []
722
                if vmrr == None or vmrr.end - vmrr.start != dur or vmrr.end > lease2.deadline or len(preemptions) != 0:
723
                    self.logger.debug("Lease %s could not be rescheduled, undoing changes." % lease2.id)
724
                    self.slottable.pop_state()
725

    
726
                    raise NotSchedulableException, "Could not schedule before deadline without making other leases miss deadline"
727
                    
728
                dirtytime = max(vmrr.end, dirtytime)
729
                dirtynodes.update(vmrr.resources_in_pnode.keys())
730
                    
731
                for rr in vmrr.pre_rrs:
732
                    self.slottable.add_reservation(rr)                
733
                self.slottable.add_reservation(vmrr)
734
                for rr in vmrr.post_rrs:
735
                    self.slottable.add_reservation(rr)                    
736
                scheduled.add(lease2)
737
                if lease2 == lease:
738
                    return_vmrr = vmrr
739
                    break
740
                else:
741
                    new_vmrrs[lease2] = vmrr
742
            
743
            # We've scheduled the lease. Now we try to schedule the rest of the leases but,
744
            # since now we know the nodes the new lease is in, we can do a few
            # optimizations: restore the leases on nodes we haven't used, and
            # that would not be affected by the new lease. We need to find what
            # this set of nodes is.
747
            
748
            to_schedule = [l for l in leases if l not in scheduled]
749
            dirtynodes, cleanleases = self.find_dirty_nodes(to_schedule, dirtynodes, orig_vmrrs)
750
            self.logger.debug("Rescheduling only leases on nodes %s" % dirtynodes)
751
            self.logger.debug("Leases %s can be skipped" % [l.id for l in cleanleases])
752
            
753
            # Restore the leases
754
            restored_leases = set()
755
            for l in leases:
756
                if l in cleanleases:
757
                    # Restore
758
                    for l_vmrr in orig_vmrrs[l]:
759
                        for rr in l_vmrr.pre_rrs:
760
                            self.slottable.add_reservation(rr)                
761
                        self.slottable.add_reservation(l_vmrr)
762
                        for rr in l_vmrr.post_rrs:
763
                            self.slottable.add_reservation(rr)   
764
                        l.append_vmrr(l_vmrr)
765
                        scheduled.add(l)
766
                        restored_leases.add(l)
767
                            
768
            to_schedule = [l for l in leases if l not in scheduled]
769
            try:
770
                (more_scheduled, add_vmrrs, dirtytime) = self.reschedule_deadline_leases(to_schedule, orig_vmrrs, earliest_time, earliest, nexttime, dirtytime)
771
                scheduled.update(more_scheduled)
772
                new_vmrrs.update(add_vmrrs)
773
            except NotSchedulableException:
774
                self.logger.debug("Lease %s could not be rescheduled, undoing changes." % l.id)
775
                self.slottable.pop_state()
776
                raise
777
                                
778
            self.slottable.pop_state(discard=True)
779

    
780
            for l in leases:
781
                if l not in scheduled:
782
                    for l_vmrr in orig_vmrrs[l]:
783
                        for rr in l_vmrr.pre_rrs:
784
                            self.slottable.add_reservation(rr)                
785
                        self.slottable.add_reservation(l_vmrr)
786
                        for rr in l_vmrr.post_rrs:
787
                            self.slottable.add_reservation(rr)
788
                        l.append_vmrr(l_vmrr)
789
                        restored_leases.add(l)
790
    
791
            for lease2, vmrr in new_vmrrs.items():
792
                lease2.append_vmrr(vmrr)
793
                    
794
            # Remove from slottable, because lease_scheduler is the one that actually
795
            # adds the RRs
796
            for rr in return_vmrr.pre_rrs:
797
                self.slottable.remove_reservation(rr)                
798
            self.slottable.remove_reservation(return_vmrr)
799
            for rr in return_vmrr.post_rrs:
800
                self.slottable.remove_reservation(rr)             
801
            
802
            for l in leases:
803
                if l in scheduled:
804
                    self.logger.vdebug("Lease %i after rescheduling:" % l.id)
805
                    l.print_contents()                   
806

    
807
            return return_vmrr, []
808
        else:
809
            return vmrr, preemptions
810

    
811

    
812
    def find_dirty_nodes(self, to_schedule, dirtynodes, orig_vmrrs):
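        # Fixed-point computation: starting from the nodes already touched by
        # the new schedule ('dirtynodes'), any lease whose original VMRRs
        # straddle both dirty and clean nodes drags its clean nodes into the
        # dirty set; this is repeated until the set is stable. Leases whose
        # nodes are all clean can be restored without rescheduling.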
813
        dirtynodes = set(dirtynodes)
814
        done = False
815
        while not done:
816
            stable = True
817
            cleanleases = set()            
818
            for l in to_schedule:
819
                pnodes = set()
820
                for l_vmrr in orig_vmrrs[l]:
821
                    pnodes.update(l_vmrr.resources_in_pnode.keys())
822
                in_dirty = dirtynodes & pnodes
823
                in_clean = pnodes - dirtynodes
824
                if len(in_dirty) > 0 and len(in_clean) > 0:
825
                    stable = False
826
                    dirtynodes.update(in_clean)
827
                if len(in_clean) > 0 and len(in_dirty) == 0:
828
                    cleanleases.add(l)
829
            if stable == True:
830
                done = True
831
                
832
        return dirtynodes, cleanleases
833

    
834

    
835
    def reschedule_deadline_leases(self, leases, orig_vmrrs, earliest_time, earliest, nexttime, dirtytime):
836
        scheduled = set()
837
        new_vmrrs = {}
838
        for l in leases:
839
            if len(scheduled) < len(leases) and dirtytime != None:
840
                min_future_start = min([min([rr.start for rr in lrr]) for l2, lrr in orig_vmrrs.items() if l2 in leases and l2 not in scheduled])
841
                if min_future_start > dirtytime:
842
                    break
843
                
844
            last_vmrr = l.get_last_vmrr()
845
            if last_vmrr != None and last_vmrr.is_suspending():
846
                override_state = Lease.STATE_SUSPENDED_PENDING
847
                l_earliest_time = max(last_vmrr.post_rrs[-1].end, earliest_time)
848
            else:
849
                override_state = None
850
                l_earliest_time = earliest_time
851
                
852
            for n in earliest:
853
                earliest[n].time = max(l.start.requested, l_earliest_time)
854
                
855
            self.logger.debug("Rescheduling lease %s" % l.id)
856
            dur = l.get_remaining_duration_at(l_earliest_time)
857
            
858
            try:
859
                vmrr, preemptions = self.__schedule_asap(l, dur, nexttime, earliest, allow_in_future = True, override_state=override_state)
860
            except NotSchedulableException:
861
                vmrr = None
862
                preemptions = []
863
            
864
            if vmrr == None or vmrr.end - vmrr.start != dur or vmrr.end > l.deadline or len(preemptions) != 0:
865
                raise NotSchedulableException, "Could not schedule before deadline without making other leases miss deadline"
866
            
867
            if dirtytime != None:
868
                dirtytime = max(vmrr.end, dirtytime)
869
            
870
            for rr in vmrr.pre_rrs:
871
                self.slottable.add_reservation(rr)                
872
            self.slottable.add_reservation(vmrr)
873
            for rr in vmrr.post_rrs:
874
                self.slottable.add_reservation(rr)                    
875
            new_vmrrs[l] = vmrr
876
            scheduled.add(l)                    
877
                
878
        return scheduled, new_vmrrs, dirtytime
879
        
880

    
881
    def reschedule_deadline(self, lease, duration, nexttime, earliest, override_state = None):
882
        for n in earliest:
883
            earliest[n].time = max(lease.start.requested, earliest[n].time)
884
        
885
        try:
886
            vmrr, preemptions = self.__schedule_asap(lease, duration, nexttime, earliest, allow_in_future = True, override_state=override_state)
887
        except NotSchedulableException:
888
            vmrr = None
889
            preemptions = []
890
            
891
        if vmrr == None or vmrr.end - vmrr.start != duration or vmrr.end > lease.deadline or len(preemptions)>0:
892
            self.logger.debug("Lease #%i cannot be scheduled before deadline using best-effort." % lease.id)
893
            raise NotSchedulableException, "Could not schedule before deadline without making other leases miss deadline"
894
        else:
895
            return vmrr, preemptions
896
        
897
        
898

    
899
    def __find_fit_at_points(self, lease, requested_resources, changepoints, duration, min_duration, shutdown_time):
900
        """ Tries to map a lease in a given list of points in time
901
        
902
        This method goes through a given list of points in time and tries
903
        to find the earliest time at which that lease can be allocated
904
        resources.
905
        
906
        Arguments:
907
        lease -- Lease to schedule
908
        requested_resources -- A dictionary of lease node -> ResourceTuple.
909
        changepoints -- The list of changepoints
910
        duration -- The amount of time requested
911
        min_duration -- The minimum amount of time that should be allocated
        shutdown_time -- The time needed to shut down the lease's VMs
912
        
913
        Returns:
914
        start -- The time at which resources have been found for the lease
915
        actualend -- The time at which the resources won't be available. Note
916
        that this is not necessarily (start + duration) since the mapper
917
        might be unable to find enough resources for the full requested duration.
918
        mapping -- A mapping of lease nodes to physical nodes
919
        preemptions -- A list of the leases that would have to be preempted
        for this mapping to be possible
920
        (if no mapping is found, all these values are set to None)
921
        """                 
922
        found = False
923
        
924
        for time, onlynodes in changepoints:
925
            start = time
926
            end = start + duration
927
            self.logger.debug("Attempting to map from %s to %s" % (start, end))
928
            
929
            # If suspension is disabled, we will only accept mappings that go
930
            # from "start" strictly until "end".
931
            susptype = get_config().get("suspension")
932
            # TODO: Remove the "if is deadline lease" condition and replace it
933
            # with something cleaner
934
            if susptype == constants.SUSPENSION_NONE or (lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL) or lease.get_type() == Lease.DEADLINE: 
935
                strictend = True
936
            else:
937
                strictend = False
938

    
939
            # Let the mapper work its magic
940
            mapping, actualend, preemptions = self.mapper.map(lease, 
941
                                                              requested_resources,
942
                                                              start, 
943
                                                              end, 
944
                                                              strictend = strictend,
945
                                                              onlynodes = onlynodes,
946
                                                              allow_preemption = False)
947

    
948
            # We have a mapping; we still have to check if it satisfies
949
            # the minimum duration.
950
            if mapping != None:
951
                if actualend < end:
952
                    actualduration = actualend - start
953
                    if actualduration >= min_duration:
954
                        if duration - shutdown_time >= actualduration:
955
                            self.logger.debug("This lease can be scheduled from %s to %s (will require suspension)" % (start, actualend))
956
                            found = True
957
                            break
958
                        else:
959
                            self.logger.debug("This lease requires suspension, but doing so would involve 'suspending' the shutdown.")
960
                    else:
961
                        self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (actualduration, min_duration))
962
                else:
963
                    self.logger.debug("This lease can be scheduled from %s to %s (full duration)" % (start, end))
964
                    found = True
965
                    break
966
        
967
        if found:
968
            return start, actualend, mapping, preemptions
969
        else:
970
            return None, None, None, None
971
    
972
    
973
    def __compute_susprem_times(self, vmrr, time, direction, exclusion, rate, override = None):
974
        """ Computes the times at which suspend/resume operations would have to start
975
        
976
        When suspending or resuming a VM, the VM's memory is dumped to a
977
        file on disk. To correctly estimate the time required to suspend
978
        a lease with multiple VMs, Haizea makes sure that no two 
979
        suspensions/resumptions happen at the same time (e.g., if eight
980
        memory files were being saved at the same time to disk, the disk's
981
        performance would be reduced in a way that is not as easy to estimate
982
        as if only one file were being saved at a time). Based on a number
983
        of parameters, this method estimates the times at which the 
984
        suspend/resume commands would have to be sent to guarantee this
985
        exclusion.
986
                    
987
        Arguments:
988
        vmrr -- The VM reservation that will be suspended/resumed
989
        time -- The time at which the suspend should end or the resume should start.
990
        direction -- DIRECTION_BACKWARD: start at "time" and compute the times going
991
        backward (for suspensions) DIRECTION_FORWARD: start at time "time" and compute
992
        the times going forward.
993
        exclusion -- SUSPRES_EXCLUSION_GLOBAL (memory is saved to global filesystem)
994
        or SUSPRES_EXCLUSION_LOCAL (saved to local filesystem)
995
        rate -- The rate at which an individual VM is suspended/resumed
996
        override -- If specified, then instead of computing the time to 
997
        suspend/resume VM based on its memory and the "rate" parameter,
998
        use this override value.
999
        
1000
        """         
1001
        times = [] # (start, end, {pnode -> vnodes})
1002
        enactment_overhead = get_config().get("enactment-overhead") 
1003

    
1004
        if override != None:
1005
            override = TimeDelta(seconds=override)
1006

    
1007
        if exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
1008
            # Global exclusion (which represents, e.g., reading/writing the memory image files
1009
            # from a global file system) means no two suspensions/resumptions can happen at
1010
            # the same time in the entire resource pool.
1011
            
1012
            t = time
1013
            t_prev = None
1014
                
1015
            for (vnode,pnode) in vmrr.nodes.items():
1016
                if override == None:
1017
                    mem = vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
1018
                    op_time = compute_suspend_resume_time(mem, rate)
1019
                else:
1020
                    op_time = override
1021

    
1022
                op_time += enactment_overhead
1023
                    
1024
                t_prev = t
1025
                
1026
                if direction == constants.DIRECTION_FORWARD:
1027
                    t += op_time
1028
                    times.append((t_prev, t, {pnode:[vnode]}))
1029
                elif direction == constants.DIRECTION_BACKWARD:
                    t -= op_time
                    times.append((t, t_prev, {pnode:[vnode]}))

        elif exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
            # Local exclusion (which represents, e.g., reading the memory image files
            # from a local file system) means no two resumptions can happen at the same
            # time in the same physical node.
            pervnode_times = [] # (start, end, vnode)
            vnodes_in_pnode = {}
            for (vnode,pnode) in vmrr.nodes.items():
                vnodes_in_pnode.setdefault(pnode, []).append(vnode)
            for pnode in vnodes_in_pnode:
                t = time
                t_prev = None
                for vnode in vnodes_in_pnode[pnode]:
                    if override == None:
                        mem = vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
                        op_time = compute_suspend_resume_time(mem, rate)
                    else:
                        op_time = override

                    t_prev = t

                    if direction == constants.DIRECTION_FORWARD:
                        t += op_time
                        pervnode_times.append((t_prev, t, vnode))
                    elif direction == constants.DIRECTION_BACKWARD:
                        t -= op_time
                        pervnode_times.append((t, t_prev, vnode))

            # Consolidate suspend/resume operations happening at the same time
            uniq_times = set([(start, end) for (start, end, vnode) in pervnode_times])
            for (start, end) in uniq_times:
                vnodes = [x[2] for x in pervnode_times if x[0] == start and x[1] == end]
                node_mappings = {}
                for vnode in vnodes:
                    pnode = vmrr.nodes[vnode]
                    node_mappings.setdefault(pnode, []).append(vnode)
                times.append([start, end, node_mappings])

            # Add the enactment overhead
            for t in times:
                num_vnodes = sum([len(vnodes) for vnodes in t[2].values()])
                overhead = TimeDelta(seconds = num_vnodes * enactment_overhead)
                if direction == constants.DIRECTION_FORWARD:
                    t[1] += overhead
                elif direction == constants.DIRECTION_BACKWARD:
                    t[0] -= overhead

            # Fix overlaps
            if direction == constants.DIRECTION_FORWARD:
                times.sort(key=itemgetter(0))
            elif direction == constants.DIRECTION_BACKWARD:
                times.sort(key=itemgetter(1))
                times.reverse()

            prev_start = None
            prev_end = None
            for t in times:
                if prev_start != None:
                    start = t[0]
                    end = t[1]
                    if direction == constants.DIRECTION_FORWARD:
                        if start < prev_end:
                            diff = prev_end - start
                            t[0] += diff
                            t[1] += diff
                    elif direction == constants.DIRECTION_BACKWARD:
                        if end > prev_start:
                            diff = end - prev_start
                            t[0] -= diff
                            t[1] -= diff
                prev_start = t[0]
                prev_end = t[1]

        return times

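    # Illustrative note: each entry returned by __compute_susprem_times is of
    # the form (start, end, node_mappings), where node_mappings maps a physical
    # node to the vnodes suspended/resumed during that interval. For example,
    # under local exclusion two vnodes on the same pnode end up in consecutive
    # intervals, while vnodes on different pnodes may share a single interval.
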
    def __schedule_shutdown(self, vmrr):
        """ Schedules the shutdown of a VM reservation
                            
        Arguments:
        vmrr -- The VM reservation that will be shut down
        
        """
        shutdown_time = vmrr.lease.estimate_shutdown_time()

        start = vmrr.end - shutdown_time
        end = vmrr.end
        
        shutdown_rr = ShutdownResourceReservation(vmrr.lease, start, end, vmrr.resources_in_pnode, vmrr.nodes, vmrr)
        shutdown_rr.state = ResourceReservation.STATE_SCHEDULED
                
        vmrr.update_end(start)
        
        # If there are any post RRs, remove them
        for rr in vmrr.post_rrs:
            self.slottable.remove_reservation(rr)
        vmrr.post_rrs = []

        vmrr.post_rrs.append(shutdown_rr)

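    # Illustrative example (assumed figures): if a VMRR originally ends at
    # 13:00:00 and estimate_shutdown_time() returns 30 seconds, the VMRR is
    # truncated to end at 12:59:30 and a ShutdownResourceReservation covering
    # 12:59:30-13:00:00 becomes its only post-RR.
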
    def __schedule_suspension(self, vmrr, suspend_by):
        """ Schedules the suspension of a VM reservation
                         
        Most of the work is done in __compute_susprem_times. See that
        method's documentation for more details.
                            
        Arguments:
        vmrr -- The VM reservation that will be suspended
        suspend_by -- The time by which the VMs should be suspended.
        
        """
        config = get_config()
        susp_exclusion = config.get("suspendresume-exclusion")
        override = get_config().get("override-suspend-time")
        rate = config.get("suspend-rate")

        if suspend_by < vmrr.start or suspend_by > vmrr.end:
            raise InconsistentScheduleError, "Tried to schedule a suspension by %s, which is outside the VMRR's duration (%s-%s)" % (suspend_by, vmrr.start, vmrr.end)

        # Find the suspension times
        times = self.__compute_susprem_times(vmrr, suspend_by, constants.DIRECTION_BACKWARD, susp_exclusion, rate, override)
        
        # Create the suspension resource reservations
        suspend_rrs = []
        for (start, end, node_mappings) in times:
            suspres = {}
            all_vnodes = []
            for (pnode,vnodes) in node_mappings.items():
                num_vnodes = len(vnodes)
                r = Capacity([constants.RES_CPU, constants.RES_MEM, constants.RES_DISK])
                mem = 0
                for vnode in vnodes:
                    mem += vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
                r.set_ninstances(constants.RES_CPU, num_vnodes)
                for i in xrange(num_vnodes):
                    r.set_quantity_instance(constants.RES_CPU, i+1, 100)
                r.set_quantity(constants.RES_MEM, mem * num_vnodes)
                r.set_quantity(constants.RES_DISK, mem * num_vnodes)
                suspres[pnode] = self.slottable.create_resource_tuple_from_capacity(r)
                all_vnodes += vnodes
                             
            susprr = SuspensionResourceReservation(vmrr.lease, start, end, suspres, all_vnodes, vmrr)
            susprr.state = ResourceReservation.STATE_SCHEDULED
            suspend_rrs.append(susprr)
                
        suspend_rrs.sort(key=attrgetter("start"))
            
        susp_start = suspend_rrs[0].start
        if susp_start < vmrr.start:
            raise InconsistentScheduleError, "Determined suspension should start at %s, before the VMRR's start (%s) -- Suspend time not being properly estimated?" % (susp_start, vmrr.start)
        
        vmrr.update_end(susp_start)
        
        # If there are any post RRs, remove them
        for rr in vmrr.post_rrs:
            self.slottable.remove_reservation(rr)
        vmrr.post_rrs = []

        # Add the suspension RRs to the VM RR
        for susprr in suspend_rrs:
            vmrr.post_rrs.append(susprr)

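    # Illustrative note: for each physical node involved in a suspension
    # interval, the reserved capacity consists of one 100% CPU instance per
    # vnode being suspended there, plus memory and disk quantities derived
    # from the memory footprint of those vnodes (the disk presumably holds
    # the saved memory images).
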
    def __schedule_resumption(self, vmrr, resume_at):
        """ Schedules the resumption of a VM reservation
                         
        Most of the work is done in __compute_susprem_times. See that
        method's documentation for more details.
                            
        Arguments:
        vmrr -- The VM reservation that will be resumed
        resume_at -- The time at which the resumption should start
        
        """
        config = get_config()
        resm_exclusion = config.get("suspendresume-exclusion")
        override = get_config().get("override-resume-time")
        rate = config.get("resume-rate")

        if resume_at < vmrr.start or resume_at > vmrr.end:
            raise InconsistentScheduleError, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)

        # Find the resumption times
        times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate, override)
        
        # Create the resumption resource reservations
        resume_rrs = []
        for (start, end, node_mappings) in times:
            resmres = {}
            all_vnodes = []
            for (pnode,vnodes) in node_mappings.items():
                num_vnodes = len(vnodes)
                r = Capacity([constants.RES_CPU, constants.RES_MEM, constants.RES_DISK])
                mem = 0
                for vnode in vnodes:
                    mem += vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
                r.set_ninstances(constants.RES_CPU, num_vnodes)
                for i in xrange(num_vnodes):
                    r.set_quantity_instance(constants.RES_CPU, i+1, 100)
                r.set_quantity(constants.RES_MEM, mem * num_vnodes)
                r.set_quantity(constants.RES_DISK, mem * num_vnodes)
                resmres[pnode] = self.slottable.create_resource_tuple_from_capacity(r)
                all_vnodes += vnodes
            resmrr = ResumptionResourceReservation(vmrr.lease, start, end, resmres, all_vnodes, vmrr)
            resmrr.state = ResourceReservation.STATE_SCHEDULED
            resume_rrs.append(resmrr)
                
        resume_rrs.sort(key=attrgetter("start"))
            
        resm_end = resume_rrs[-1].end
        if resm_end > vmrr.end:
            raise InconsistentScheduleError, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
        
        vmrr.update_start(resm_end)
        
        # Add the resumption RRs to the VM RR
        for resmrr in resume_rrs:
            vmrr.pre_rrs.append(resmrr)

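    # Illustrative note: resumption mirrors suspension but runs forward in
    # time. The resumption RRs are appended to vmrr.pre_rrs, and the VM RR's
    # start is pushed to the end of the last resumption, so the VMs only count
    # as running once all of them have been resumed.
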
    def __compute_scheduling_threshold(self, lease):
        """ Compute the scheduling threshold (the 'minimum duration') of a lease
        
        To avoid thrashing, Haizea will not schedule a lease unless all overheads
        can be correctly scheduled (which includes image transfers, suspensions, etc.).
        However, this can still result in situations where a lease is prepared,
        and then immediately suspended because of a blocking lease in the future.
        The scheduling threshold is used to specify that a lease must
        not be scheduled unless it is guaranteed to run for a minimum amount of
        time (the rationale behind this is that you ideally don't want leases
        to be scheduled if they're not going to be active for at least as much time
        as was spent in overheads).
        
        An important part of computing this value is the "scheduling threshold factor".
        The default value is 1, meaning that the lease will be active for at least
        as much time T as was spent on overheads (e.g., if preparing the lease requires
        60 seconds, and we know that it will have to be suspended, requiring 30 seconds,
        Haizea won't schedule the lease unless it can run for at least 90 seconds).
        In other words, a scheduling threshold factor of F requires the lease to run
        for at least F*T on top of the time T spent on overheads. A value of 0 could
        lead to thrashing, since Haizea could end up with situations where a lease
        starts and immediately gets suspended.
        
        Arguments:
        lease -- Lease for which we want to find the scheduling threshold
        """
        # TODO: Take into account other things like boot overhead, migration overhead, etc.
        config = get_config()
        threshold = config.get("force-scheduling-threshold")
        if threshold != None:
            # If there is a hard-coded threshold, use that
            return threshold
        
        # The factor is needed below regardless of whether suspension is enabled.
        factor = config.get("scheduling-threshold-factor")
        
        if config.get("suspension") != constants.SUSPENSION_NONE:
            # First, figure out the "safe duration" (the minimum duration
            # so that we at least allocate enough time for all the
            # overheads).
            susp_overhead = lease.estimate_suspend_time()
            safe_duration = susp_overhead
            
            if lease.get_state() == Lease.STATE_SUSPENDED_QUEUED:
                resm_overhead = lease.estimate_resume_time()
                safe_duration += resm_overhead
        else:
            safe_duration = 0
            
        # TODO: Incorporate other overheads into the minimum duration
        min_duration = safe_duration
        
        # At the very least, we want to allocate enough time for the
        # safe duration (otherwise, we'll end up with incorrect schedules,
        # where a lease is scheduled to suspend, but isn't even allocated
        # enough time to suspend).
        # The factor is assumed to be non-negative, i.e., a factor of 0
        # means we only allocate enough time for potential suspend/resume
        # operations, while a factor of 1 means the lease will get as much
        # running time as was spent on the runtime overheads involved in
        # setting it up.
        threshold = safe_duration + (min_duration * factor)
        return threshold

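    # Worked example (assumed figures): for a suspended-queued lease with a
    # 30-second suspend overhead and a 60-second resume overhead, and a
    # scheduling threshold factor of 1, safe_duration = min_duration = 90s,
    # so the threshold is 90 + (90 * 1) = 180 seconds of allocated time.
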
    #-------------------------------------------------------------------#
    #                                                                   #
    #                  SLOT TABLE EVENT HANDLERS                        #
    #                                                                   #
    #-------------------------------------------------------------------#

    def _handle_start_vm(self, l, rr):
        """ Handles the start of a VMResourceReservation
        
        Arguments:
        l -- Lease the VMResourceReservation belongs to
        rr -- The VMResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleStartVM" % l.id)
        l.print_contents()

        lease_state = l.get_state()

        if get_config().get("lease-preparation") == "imagetransfer":
            if not self.resourcepool.verify_deploy(l, rr):
                self.logger.error("Deployment of lease %i was not complete." % l.id)
                raise # TODO raise something better

        # Kludge: Should be done by the preparations scheduler
        if l.get_state() == Lease.STATE_SCHEDULED:
            # Piggybacking
            l.set_state(Lease.STATE_READY)
            lease_state = l.get_state()
            
        if lease_state == Lease.STATE_READY:
            l.set_state(Lease.STATE_ACTIVE)
            rr.state = ResourceReservation.STATE_ACTIVE
            now_time = get_clock().get_time()
            l.start.actual = now_time
            
            try:
                self.resourcepool.start_vms(l, rr)
            except EnactmentError, exc:
                self.logger.error("Enactment error when starting VMs.")
                # Right now, this is a non-recoverable error, so we just
                # propagate it upwards to the lease scheduler.
                # In the future, it may be possible to react to these
                # kinds of errors.
                raise
                
        elif lease_state == Lease.STATE_RESUMED_READY:
            l.set_state(Lease.STATE_ACTIVE)
            rr.state = ResourceReservation.STATE_ACTIVE
            # No enactment to do here, since all the suspend/resume actions are
            # handled during the suspend/resume RRs
        else:
            raise InconsistentLeaseStateError(l, doing = "starting a VM")
        
        # If this was a future reservation (as determined by backfilling),
        # remove that status, since the future is now.
        if rr.lease in self.future_leases:
            self.future_leases.remove(l)
            get_persistence().persist_future_leases(self.future_leases)
        
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleStartVM" % l.id)
        self.logger.info("Started VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))

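    # Illustrative note: the transitions handled above are
    # SCHEDULED -> READY -> ACTIVE for a first start (the READY transition
    # piggybacks on the image-transfer kludge), and RESUMED_READY -> ACTIVE
    # when the preceding resumption RRs already brought the VMs back; any
    # other lease state is treated as an inconsistency.
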
    def _handle_end_vm(self, l, rr):
        """ Handles the end of a VMResourceReservation
        
        Arguments:
        l -- Lease the VMResourceReservation belongs to
        rr -- The VMResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleEndVM" % l.id)
        self.logger.vdebug("LEASE-%i Before:" % l.id)
        l.print_contents()
        now_time = round_datetime(get_clock().get_time())
        diff = now_time - rr.start
        l.duration.accumulate_duration(diff)
        rr.state = ResourceReservation.STATE_DONE
                
        self.logger.vdebug("LEASE-%i After:" % l.id)
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndVM" % l.id)
        self.logger.info("Stopped VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))

    def _handle_unscheduled_end_vm(self, l, vmrr):
        """ Handles the unexpected end of a VMResourceReservation
        
        Arguments:
        l -- Lease the VMResourceReservation belongs to
        vmrr -- The VMResourceReservation
        """
        
        self.logger.info("LEASE-%i The VM has ended prematurely." % l.id)
        for rr in vmrr.post_rrs:
            self.slottable.remove_reservation(rr)
        vmrr.post_rrs = []
        vmrr.end = get_clock().get_time()
        self._handle_end_vm(l, vmrr)

    def _handle_start_suspend(self, l, rr):
        """ Handles the start of a SuspensionResourceReservation
        
        Arguments:
        l -- Lease the SuspensionResourceReservation belongs to
        rr -- The SuspensionResourceReservation
        
        """
        self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_ACTIVE
        
        try:
            self.resourcepool.suspend_vms(l, rr)
        except EnactmentError, exc:
            self.logger.error("Enactment error when suspending VMs.")
            # Right now, this is a non-recoverable error, so we just
            # propagate it upwards to the lease scheduler.
            # In the future, it may be possible to react to these
            # kinds of errors.
            raise

        if rr.is_first():
            l.set_state(Lease.STATE_SUSPENDING)
            l.print_contents()
            self.logger.info("Suspending lease %i..." % (l.id))
        self.logger.debug("LEASE-%i End of handleStartSuspend" % l.id)

    def _handle_end_suspend(self, l, rr):
        """ Handles the end of a SuspensionResourceReservation
        
        Arguments:
        l -- Lease the SuspensionResourceReservation belongs to
        rr -- The SuspensionResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleEndSuspend" % l.id)
        l.print_contents()
        # TODO: React to incomplete suspend
        self.resourcepool.verify_suspend(l, rr)
        rr.state = ResourceReservation.STATE_DONE
        if rr.is_last():
            if l.get_type() == Lease.DEADLINE:
                l.set_state(Lease.STATE_SUSPENDED_PENDING)
                l.set_state(Lease.STATE_SUSPENDED_SCHEDULED)
            else:
                l.set_state(Lease.STATE_SUSPENDED_PENDING)
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndSuspend" % l.id)
        self.logger.info("Lease %i suspended." % (l.id))
        
        if l.get_state() == Lease.STATE_SUSPENDED_PENDING:
            raise RescheduleLeaseException

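    # Illustrative note: raising RescheduleLeaseException after the last
    # suspension RR tells the lease scheduler that the now-suspended lease
    # still has remaining duration that must be rescheduled; deadline leases
    # skip this because they are immediately marked SUSPENDED_SCHEDULED above.
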
    def _handle_start_resume(self, l, rr):
        """ Handles the start of a ResumptionResourceReservation
        
        Arguments:
        l -- Lease the ResumptionResourceReservation belongs to
        rr -- The ResumptionResourceReservation
        
        """
        self.logger.debug("LEASE-%i Start of handleStartResume" % l.id)
        l.print_contents()
        
        try:
            self.resourcepool.resume_vms(l, rr)
        except EnactmentError, exc:
            self.logger.error("Enactment error when resuming VMs.")
            # Right now, this is a non-recoverable error, so we just
            # propagate it upwards to the lease scheduler.
            # In the future, it may be possible to react to these
            # kinds of errors.
            raise
                    
        rr.state = ResourceReservation.STATE_ACTIVE
        if rr.is_first():
            l.set_state(Lease.STATE_RESUMING)
            l.print_contents()
            self.logger.info("Resuming lease %i..." % (l.id))
        self.logger.debug("LEASE-%i End of handleStartResume" % l.id)

    def _handle_end_resume(self, l, rr):
        """ Handles the end of a ResumptionResourceReservation
        
        Arguments:
        l -- Lease the ResumptionResourceReservation belongs to
        rr -- The ResumptionResourceReservation
        
        """
        self.logger.debug("LEASE-%i Start of handleEndResume" % l.id)
        l.print_contents()
        # TODO: React to incomplete resume
        self.resourcepool.verify_resume(l, rr)
        rr.state = ResourceReservation.STATE_DONE
        if rr.is_last():
            l.set_state(Lease.STATE_RESUMED_READY)
            self.logger.info("Resumed lease %i" % (l.id))
        for vnode, pnode in rr.vmrr.nodes.items():
            self.resourcepool.remove_ramfile(pnode, l, vnode)
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndResume" % l.id)

    def _handle_start_shutdown(self, l, rr):
        """ Handles the start of a ShutdownResourceReservation
        
        Arguments:
        l -- Lease the ShutdownResourceReservation belongs to
        rr -- The ShutdownResourceReservation
        """
        
        self.logger.debug("LEASE-%i Start of handleStartShutdown" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_ACTIVE
        try:
            self.resourcepool.stop_vms(l, rr)
        except EnactmentError, exc:
            self.logger.error("Enactment error when shutting down VMs.")
            # Right now, this is a non-recoverable error, so we just
            # propagate it upwards to the lease scheduler.
            # In the future, it may be possible to react to these
            # kinds of errors.
            raise
        
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleStartShutdown" % l.id)

    def _handle_end_shutdown(self, l, rr):
        """ Handles the end of a ShutdownResourceReservation
        
        Arguments:
        l -- Lease the ShutdownResourceReservation belongs to
        rr -- The ShutdownResourceReservation
        
        """
        self.logger.debug("LEASE-%i Start of handleEndShutdown" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_DONE
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndShutdown" % l.id)
        self.logger.info("Lease %i's VMs have shut down." % (l.id))
        raise NormalEndLeaseException

    def _handle_start_migrate(self, l, rr):
        """ Handles the start of a MemImageMigrationResourceReservation
        
        Arguments:
        l -- Lease the MemImageMigrationResourceReservation belongs to
        rr -- The MemImageMigrationResourceReservation
        
        """
        self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_ACTIVE
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
        self.logger.info("Migrating lease %i..." % (l.id))

    def _handle_end_migrate(self, l, rr):
        """ Handles the end of a MemImageMigrationResourceReservation
        
        Arguments:
        l -- Lease the MemImageMigrationResourceReservation belongs to
        rr -- The MemImageMigrationResourceReservation
        
        """
        self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
        l.print_contents()

        for vnode in rr.transfers:
            origin = rr.transfers[vnode][0]
            dest = rr.transfers[vnode][1]
            
            # Update RAM files
            self.resourcepool.remove_ramfile(origin, l, vnode)
            self.resourcepool.add_ramfile(dest, l, vnode, l.requested_resources[vnode].get_quantity(constants.RES_MEM))
        
        rr.state = ResourceReservation.STATE_DONE
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
        self.logger.info("Migrated lease %i..." % (l.id))


class VMResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, nodes, res):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.nodes = nodes # { vnode -> pnode }
        self.pre_rrs = []
        self.post_rrs = []
        self.prematureend = None

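    # Illustrative note: pre_rrs holds the reservations scheduled before the
    # VMs start (resumptions, and possibly memory image migrations), while
    # post_rrs holds those scheduled after (suspensions or a shutdown). The
    # scheduler appends them in start-time order, which the is_first()/
    # is_last() checks of the suspension and resumption RRs rely on.
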
    def update_start(self, time):
        self.start = time
        # ONLY for simulation
        self.lease._update_prematureend()

    def update_end(self, time):
        self.end = time
        # ONLY for simulation
        self.lease._update_prematureend()
        
    def get_first_start(self):
        if len(self.pre_rrs) == 0:
            return self.start
        else:
            return [rr for rr in self.pre_rrs if isinstance(rr, ResumptionResourceReservation)][0].start

    def get_final_end(self):
        if len(self.post_rrs) == 0:
            return self.end
        else:
            return self.post_rrs[-1].end

    def is_resuming(self):
        return len(self.pre_rrs) > 0 and reduce(operator.or_, [isinstance(rr, ResumptionResourceReservation) for rr in self.pre_rrs])

    def is_suspending(self):
        return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], SuspensionResourceReservation)

    def is_shutting_down(self):
        return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], ShutdownResourceReservation)
    
    def clear_rrs(self):
        for rr in self.pre_rrs:
            rr.clear_rrs()
        for rr in self.post_rrs:
            rr.clear_rrs()
        self.pre_rrs = None
        self.post_rrs = None

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        for resmrr in self.pre_rrs:
            resmrr.print_contents(loglevel)
            logger.log(loglevel, "--")
        logger.log(loglevel, "Type           : VM")
        logger.log(loglevel, "Nodes          : %s" % pretty_nodemap(self.nodes))
        if self.prematureend != None:
            logger.log(loglevel, "Premature end  : %s" % self.prematureend)
        ResourceReservation.print_contents(self, loglevel)
        for susprr in self.post_rrs:
            logger.log(loglevel, "--")
            susprr.print_contents(loglevel)

        
class SuspensionResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, res, vnodes, vmrr):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.vmrr = vmrr
        self.vnodes = vnodes

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        logger.log(loglevel, "Type           : SUSPEND")
        logger.log(loglevel, "Vnodes         : %s" % self.vnodes)
        ResourceReservation.print_contents(self, loglevel)
        
    def is_first(self):
        return (self == self.vmrr.post_rrs[0])

    def is_last(self):
        return (self == self.vmrr.post_rrs[-1])
        
    def clear_rrs(self):
        self.vmrr = None
        
class ResumptionResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, res, vnodes, vmrr):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.vmrr = vmrr
        self.vnodes = vnodes

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        logger.log(loglevel, "Type           : RESUME")
        logger.log(loglevel, "Vnodes         : %s" % self.vnodes)
        ResourceReservation.print_contents(self, loglevel)

    def is_first(self):
        resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
        return (self == resm_rrs[0])

    def is_last(self):
        resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
        return (self == resm_rrs[-1])
    
    def clear_rrs(self):
        self.vmrr = None
    
class ShutdownResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, res, vnodes, vmrr):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.vmrr = vmrr
        self.vnodes = vnodes

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        logger.log(loglevel, "Type           : SHUTDOWN")
        ResourceReservation.print_contents(self, loglevel)

    def clear_rrs(self):
        self.vmrr = None

class MemImageMigrationResourceReservation(MigrationResourceReservation):
    def __init__(self, lease, start, end, res, vmrr, transfers):
        MigrationResourceReservation.__init__(self, lease, start, end, res, vmrr, transfers)
  
    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        logger.log(loglevel, "Type           : MEM IMAGE MIGRATION")
        logger.log(loglevel, "Transfers      : %s" % self.transfers)
        ResourceReservation.print_contents(self, loglevel)