# -------------------------------------------------------------------------- #
# Copyright 2006-2009, University of Chicago                                 #
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
# Complutense de Madrid (dsa-research.org)                                   #
#                                                                            #
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
# not use this file except in compliance with the License. You may obtain    #
# a copy of the License at                                                   #
#                                                                            #
# http://www.apache.org/licenses/LICENSE-2.0                                 #
#                                                                            #
# Unless required by applicable law or agreed to in writing, software        #
# distributed under the License is distributed on an "AS IS" BASIS,          #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
# See the License for the specific language governing permissions and        #
# limitations under the License.                                             #
# -------------------------------------------------------------------------- #

"""This module provides the main classes for Haizea's VM Scheduler. All the
scheduling code that decides when and where a lease is scheduled is contained
in the VMScheduler class (except for the code that specifically decides
what physical machines each virtual machine is mapped to, which is factored out
into the "mapper" module). This module also provides the classes for the
reservations that will be placed in the slot table and correspond to VMs.
"""
import haizea.common.constants as constants
from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, pretty_nodemap, get_config, get_clock, get_persistence, compute_suspend_resume_time
from haizea.core.leases import Lease, Capacity
from haizea.core.scheduler.slottable import ResourceReservation, ResourceTuple
from haizea.core.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, EnactmentError, NotSchedulableException, InconsistentScheduleError, InconsistentLeaseStateError, MigrationResourceReservation
from operator import attrgetter, itemgetter
from mx.DateTime import TimeDelta

import logging
import operator


class VMScheduler(object):
    """The Haizea VM Scheduler

    This class is responsible for taking a lease and scheduling VMs to satisfy
    the requirements of that lease.
    """

    def __init__(self, slottable, resourcepool, mapper, max_in_future):
        """Constructor

        The constructor does little more than create the VM scheduler's
        attributes. However, it does expect (in the arguments) a fully-constructed
        SlotTable, ResourcePool, and Mapper (these are constructed in the
        Manager's constructor).

        Arguments:
        slottable -- Slot table
        resourcepool -- Resource pool where enactment commands will be sent to
        mapper -- Mapper
        max_in_future -- Maximum number of best-effort leases that can be
        scheduled in the future using backfilling (-1 for no limit)
        """
        self.slottable = slottable
        self.resourcepool = resourcepool
        self.mapper = mapper
        self.logger = logging.getLogger("VMSCHED")

        # Register the handlers for the types of reservations used by
        # the VM scheduler
        self.handlers = {}
        self.handlers[VMResourceReservation] = ReservationEventHandler(
                                sched    = self,
                                on_start = VMScheduler._handle_start_vm,
                                on_end   = VMScheduler._handle_end_vm)

        self.handlers[ShutdownResourceReservation] = ReservationEventHandler(
                                sched    = self,
                                on_start = VMScheduler._handle_start_shutdown,
                                on_end   = VMScheduler._handle_end_shutdown)

        self.handlers[SuspensionResourceReservation] = ReservationEventHandler(
                                sched    = self,
                                on_start = VMScheduler._handle_start_suspend,
                                on_end   = VMScheduler._handle_end_suspend)

        self.handlers[ResumptionResourceReservation] = ReservationEventHandler(
                                sched    = self,
                                on_start = VMScheduler._handle_start_resume,
                                on_end   = VMScheduler._handle_end_resume)

        self.handlers[MemImageMigrationResourceReservation] = ReservationEventHandler(
                                sched    = self,
                                on_start = VMScheduler._handle_start_migrate,
                                on_end   = VMScheduler._handle_end_migrate)

        self.max_in_future = max_in_future

        self.future_leases = set()


    def schedule(self, lease, duration, nexttime, earliest, override_state = None):
        """ The scheduling function

        This particular function doesn't do much except dispatch to
        __schedule_asap, __schedule_exact, or __schedule_deadline
        (which do all the work).

        Arguments:
        lease -- Lease to schedule
        duration -- Duration to schedule the VMs for
        nexttime -- The next time at which the scheduler can allocate resources.
        earliest -- The earliest possible starting times on each physical node
        override_state -- If not None, assume the lease is in this state instead
        of the lease's actual state (only used for deadline leases)
        """
        if lease.get_type() == Lease.BEST_EFFORT:
            return self.__schedule_asap(lease, duration, nexttime, earliest, allow_in_future = self.can_schedule_in_future())
        elif lease.get_type() == Lease.ADVANCE_RESERVATION:
            return self.__schedule_exact(lease, duration, nexttime, earliest)
        elif lease.get_type() == Lease.IMMEDIATE:
            return self.__schedule_asap(lease, duration, nexttime, earliest, allow_in_future = False)
        elif lease.get_type() == Lease.DEADLINE:
            return self.__schedule_deadline(lease, duration, nexttime, earliest, override_state)


    def estimate_migration_time(self, lease):
        """ Estimates the time required to migrate a lease's VMs

        This function conservatively estimates that all the VMs are going to
        be migrated to other nodes and that the inter-node transfers happen
        serially, so the estimate is the sum of the time required to transfer
        each physical node's memory.

        Note that this method only estimates the time to migrate the memory
        state files for the VMs. Migrating the software environment (which may
        or may not be a disk image) is the responsibility of the preparation
        scheduler, which has its own set of migration scheduling methods.

        Arguments:
        lease -- Lease that might be migrated
        """
        migration = get_config().get("migration")
        if migration == constants.MIGRATE_YES:
            bandwidth = self.resourcepool.info.get_migration_bandwidth()
            vmrr = lease.get_last_vmrr()
            transfer_time = 0
            # Count each physical node once, even if it hosts several of the
            # lease's VMs (resources_in_pnode is already aggregated per node)
            for pnode in set(vmrr.nodes.values()):
                mem = vmrr.resources_in_pnode[pnode].get_by_type(constants.RES_MEM)
                transfer_time += estimate_transfer_time(mem, bandwidth)
            return transfer_time
        elif migration == constants.MIGRATE_YES_NOTRANSFER:
            return TimeDelta(seconds=0)
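
    # A worked example with hypothetical values, assuming estimate_transfer_time
    # essentially divides the amount of memory by the bandwidth: if the lease's
    # VMs span two physical nodes with 1024 MB and 2048 MB of memory to move,
    # and the migration bandwidth is 100 MB/s, the estimate is roughly
    # 1024/100 + 2048/100 = ~31 seconds.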

    def schedule_migration(self, lease, vmrr, nexttime):
        """ Schedules migrations for a lease

        Arguments:
        lease -- Lease being migrated
        vmrr -- The VM reservation before which the migration will take place
        nexttime -- The next time at which the scheduler can allocate resources.
        """

        # Determine what migrations have to be done.
        last_vmrr = lease.get_last_vmrr()

        vnode_mappings = self.resourcepool.get_ram_image_mappings(lease)
        vnode_migrations = dict([(vnode, (vnode_mappings[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes if vnode_mappings[vnode] != vmrr.nodes[vnode]])

        # Determine if we actually have to migrate
        mustmigrate = False
        for vnode in vnode_migrations:
            if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
                mustmigrate = True
                break

        if not mustmigrate:
            return []

        # If Haizea is configured to migrate without doing any transfers,
        # then we just return a nil-duration migration RR
        if get_config().get("migration") == constants.MIGRATE_YES_NOTRANSFER:
            start = nexttime
            end = nexttime
            res = {}
            migr_rr = MemImageMigrationResourceReservation(lease, start, end, res, vmrr, vnode_migrations)
            migr_rr.state = ResourceReservation.STATE_SCHEDULED
            return [migr_rr]

        # Figure out what migrations can be done simultaneously
        migrations = []
        while len(vnode_migrations) > 0:
            pnodes = set()
            migration = {}
            for vnode in vnode_migrations:
                origin = vnode_migrations[vnode][0]
                dest = vnode_migrations[vnode][1]
                if not origin in pnodes and not dest in pnodes:
                    migration[vnode] = vnode_migrations[vnode]
                    pnodes.add(origin)
                    pnodes.add(dest)
            for vnode in migration:
                del vnode_migrations[vnode]
            migrations.append(migration)

        # Create migration RRs
        start = max(last_vmrr.post_rrs[-1].end, nexttime)
        bandwidth = self.resourcepool.info.get_migration_bandwidth()
        migr_rrs = []
        for m in migrations:
            vnodes_to_migrate = m.keys()
            max_mem_to_migrate = max([lease.requested_resources[vnode].get_quantity(constants.RES_MEM) for vnode in vnodes_to_migrate])
            migr_time = estimate_transfer_time(max_mem_to_migrate, bandwidth)
            end = start + migr_time
            res = {}
            for (origin,dest) in m.values():
                resorigin = Capacity([constants.RES_NETOUT])
                resorigin.set_quantity(constants.RES_NETOUT, bandwidth)
                resdest = Capacity([constants.RES_NETIN])
                resdest.set_quantity(constants.RES_NETIN, bandwidth)
                res[origin] = self.slottable.create_resource_tuple_from_capacity(resorigin)
                res[dest] = self.slottable.create_resource_tuple_from_capacity(resdest)
            migr_rr = MemImageMigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
            migr_rr.state = ResourceReservation.STATE_SCHEDULED
            migr_rrs.append(migr_rr)
            start = end

        return migr_rrs
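
    # A sketch of the simultaneity grouping above, with hypothetical nodes:
    # if vnode 1 must move from P1 to P2, vnode 2 from P2 to P3, and vnode 3
    # from P4 to P5, then one possible first round (depending on iteration
    # order) is {1, 3}: vnode 2 is excluded because P2 is already involved in
    # vnode 1's transfer. A second round then schedules {2}. Each round
    # becomes one migration RR, and the rounds run back to back.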

    def cancel_vm(self, vmrr):
        """ Cancels a VM resource reservation

        Arguments:
        vmrr -- VM RR to be cancelled
        """

        # If this VM RR is part of a lease that was scheduled in the future,
        # remove that lease from the set of future leases.
        if vmrr.lease in self.future_leases:
            self.future_leases.remove(vmrr.lease)
            get_persistence().persist_future_leases(self.future_leases)

        # If there are any pre-RRs that are not done yet, remove them
        for rr in vmrr.pre_rrs:
            if rr.state != ResourceReservation.STATE_DONE:
                self.slottable.remove_reservation(rr)

        # If there are any post RRs, remove them
        for rr in vmrr.post_rrs:
            self.slottable.remove_reservation(rr)

        # Remove the reservation itself
        self.slottable.remove_reservation(vmrr)


    def can_suspend_at(self, lease, t, nexttime=None):
        """ Determines if it is possible to suspend a lease before a given time

        Arguments:
        lease -- Lease that would be suspended
        t -- Time by which the lease must be suspended
        nexttime -- If not None, the next time at which the scheduler can
        allocate resources
        """
        vmrr = lease.get_vmrr_at(t)
        if t < vmrr.start:
            return False # t could be during a resume
        by_t = min(vmrr.end, t) # t could be during a suspend
        if nexttime == None:
            time_until_suspend = t - vmrr.start
        else:
            time_until_suspend = min( (t - vmrr.start, t - nexttime))
        min_duration = self.__compute_scheduling_threshold(lease)
        can_suspend = time_until_suspend >= min_duration
        return can_suspend
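
    # For example (hypothetical values): if the lease's VMRR started at 10:00
    # and its scheduling threshold is 15 minutes, then with t at 10:10 the VMs
    # would only have run for 10 minutes before being suspended, so this
    # returns False; with t at 10:30, it returns True.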

    def preempt_vm(self, vmrr, t):
        """ Preempts a VM reservation at a given time

        This method assumes that the lease is, in fact, preemptable,
        that the VMs are running at the given time, and that there is
        enough time to suspend the VMs before the given time (all these
        checks are done in the lease scheduler).

        Arguments:
        vmrr -- VM RR to be preempted
        t -- Time by which the VM must be preempted
        """

        # Save original start and end time of the vmrr
        old_start = vmrr.start
        old_end = vmrr.end

        # Schedule the VM suspension
        self.__schedule_suspension(vmrr, t)

        # Update the VMRR in the slot table
        self.slottable.update_reservation(vmrr, old_start, old_end)

        # Add the suspension RRs (now in the VM's post-RRs) to the slot table
        for susprr in vmrr.post_rrs:
            self.slottable.add_reservation(susprr)


    def get_future_reschedulable_leases(self):
        """ Returns a list of future leases that are reschedulable.

        Currently, this list is just the best-effort leases scheduled
        in the future as determined by the backfilling algorithm.
        Advance reservation leases, by their nature, cannot be
        rescheduled to find a "better" starting time.
        """
        return list(self.future_leases)


    def can_schedule_in_future(self):
        """ Returns True if the backfilling algorithm would allow a lease
        to be scheduled in the future.

        """
        if self.max_in_future == -1: # Unlimited
            return True
        else:
            return len(self.future_leases) < self.max_in_future


    def get_utilization(self, time):
        """ Computes resource utilization (currently just CPU-based)

        This utilization information shows what portion of the
        physical resources is used by each type of reservation
        (e.g., 70% are running a VM, 5% are doing suspensions, etc.)

        Arguments:
        time -- Time at which to determine utilization
        """
        total = self.slottable.get_total_capacity(restype = constants.RES_CPU)
        util = {}
        reservations = self.slottable.get_reservations_at(time)
        for r in reservations:
            for node in r.resources_in_pnode:
                if isinstance(r, VMResourceReservation):
                    use = r.resources_in_pnode[node].get_by_type(constants.RES_CPU)
                    util[type(r)] = use + util.setdefault(type(r),0.0)
                elif isinstance(r, SuspensionResourceReservation) or isinstance(r, ResumptionResourceReservation) or isinstance(r, ShutdownResourceReservation):
                    use = r.vmrr.resources_in_pnode[node].get_by_type(constants.RES_CPU)
                    util[type(r)] = 0.0
                    #util[type(r)] = use + util.setdefault(type(r),0.0)
        util[None] = total - sum(util.values())

        if total != 0:
            for k in util:
                util[k] /= total

        return util
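
    # The returned dictionary maps reservation classes to fractions of the
    # total CPU capacity, with None standing for the unused portion. For
    # example (hypothetical values):
    # {VMResourceReservation: 0.70, SuspensionResourceReservation: 0.0, None: 0.30}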

    def __schedule_exact(self, lease, duration, nexttime, earliest):
        """ Schedules VMs that must start at an exact time

        This type of lease is "easy" to schedule because we know the exact
        start time, which means that's the only starting time we have to
        check. So, this method does little more than call the mapper.

        Arguments:
        lease -- Lease to schedule
        duration -- Duration to schedule (note that the end time is derived
        from the lease's requested duration)
        nexttime -- The next time at which the scheduler can allocate resources.
        earliest -- The earliest possible starting times on each physical node
        """

        # Determine the start and end time
        shutdown_time = lease.estimate_shutdown_time()
        start = lease.start.requested
        end = start + lease.duration.requested + shutdown_time

        # Convert Capacity objects in lease object into ResourceTuples that
        # we can hand over to the mapper.
        requested_resources = dict([(k,self.slottable.create_resource_tuple_from_capacity(v)) for k,v in lease.requested_resources.items()])

        # Let the mapper do its magic
        mapping, actualend, preemptions = self.mapper.map(lease,
                                                          requested_resources,
                                                          start,
                                                          end,
                                                          strictend = True,
                                                          allow_preemption = True)

        # If no mapping was found, tell the lease scheduler about it
        if mapping == None:
            raise NotSchedulableException, "Not enough resources in specified interval"

        # Create VM resource reservations
        res = {}

        for (vnode,pnode) in mapping.items():
            vnode_res = requested_resources[vnode]
            if res.has_key(pnode):
                res[pnode].incr(vnode_res)
            else:
                res[pnode] = ResourceTuple.copy(vnode_res)

        vmrr = VMResourceReservation(lease, start, end, mapping, res)
        vmrr.state = ResourceReservation.STATE_SCHEDULED

        # Schedule shutdown for the VM
        self.__schedule_shutdown(vmrr)

        return vmrr, preemptions


    def __schedule_asap(self, lease, duration, nexttime, earliest, allow_in_future = None, override_state = None):
        """ Schedules VMs as soon as possible

        This method is a bit more complex than __schedule_exact because
        we need to figure out what "as soon as possible" actually is.
        This involves attempting several mappings, at different points
        in time, before we can schedule the lease.

        This method will always check, at least, if the lease can be scheduled
        at the earliest possible moment at which the lease could be prepared
        (e.g., if the lease can't start until 1 hour in the future because that's
        the earliest possible time at which the disk images it requires can
        be transferred, then that's when the scheduler will check). Note, however,
        that this "earliest possible moment" is determined by the preparation
        scheduler.

        Additionally, if the lease can't be scheduled at the earliest
        possible moment, it can also check if the lease can be scheduled
        in the future. This partially implements a backfilling algorithm
        (the maximum number of future leases is stored in the max_in_future
        attribute of VMScheduler), the other part being implemented in the
        __process_queue method of LeaseScheduler.

        Note that, if the method is allowed to schedule in the future,
        and assuming that the lease doesn't request more resources than
        the site itself has, this method will always schedule the VMs
        successfully (since there's always an empty spot somewhere in the
        future).


        Arguments:
        lease -- Lease to schedule
        duration -- Duration to schedule the VMs for
        nexttime -- The next time at which the scheduler can allocate resources.
        earliest -- The earliest possible starting times on each physical node
        allow_in_future -- Boolean indicating whether the scheduler is
        allowed to schedule the VMs in the future.
        override_state -- If not None, assume the lease is in this state
        instead of the lease's actual state
        """

        #
        # STEP 1: PROLEGOMENA
        #

        lease_id = lease.id
        remaining_duration = duration
        shutdown_time = lease.estimate_shutdown_time()

        if override_state != None:
            state = override_state
        else:
            state = lease.get_state()

        # We might be scheduling a suspended lease. If so, we will
        # also have to schedule its resumption. Right now, just
        # figure out if this is such a lease.
        mustresume = (state in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED, Lease.STATE_SUSPENDED_SCHEDULED))

        # This is the minimum duration that we must be able to schedule.
        # See __compute_scheduling_threshold for more details.
        min_duration = self.__compute_scheduling_threshold(lease)


        #
        # STEP 2: FIND THE CHANGEPOINTS
        #

        # Find the changepoints, and the available nodes at each changepoint
        # We need to do this because the preparation scheduler may have
        # determined that some nodes might require more time to prepare
        # than others (e.g., if using disk image caching, some nodes
        # might have the required disk image predeployed, while others
        # may require transferring the image to that node).
        #
        # The end result of this step is a list (changepoints) where each entry
        # is a (t,nodes) pair, where "t" is the time of the changepoint
        # and "nodes" is the set of nodes that are available at that time.

        if not mustresume:
            # If this is not a suspended lease, then the changepoints
            # are determined based on the "earliest" parameter.
            cps = [(node, e.time) for node, e in earliest.items()]
            cps.sort(key=itemgetter(1))
            curcp = None
            changepoints = []
            nodes = []
            for node, time in cps:
                nodes.append(node)
                if time != curcp:
                    changepoints.append([time, set(nodes)])
                    curcp = time
                else:
                    changepoints[-1][1] = set(nodes)
        else:
            # If the lease is suspended, we take into account that, if
            # migration is disabled, we can only schedule the lease
            # on the nodes it is currently scheduled on.
            if get_config().get("migration") == constants.MIGRATE_NO:
                vmrr = lease.get_last_vmrr()
                onlynodes = set(vmrr.nodes.values())
            else:
                onlynodes = None
            changepoints = list(set([x.time for x in earliest.values()]))
            changepoints.sort()
            changepoints = [(x, onlynodes) for x in changepoints]
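
        # For example (hypothetical values): if "earliest" says nodes 1 and 2
        # can start at 10:00 and node 3 at 10:30, the changepoints are
        # [(10:00, {1, 2}), (10:30, {1, 2, 3})]: each entry's set contains
        # every node available at or before that point in time.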

        # If we can schedule VMs in the future,
        # we also consider future changepoints
        if allow_in_future:
            res = self.slottable.get_reservations_ending_after(changepoints[-1][0])
            # We really only care about changepoints where VMs end (which is
            # when resources become available)
            futurecp = [r.get_final_end() for r in res if isinstance(r, VMResourceReservation)]
            # Corner case: Sometimes we're right in the middle of a ShutdownReservation, so it won't be
            # included in futurecp.
            futurecp += [r.end for r in res if isinstance(r, ShutdownResourceReservation) and not r.vmrr in res]
            if not mustresume:
                futurecp = [(p,None) for p in futurecp]
            else:
                futurecp = [(p,onlynodes) for p in futurecp]
        else:
            futurecp = []

        futurecp.sort()

        if lease.deadline != None:
            changepoints = [cp for cp in changepoints if cp[0] <= lease.deadline - duration]
            futurecp = [cp for cp in futurecp if cp[0] <= lease.deadline - duration]

        #
        # STEP 3: FIND A MAPPING
        #

        # In this step we find a starting time and a mapping for the VMs,
        # which involves going through the changepoints in order and seeing
        # if we can find a mapping.
        # Most of the work is done in the __find_fit_at_points method.

        # If resuming, we also have to allocate enough time for the resumption
        if mustresume:
            duration = remaining_duration + lease.estimate_resume_time()
        else:
            duration = remaining_duration

        duration += shutdown_time

        in_future = False

        # Convert Capacity objects in lease object into ResourceTuples that
        # we can hand over to the mapper.
        requested_resources = dict([(k,self.slottable.create_resource_tuple_from_capacity(v)) for k,v in lease.requested_resources.items()])

        # First, try to find a mapping assuming we can't schedule in the future
        start, end, mapping, preemptions = self.__find_fit_at_points(lease,
                                                                     requested_resources,
                                                                     changepoints,
                                                                     duration,
                                                                     min_duration,
                                                                     shutdown_time)

        if start == None and not allow_in_future:
            # We did not find a suitable starting time. This can happen
            # if we're unable to schedule in the future
            raise NotSchedulableException, "Could not find enough resources for this request"

        # If we haven't been able to fit the lease, check if we can
        # reserve it in the future
        if start == None and allow_in_future:
            start, end, mapping, preemptions = self.__find_fit_at_points(lease,
                                                                         requested_resources,
                                                                         futurecp,
                                                                         duration,
                                                                         min_duration,
                                                                         shutdown_time
                                                                         )
            # TODO: The following will also raise an exception if a lease
            # makes a request that could *never* be satisfied with the
            # current resources.
            if start == None:
                if lease.deadline != None:
                    raise NotSchedulableException, "Could not find enough resources for this request before deadline"
                else:
                    raise InconsistentScheduleError, "Could not find a mapping in the future (this should not happen)"

            in_future = True

        #
        # STEP 4: CREATE RESERVATIONS
        #

        # At this point, the lease is feasible. We just need to create
        # the reservations for the VMs and, possibly, for the VM resumption,
        # suspension, and shutdown.

        # VM resource reservation
        res = {}

        for (vnode,pnode) in mapping.items():
            vnode_res = requested_resources[vnode]
            if res.has_key(pnode):
                res[pnode].incr(vnode_res)
            else:
                res[pnode] = ResourceTuple.copy(vnode_res)

        vmrr = VMResourceReservation(lease, start, end, mapping, res)
        vmrr.state = ResourceReservation.STATE_SCHEDULED

        # VM resumption resource reservation
        if mustresume:
            self.__schedule_resumption(vmrr, start)

        # If the mapper couldn't find a mapping for the full duration
        # of the lease, then we need to schedule a suspension.
        mustsuspend = (vmrr.end - vmrr.start) - shutdown_time < remaining_duration
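
        # For example (hypothetical values): if the VMRR spans two hours, the
        # shutdown estimate is 5 minutes, and the remaining duration is three
        # hours, only 1h55m of useful computation fits in the reservation, so
        # the VMs will have to be suspended at its end.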
        if mustsuspend:
            self.__schedule_suspension(vmrr, end)
        else:
            # Compensate for any overestimation
            if (vmrr.end - vmrr.start) > remaining_duration + shutdown_time:
                vmrr.end = vmrr.start + remaining_duration + shutdown_time
            self.__schedule_shutdown(vmrr)

        if in_future:
            self.future_leases.add(lease)
            get_persistence().persist_future_leases(self.future_leases)

        susp_str = res_str = ""
        if mustresume:
            res_str = " (resuming)"
        if mustsuspend:
            susp_str = " (suspending)"
        self.logger.info("Lease #%i can be scheduled on nodes %s from %s%s to %s%s" % (lease.id, mapping.values(), start, res_str, end, susp_str))

        return vmrr, preemptions


    def __schedule_deadline(self, lease, duration, nexttime, earliest, override_state):
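        """ Schedules VMs for a lease with a deadline

        Depending on the lease's slack (the ratio of the window between its
        requested start and its deadline to its requested duration), this
        method first tries to schedule the lease as an advance reservation
        and/or as best-effort. If neither fits before the deadline, it tries
        to make room by rescheduling the other leases scheduled after the
        earliest available time.

        Arguments are the same as in the schedule method.
        """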

        earliest_time = nexttime
        for n in earliest:
            earliest[n].time = max(lease.start.requested, earliest[n].time)
            earliest_time = max(earliest_time, earliest[n].time)

        slack = (lease.deadline - lease.start.requested) / lease.duration.requested
        if slack <= 2.0:
            try:
                self.logger.debug("Trying to schedule lease #%i as an advance reservation..." % lease.id)
                vmrr, preemptions = self.__schedule_exact(lease, duration, nexttime, earliest)
                # Don't return preemptions. They have already been preempted by the deadline mapper
                return vmrr, []
            except NotSchedulableException:
                self.logger.debug("Lease #%i cannot be scheduled as an advance reservation, trying as best-effort..." % lease.id)
                try:
                    vmrr, preemptions = self.__schedule_asap(lease, duration, nexttime, earliest, allow_in_future = True, override_state=override_state)
                except NotSchedulableException:
                    vmrr = None
                    preemptions = []
                if vmrr == None or vmrr.end - vmrr.start != duration or vmrr.end > lease.deadline or len(preemptions)>0:
                    self.logger.debug("Lease #%i cannot be scheduled before deadline using best-effort." % lease.id)
                    #raise NotSchedulableException, "Could not schedule before deadline without making other leases miss deadline"
                else:
                    return vmrr, preemptions
        else:
            self.logger.debug("Trying to schedule lease #%i as best-effort..." % lease.id)
            try:
                vmrr, preemptions = self.__schedule_asap(lease, duration, nexttime, earliest, allow_in_future = True, override_state=override_state)
            except NotSchedulableException:
                vmrr = None
                preemptions = []

        if vmrr == None or vmrr.end - vmrr.start != duration or vmrr.end > lease.deadline or len(preemptions)>0:
            self.logger.debug("Trying to schedule lease #%i by rescheduling other leases..." % lease.id)
            dirtynodes = set()
            dirtytime = earliest_time

            future_vmrrs = self.slottable.get_reservations_on_or_after(earliest_time)
            future_vmrrs.sort(key=operator.attrgetter("start"))
            future_vmrrs = [rr for rr in future_vmrrs
                            if isinstance(rr, VMResourceReservation)
                            and rr.state == ResourceReservation.STATE_SCHEDULED
                            and reduce(operator.and_, [(prerr.state == ResourceReservation.STATE_SCHEDULED) for prerr in rr.pre_rrs], True)]

            leases = list(set([future_vmrr.lease for future_vmrr in future_vmrrs]))

            self.slottable.push_state(leases)

            for future_vmrr in future_vmrrs:
                #print "REMOVE", future_vmrr.lease.id, future_vmrr.start, future_vmrr.end
                future_vmrr.lease.remove_vmrr(future_vmrr)
                self.cancel_vm(future_vmrr)

            orig_vmrrs = dict([(l,[rr for rr in future_vmrrs if rr.lease == l]) for l in leases])

            leases.append(lease)
            leases.sort(key= lambda l: (l.deadline - earliest_time) / l.get_remaining_duration_at(nexttime))

            new_vmrrs = {}

            self.logger.debug("Attempting to reschedule leases %s" % [l.id for l in leases])

            # First pass
            scheduled = set()
            for lease2 in leases:

                last_vmrr = lease2.get_last_vmrr()
                if last_vmrr != None and last_vmrr.is_suspending():
                    override_state = Lease.STATE_SUSPENDED_PENDING
                    l_earliest_time = max(last_vmrr.post_rrs[-1].end, earliest_time)
                else:
                    override_state = None
                    l_earliest_time = earliest_time

                for n in earliest:
                    earliest[n].time = max(lease2.start.requested, l_earliest_time)

                self.logger.debug("Rescheduling lease %s" % lease2.id)
                dur = lease2.get_remaining_duration_at(l_earliest_time)

                try:
                    vmrr, preemptions = self.__schedule_asap(lease2, dur, nexttime, earliest, allow_in_future = True, override_state=override_state)
                except NotSchedulableException:
                    vmrr = None
                    preemptions = []

                if vmrr == None or vmrr.end - vmrr.start != dur or vmrr.end > lease2.deadline or len(preemptions) != 0:
                    self.logger.debug("Lease %s could not be rescheduled, undoing changes." % lease2.id)
                    self.slottable.pop_state()

                    raise NotSchedulableException, "Could not schedule before deadline without making other leases miss deadline"

                dirtytime = max(vmrr.end, dirtytime)
                dirtynodes.update(vmrr.resources_in_pnode.keys())

                for rr in vmrr.pre_rrs:
                    self.slottable.add_reservation(rr)
                self.slottable.add_reservation(vmrr)
                for rr in vmrr.post_rrs:
                    self.slottable.add_reservation(rr)
                scheduled.add(lease2)
                if lease2 == lease:
                    return_vmrr = vmrr
                    break
                else:
                    new_vmrrs[lease2] = vmrr

            # We've scheduled the lease. Now we try to schedule the rest of the leases but,
            # since now we know the nodes the new lease is in, we can do a few optimizations
            # Restore the leases in nodes we haven't used, and that would not be
            # affected by the new lease. We need to find what this set of nodes is.

            to_schedule = [l for l in leases if l not in scheduled]
            dirtynodes, cleanleases = self.find_dirty_nodes(to_schedule, dirtynodes, orig_vmrrs)
            self.logger.debug("Rescheduling only leases on nodes %s" % dirtynodes)
            self.logger.debug("Leases %s can be skipped" % [l.id for l in cleanleases])

            # Restore the leases
            restored_leases = set()
            for l in leases:
                if l in cleanleases:
                    # Restore
                    for l_vmrr in orig_vmrrs[l]:
                        for rr in l_vmrr.pre_rrs:
                            self.slottable.add_reservation(rr)
                        self.slottable.add_reservation(l_vmrr)
                        for rr in l_vmrr.post_rrs:
                            self.slottable.add_reservation(rr)
                        l.append_vmrr(l_vmrr)
                        scheduled.add(l)
                        restored_leases.add(l)

            to_schedule = [l for l in leases if l not in scheduled]
            try:
                (more_scheduled, add_vmrrs, dirtytime) = self.reschedule_deadline_leases(to_schedule, orig_vmrrs, earliest_time, earliest, nexttime, dirtytime)
                scheduled.update(more_scheduled)
                new_vmrrs.update(add_vmrrs)
            except NotSchedulableException:
                self.logger.debug("Some leases could not be rescheduled, undoing changes.")
                self.slottable.pop_state()
                raise

            self.slottable.pop_state(discard=True)

            for l in leases:
                if l not in scheduled:
                    for l_vmrr in orig_vmrrs[l]:
                        for rr in l_vmrr.pre_rrs:
                            self.slottable.add_reservation(rr)
                        self.slottable.add_reservation(l_vmrr)
                        for rr in l_vmrr.post_rrs:
                            self.slottable.add_reservation(rr)
                        l.append_vmrr(l_vmrr)
                        restored_leases.add(l)

            for lease2, vmrr in new_vmrrs.items():
                lease2.append_vmrr(vmrr)

            # Remove from slottable, because lease_scheduler is the one that actually
            # adds the RRs
            for rr in return_vmrr.pre_rrs:
                self.slottable.remove_reservation(rr)
            self.slottable.remove_reservation(return_vmrr)
            for rr in return_vmrr.post_rrs:
                self.slottable.remove_reservation(rr)

            for l in leases:
                if l in scheduled:
                    self.logger.vdebug("Lease %i after rescheduling:" % l.id)
                    l.print_contents()

            return return_vmrr, []
        else:
            return vmrr, preemptions


    def find_dirty_nodes(self, to_schedule, dirtynodes, orig_vmrrs):
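        """ Computes the transitive closure of "dirty" (affected) nodes

        Starting from the nodes already touched by rescheduling, any lease
        whose original VMRRs straddle both dirty and clean nodes drags its
        clean nodes into the dirty set; leases that only use clean nodes
        can be restored without being rescheduled.

        Arguments:
        to_schedule -- Leases that still have to be rescheduled
        dirtynodes -- Initial set of affected physical nodes
        orig_vmrrs -- Dictionary mapping each lease to its original VMRRs
        """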
        dirtynodes = set(dirtynodes)
        done = False
        while not done:
            stable = True
            cleanleases = set()
            for l in to_schedule:
                pnodes = set()
                for l_vmrr in orig_vmrrs[l]:
                    pnodes.update(l_vmrr.resources_in_pnode.keys())
                in_dirty = dirtynodes & pnodes
                in_clean = pnodes - dirtynodes
                if len(in_dirty) > 0 and len(in_clean) > 0:
                    stable = False
                    dirtynodes.update(in_clean)
                if len(in_clean) > 0 and len(in_dirty) == 0:
                    cleanleases.add(l)
            if stable:
                done = True

        return dirtynodes, cleanleases
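
    # A sketch with hypothetical leases: if rescheduling touched nodes {P1},
    # lease A originally ran on {P1, P2}, and lease B ran on {P3}, the first
    # pass marks P2 dirty (A straddles dirty and clean nodes); B only uses
    # clean nodes, so it ends up in cleanleases and is restored as-is.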


    def reschedule_deadline_leases(self, leases, orig_vmrrs, earliest_time, earliest, nexttime, dirtytime):
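        """ Reschedules a list of leases using best-effort scheduling

        Goes through the leases in order and reschedules each one as soon as
        possible, raising NotSchedulableException if any of them would miss
        its deadline. Stops early once the earliest original start among the
        remaining leases is later than "dirtytime", since those leases are
        unaffected by the changes made so far.
        """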
        scheduled = set()
        new_vmrrs = {}
        for l in leases:
            if len(scheduled) < len(leases) and dirtytime != None:
                min_future_start = min([min([rr.start for rr in lrr]) for l2, lrr in orig_vmrrs.items() if l2 in leases and l2 not in scheduled])
                if min_future_start > dirtytime:
                    break

            last_vmrr = l.get_last_vmrr()
            if last_vmrr != None and last_vmrr.is_suspending():
                override_state = Lease.STATE_SUSPENDED_PENDING
                l_earliest_time = max(last_vmrr.post_rrs[-1].end, earliest_time)
            else:
                override_state = None
                l_earliest_time = earliest_time

            for n in earliest:
                earliest[n].time = max(l.start.requested, l_earliest_time)

            self.logger.debug("Rescheduling lease %s" % l.id)
            dur = l.get_remaining_duration_at(l_earliest_time)

            try:
                vmrr, preemptions = self.__schedule_asap(l, dur, nexttime, earliest, allow_in_future = True, override_state=override_state)
            except NotSchedulableException:
                vmrr = None
                preemptions = []

            if vmrr == None or vmrr.end - vmrr.start != dur or vmrr.end > l.deadline or len(preemptions) != 0:
                raise NotSchedulableException, "Could not schedule before deadline without making other leases miss deadline"

            if dirtytime != None:
                dirtytime = max(vmrr.end, dirtytime)

            for rr in vmrr.pre_rrs:
                self.slottable.add_reservation(rr)
            self.slottable.add_reservation(vmrr)
            for rr in vmrr.post_rrs:
                self.slottable.add_reservation(rr)
            new_vmrrs[l] = vmrr
            scheduled.add(l)

        return scheduled, new_vmrrs, dirtytime


    def reschedule_deadline(self, lease, duration, nexttime, earliest, override_state = None):
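        """ Reschedules a deadline lease using best-effort scheduling

        Raises NotSchedulableException if the lease cannot be scheduled in
        full before its deadline without preempting other leases.

        Arguments are the same as in the schedule method.
        """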
        for n in earliest:
            earliest[n].time = max(lease.start.requested, earliest[n].time)

        try:
            vmrr, preemptions = self.__schedule_asap(lease, duration, nexttime, earliest, allow_in_future = True, override_state=override_state)
        except NotSchedulableException:
            vmrr = None
            preemptions = []

        if vmrr == None or vmrr.end - vmrr.start != duration or vmrr.end > lease.deadline or len(preemptions)>0:
            self.logger.debug("Lease #%i cannot be scheduled before deadline using best-effort." % lease.id)
            raise NotSchedulableException, "Could not schedule before deadline without making other leases miss deadline"
        else:
            return vmrr, preemptions


    def __find_fit_at_points(self, lease, requested_resources, changepoints, duration, min_duration, shutdown_time):
        """ Tries to map a lease in a given list of points in time

        This method goes through a given list of points in time and tries
        to find the earliest time at which that lease can be allocated
        resources.

        Arguments:
        lease -- Lease to schedule
        requested_resources -- A dictionary of lease node -> ResourceTuple.
        changepoints -- The list of changepoints
        duration -- The amount of time requested
        min_duration -- The minimum amount of time that should be allocated
        shutdown_time -- The time required to shut down the lease's VMs

        Returns:
        start -- The time at which resources have been found for the lease
        actualend -- The time at which the resources won't be available. Note
        that this is not necessarily (start + duration) since the mapper
        might be unable to find enough resources for the full requested duration.
        mapping -- A mapping of lease nodes to physical nodes
        preemptions -- A list of leases that would have to be preempted
        (if no mapping is found, all these values are set to None)
        """
        found = False

        for time, onlynodes in changepoints:
            start = time
            end = start + duration
            self.logger.debug("Attempting to map from %s to %s" % (start, end))

            # If suspension is disabled, we will only accept mappings that go
            # from "start" strictly until "end".
            susptype = get_config().get("suspension")
            # TODO: Remove the "if is deadline lease" condition and replace it
            # with something cleaner
            if susptype == constants.SUSPENSION_NONE or (lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL) or lease.get_type() == Lease.DEADLINE:
                strictend = True
            else:
                strictend = False

            # Let the mapper work its magic
            mapping, actualend, preemptions = self.mapper.map(lease,
                                                              requested_resources,
                                                              start,
                                                              end,
                                                              strictend = strictend,
                                                              onlynodes = onlynodes,
                                                              allow_preemption = False)

            # If we have a mapping, we still have to check if it satisfies
            # the minimum duration.
            if mapping != None:
                if actualend < end:
                    actualduration = actualend - start
                    if actualduration >= min_duration:
                        if duration - shutdown_time >= actualduration:
                            self.logger.debug("This lease can be scheduled from %s to %s (will require suspension)" % (start, actualend))
                            found = True
                            break
                        else:
                            self.logger.debug("This lease requires suspension, but doing so would involve 'suspending' the shutdown.")
                    else:
                        self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (actualduration, min_duration))
                else:
                    self.logger.debug("This lease can be scheduled from %s to %s (full duration)" % (start, end))
                    found = True
                    break

        if found:
            return start, actualend, mapping, preemptions
        else:
            return None, None, None, None
971
972
973
    def __compute_susprem_times(self, vmrr, time, direction, exclusion, rate, override = None):
974
        """ Computes the times at which suspend/resume operations would have to start
975

976
        When suspending or resuming a VM, the VM's memory is dumped to a
977
        file on disk. To correctly estimate the time required to suspend
978
        a lease with multiple VMs, Haizea makes sure that no two
979
        suspensions/resumptions happen at the same time (e.g., if eight
980
        memory files were being saved at the same time to disk, the disk's
981
        performance would be reduced in a way that is not as easy to estimate
982
        as if only one file were being saved at a time). Based on a number
983
        of parameters, this method estimates the times at which the
984
        suspend/resume commands would have to be sent to guarantee this
985
        exclusion.
986

987
        Arguments:
988
        vmrr -- The VM reservation that will be suspended/resumed
989
        time -- The time at which the suspend should end or the resume should start.
990
        direction -- DIRECTION_BACKWARD: start at "time" and compute the times going
991
        backward (for suspensions) DIRECTION_FORWARD: start at time "time" and compute
992
        the times going forward.
993
        exclusion -- SUSPRES_EXCLUSION_GLOBAL (memory is saved to global filesystem)
994
        or SUSPRES_EXCLUSION_LOCAL (saved to local filesystem)
995
        rate -- The rate at which an individual VM is suspended/resumed
996
        override -- If specified, then instead of computing the time to
997
        suspend/resume VM based on its memory and the "rate" parameter,
998
        use this override value.
999

1000
        """
1001
        times = [] # (start, end, {pnode -> vnodes})
1002
        enactment_overhead = get_config().get("enactment-overhead")
1003
1004 788 borja
        if override != None:
1005
            override = TimeDelta(seconds=override)
1006
1007 632 borja
        if exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
1008
            # Global exclusion (which represents, e.g., reading/writing the memory image files
1009
            # from a global file system) meaning no two suspensions/resumptions can happen at
1010
            # the same time in the entire resource pool.
1011
1012
            t = time
1013
            t_prev = None
1014
1015
            for (vnode,pnode) in vmrr.nodes.items():
1016
                if override == None:
1017 677 borja
                    mem = vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
1018 741 borja
                    op_time = compute_suspend_resume_time(mem, rate)
1019 632 borja
                else:
1020
                    op_time = override
1021
1022
                op_time += enactment_overhead
1023
1024
                t_prev = t
1025
1026
                if direction == constants.DIRECTION_FORWARD:
1027
                    t += op_time
1028
                    times.append((t_prev, t, {pnode:[vnode]}))
                elif direction == constants.DIRECTION_BACKWARD:
                    t -= op_time
                    times.append((t, t_prev, {pnode:[vnode]}))

        elif exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
            # Local exclusion (which represents, e.g., reading the memory image files
            # from a local file system) means no two resumptions can happen at the same
            # time in the same physical node.
            pervnode_times = [] # (start, end, vnode)
            vnodes_in_pnode = {}
            for (vnode,pnode) in vmrr.nodes.items():
                vnodes_in_pnode.setdefault(pnode, []).append(vnode)
            for pnode in vnodes_in_pnode:
                t = time
                t_prev = None
                for vnode in vnodes_in_pnode[pnode]:
                    if override == None:
                        mem = vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
                        op_time = compute_suspend_resume_time(mem, rate)
                    else:
                        op_time = override

                    t_prev = t

                    if direction == constants.DIRECTION_FORWARD:
                        t += op_time
                        pervnode_times.append((t_prev, t, vnode))
                    elif direction == constants.DIRECTION_BACKWARD:
                        t -= op_time
                        pervnode_times.append((t, t_prev, vnode))

            # Consolidate suspend/resume operations happening at the same time
            uniq_times = set([(start, end) for (start, end, vnode) in pervnode_times])
            for (start, end) in uniq_times:
                vnodes = [x[2] for x in pervnode_times if x[0] == start and x[1] == end]
                node_mappings = {}
                for vnode in vnodes:
                    pnode = vmrr.nodes[vnode]
                    node_mappings.setdefault(pnode, []).append(vnode)
                times.append([start, end, node_mappings])

            # Add the enactment overhead
            for t in times:
                num_vnodes = sum([len(vnodes) for vnodes in t[2].values()])
                overhead = TimeDelta(seconds = num_vnodes * enactment_overhead)
                if direction == constants.DIRECTION_FORWARD:
                    t[1] += overhead
                elif direction == constants.DIRECTION_BACKWARD:
                    t[0] -= overhead

            # Fix overlaps
            if direction == constants.DIRECTION_FORWARD:
                times.sort(key=itemgetter(0))
            elif direction == constants.DIRECTION_BACKWARD:
                times.sort(key=itemgetter(1))
                times.reverse()

            prev_start = None
            prev_end = None
            for t in times:
                if prev_start != None:
                    start = t[0]
                    end = t[1]
                    if direction == constants.DIRECTION_FORWARD:
                        if start < prev_end:
                            diff = prev_end - start
                            t[0] += diff
                            t[1] += diff
                    elif direction == constants.DIRECTION_BACKWARD:
                        if end > prev_start:
                            diff = end - prev_start
                            t[0] -= diff
                            t[1] -= diff
                prev_start = t[0]
                prev_end = t[1]

        return times
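
    # A sketch of what this method produces (hypothetical values, assuming
    # compute_suspend_resume_time() works out to roughly mem/rate): suppose
    # vnodes 1 and 2, with 1024 MB of memory each, are mapped to the same
    # pnode, the rate is 32 MB/s, exclusion is local, the direction is
    # backward from a time T, and there is no enactment overhead. Each
    # operation takes 32 seconds, and the two are serialized on the pnode:
    #
    #   (T-32s, T)     -> {pnode: [1]}
    #   (T-64s, T-32s) -> {pnode: [2]}
    #
    # Under global exclusion, the operations would be serialized across all
    # pnodes, not just within each one.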


    def __schedule_shutdown(self, vmrr):
        """ Schedules the shutdown of a VM reservation

        Arguments:
        vmrr -- The VM reservation that will be shut down

        """
        shutdown_time = vmrr.lease.estimate_shutdown_time()

        start = vmrr.end - shutdown_time
        end = vmrr.end

        shutdown_rr = ShutdownResourceReservation(vmrr.lease, start, end, vmrr.resources_in_pnode, vmrr.nodes, vmrr)
        shutdown_rr.state = ResourceReservation.STATE_SCHEDULED

        vmrr.update_end(start)

        # If there are any post RRs, remove them
        for rr in vmrr.post_rrs:
            self.slottable.remove_reservation(rr)
        vmrr.post_rrs = []

        vmrr.post_rrs.append(shutdown_rr)
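
    # Worked example (hypothetical times): if the VMRR runs 13:00-14:00 and
    # estimate_shutdown_time() returns 30 seconds, the shutdown RR occupies
    # 13:59:30-14:00 and the VMRR is truncated to end at 13:59:30. Any
    # previously scheduled post-RRs (e.g., a suspension) are removed first,
    # since the shutdown replaces them.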


    def __schedule_suspension(self, vmrr, suspend_by):
        """ Schedules the suspension of a VM reservation

        Most of the work is done in __compute_susprem_times. See that
        method's documentation for more details.

        Arguments:
        vmrr -- The VM reservation that will be suspended
        suspend_by -- The time by which the VMs should be suspended.

        """
        config = get_config()
        susp_exclusion = config.get("suspendresume-exclusion")
        override = get_config().get("override-suspend-time")
        rate = config.get("suspend-rate")

        if suspend_by < vmrr.start or suspend_by > vmrr.end:
            raise InconsistentScheduleError, "Tried to schedule a suspension by %s, which is outside the VMRR's duration (%s-%s)" % (suspend_by, vmrr.start, vmrr.end)

        # Find the suspension times
        times = self.__compute_susprem_times(vmrr, suspend_by, constants.DIRECTION_BACKWARD, susp_exclusion, rate, override)

        # Create the suspension resource reservations
        suspend_rrs = []
        for (start, end, node_mappings) in times:
            suspres = {}
            all_vnodes = []
            for (pnode,vnodes) in node_mappings.items():
                num_vnodes = len(vnodes)
                r = Capacity([constants.RES_CPU, constants.RES_MEM, constants.RES_DISK])
                mem = 0
                for vnode in vnodes:
                    mem += vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
                r.set_ninstances(constants.RES_CPU, num_vnodes)
                for i in xrange(num_vnodes):
                    r.set_quantity_instance(constants.RES_CPU, i + 1, 100)
                # 'mem' is already the total memory of all the vnodes being
                # suspended on this pnode: reserve that much memory (the VMs
                # remain resident while suspending) and that much disk (for
                # the memory image files being written)
                r.set_quantity(constants.RES_MEM, mem)
                r.set_quantity(constants.RES_DISK, mem)
                suspres[pnode] = self.slottable.create_resource_tuple_from_capacity(r)
                all_vnodes += vnodes

            susprr = SuspensionResourceReservation(vmrr.lease, start, end, suspres, all_vnodes, vmrr)
            susprr.state = ResourceReservation.STATE_SCHEDULED
            suspend_rrs.append(susprr)

        suspend_rrs.sort(key=attrgetter("start"))

        susp_start = suspend_rrs[0].start
        if susp_start < vmrr.start:
            raise InconsistentScheduleError, "Determined suspension should start at %s, before the VMRR's start (%s) -- Suspend time not being properly estimated?" % (susp_start, vmrr.start)

        vmrr.update_end(susp_start)

        # If there are any post RRs, remove them
        for rr in vmrr.post_rrs:
            self.slottable.remove_reservation(rr)
        vmrr.post_rrs = []

        # Add the suspension RRs to the VM RR
        for susprr in suspend_rrs:
            vmrr.post_rrs.append(susprr)
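
    # A sketch of the capacity reserved by a single suspension RR
    # (hypothetical numbers): two 1024 MB vnodes suspending together on one
    # pnode reserve two CPU instances at 100% (one per suspending VM),
    # 2048 MB of memory (the VMs are still resident while suspending), and
    # 2048 MB of disk (space for the memory image files being written).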


    def __schedule_resumption(self, vmrr, resume_at):
        """ Schedules the resumption of a VM reservation

        Most of the work is done in __compute_susprem_times. See that
        method's documentation for more details.

        Arguments:
        vmrr -- The VM reservation that will be resumed
        resume_at -- The time at which the resumption should start

        """
        config = get_config()
        resm_exclusion = config.get("suspendresume-exclusion")
        override = get_config().get("override-resume-time")
        rate = config.get("resume-rate")

        if resume_at < vmrr.start or resume_at > vmrr.end:
            raise InconsistentScheduleError, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)

        # Find the resumption times
        times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate, override)

        # Create the resumption resource reservations
        resume_rrs = []
        for (start, end, node_mappings) in times:
            resmres = {}
            all_vnodes = []
            for (pnode,vnodes) in node_mappings.items():
                num_vnodes = len(vnodes)
                r = Capacity([constants.RES_CPU, constants.RES_MEM, constants.RES_DISK])
                mem = 0
                for vnode in vnodes:
                    mem += vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
                r.set_ninstances(constants.RES_CPU, num_vnodes)
                for i in xrange(num_vnodes):
                    r.set_quantity_instance(constants.RES_CPU, i + 1, 100)
                # 'mem' is already the total memory of all the vnodes being
                # resumed on this pnode: reserve that much memory (for the
                # VMs being brought back) and that much disk (the memory
                # image files being read)
                r.set_quantity(constants.RES_MEM, mem)
                r.set_quantity(constants.RES_DISK, mem)
                resmres[pnode] = self.slottable.create_resource_tuple_from_capacity(r)
                all_vnodes += vnodes
            resmrr = ResumptionResourceReservation(vmrr.lease, start, end, resmres, all_vnodes, vmrr)
            resmrr.state = ResourceReservation.STATE_SCHEDULED
            resume_rrs.append(resmrr)

        resume_rrs.sort(key=attrgetter("start"))

        resm_end = resume_rrs[-1].end
        if resm_end > vmrr.end:
            raise InconsistentScheduleError, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)

        vmrr.update_start(resm_end)

        # Add the resumption RRs to the VM RR
        for resmrr in resume_rrs:
            vmrr.pre_rrs.append(resmrr)


    def __compute_scheduling_threshold(self, lease):
        """ Compute the scheduling threshold (the 'minimum duration') of a lease

        To avoid thrashing, Haizea will not schedule a lease unless all overheads
        can be correctly scheduled (which includes image transfers, suspensions, etc.).
        However, this can still result in situations where a lease is prepared,
        and then immediately suspended because of a blocking lease in the future.
        The scheduling threshold is used to specify that a lease must
        not be scheduled unless it is guaranteed to run for a minimum amount of
        time (the rationale behind this is that you ideally don't want leases
        to be scheduled if they're not going to be active for at least as much time
        as was spent in overheads).

        An important part of computing this value is the "scheduling threshold factor".
        The default value is 1, meaning that the lease will be active for at least
        as much time T as was spent on overheads (e.g., if preparing the lease requires
        60 seconds, and we know that it will have to be suspended, requiring 30 seconds,
        Haizea won't schedule the lease unless it can run for at least 90 seconds).
        In other words, a scheduling factor of F requires a minimum duration of
        F*T. A value of 0 could lead to thrashing, since Haizea could end up with
        situations where a lease starts and immediately gets suspended.

        Arguments:
        lease -- Lease for which we want to find the scheduling threshold
        """
        # TODO: Take into account other things like boot overhead, migration overhead, etc.
        config = get_config()
        threshold = config.get("force-scheduling-threshold")
        if threshold != None:
            # If there is a hard-coded threshold, use that
            return threshold

        factor = config.get("scheduling-threshold-factor")

        if config.get("suspension") != constants.SUSPENSION_NONE:
            # First, figure out the "safe duration" (the minimum duration
            # so that we at least allocate enough time for all the
            # overheads).
            susp_overhead = lease.estimate_suspend_time()
            safe_duration = susp_overhead

            if lease.get_state() == Lease.STATE_SUSPENDED_QUEUED:
                resm_overhead = lease.estimate_resume_time()
                safe_duration += resm_overhead
        else:
            safe_duration = 0

        # TODO: Incorporate other overheads into the minimum duration
        min_duration = safe_duration

        # At the very least, we want to allocate enough time for the
        # safe duration (otherwise, we'll end up with incorrect schedules,
        # where a lease is scheduled to suspend, but isn't even allocated
        # enough time to suspend).
        # The factor is assumed to be non-negative, i.e., a factor of 0
        # means we only allocate enough time for potential suspend/resume
        # operations, while a factor of 1 means the lease will get as much
        # running time as was spent on the runtime overheads involved in
        # setting it up.
        threshold = safe_duration + (min_duration * factor)
        return threshold
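
    # Worked example (hypothetical overheads): if suspending the lease takes
    # 30 seconds and it is not resuming from a previous suspension, then
    # safe_duration == 30s. With a scheduling-threshold-factor of 1, the
    # threshold is 30 + (30 * 1) == 60 seconds: enough time for the
    # suspension itself plus an equal amount of actual run time.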


    #-------------------------------------------------------------------#
    #                                                                   #
    #                  SLOT TABLE EVENT HANDLERS                        #
    #                                                                   #
    #-------------------------------------------------------------------#

    def _handle_start_vm(self, l, rr):
        """ Handles the start of a VMResourceReservation

        Arguments:
        l -- Lease the VMResourceReservation belongs to
        rr -- The VMResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleStartVM" % l.id)
        l.print_contents()

        lease_state = l.get_state()

        if get_config().get("lease-preparation") == "imagetransfer":
            if not self.resourcepool.verify_deploy(l, rr):
                self.logger.error("Deployment of lease %i was not complete." % l.id)
                raise # TODO raise something better

        # Kludge: Should be done by the preparations scheduler
        if l.get_state() == Lease.STATE_SCHEDULED:
            # Piggybacking
            l.set_state(Lease.STATE_READY)
            lease_state = l.get_state()

        if lease_state == Lease.STATE_READY:
            l.set_state(Lease.STATE_ACTIVE)
            rr.state = ResourceReservation.STATE_ACTIVE
            now_time = get_clock().get_time()
            l.start.actual = now_time

            try:
                self.resourcepool.start_vms(l, rr)
            except EnactmentError, exc:
                self.logger.error("Enactment error when starting VMs.")
                # Right now, this is a non-recoverable error, so we just
                # propagate it upwards to the lease scheduler.
                # In the future, it may be possible to react to these
                # kinds of errors.
                raise

        elif lease_state == Lease.STATE_RESUMED_READY:
            l.set_state(Lease.STATE_ACTIVE)
            rr.state = ResourceReservation.STATE_ACTIVE
            # No enactment to do here, since all the suspend/resume actions are
            # handled during the suspend/resume RRs
        else:
            raise InconsistentLeaseStateError(l, doing = "starting a VM")

        # If this was a future reservation (as determined by backfilling),
        # remove that status, since the future is now.
        if rr.lease in self.future_leases:
            self.future_leases.remove(l)
            get_persistence().persist_future_leases(self.future_leases)

        l.print_contents()
        self.logger.debug("LEASE-%i End of handleStartVM" % l.id)
        self.logger.info("Started VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))


    def _handle_end_vm(self, l, rr):
        """ Handles the end of a VMResourceReservation

        Arguments:
        l -- Lease the VMResourceReservation belongs to
        rr -- The VMResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleEndVM" % l.id)
        self.logger.vdebug("LEASE-%i Before:" % l.id)
        l.print_contents()
        now_time = round_datetime(get_clock().get_time())
        diff = now_time - rr.start
        l.duration.accumulate_duration(diff)
        rr.state = ResourceReservation.STATE_DONE

        self.logger.vdebug("LEASE-%i After:" % l.id)
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndVM" % l.id)
        self.logger.info("Stopped VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))


    def _handle_unscheduled_end_vm(self, l, vmrr):
        """ Handles the unexpected end of a VMResourceReservation

        Arguments:
        l -- Lease the VMResourceReservation belongs to
        vmrr -- The VMResourceReservation
        """

        self.logger.info("LEASE-%i The VM has ended prematurely." % l.id)
        for rr in vmrr.post_rrs:
            self.slottable.remove_reservation(rr)
        vmrr.post_rrs = []
        vmrr.end = get_clock().get_time()
        self._handle_end_vm(l, vmrr)


    def _handle_start_suspend(self, l, rr):
        """ Handles the start of a SuspensionResourceReservation

        Arguments:
        l -- Lease the SuspensionResourceReservation belongs to
        rr -- The SuspensionResourceReservation

        """
        self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_ACTIVE

        try:
            self.resourcepool.suspend_vms(l, rr)
        except EnactmentError, exc:
            self.logger.error("Enactment error when suspending VMs.")
            # Right now, this is a non-recoverable error, so we just
            # propagate it upwards to the lease scheduler.
            # In the future, it may be possible to react to these
            # kinds of errors.
            raise

        if rr.is_first():
            l.set_state(Lease.STATE_SUSPENDING)
            l.print_contents()
            self.logger.info("Suspending lease %i..." % (l.id))
        self.logger.debug("LEASE-%i End of handleStartSuspend" % l.id)


    def _handle_end_suspend(self, l, rr):
        """ Handles the end of a SuspensionResourceReservation

        Arguments:
        l -- Lease the SuspensionResourceReservation belongs to
        rr -- The SuspensionResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleEndSuspend" % l.id)
        l.print_contents()
        # TODO: React to incomplete suspend
        self.resourcepool.verify_suspend(l, rr)
        rr.state = ResourceReservation.STATE_DONE
        if rr.is_last():
            if l.get_type() == Lease.DEADLINE:
                l.set_state(Lease.STATE_SUSPENDED_PENDING)
                l.set_state(Lease.STATE_SUSPENDED_SCHEDULED)
            else:
                l.set_state(Lease.STATE_SUSPENDED_PENDING)
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndSuspend" % l.id)
        self.logger.info("Lease %i suspended." % (l.id))

        if l.get_state() == Lease.STATE_SUSPENDED_PENDING:
            raise RescheduleLeaseException


    def _handle_start_resume(self, l, rr):
        """ Handles the start of a ResumptionResourceReservation

        Arguments:
        l -- Lease the ResumptionResourceReservation belongs to
        rr -- The ResumptionResourceReservation

        """
        self.logger.debug("LEASE-%i Start of handleStartResume" % l.id)
        l.print_contents()

        try:
            self.resourcepool.resume_vms(l, rr)
        except EnactmentError, exc:
            self.logger.error("Enactment error when resuming VMs.")
            # Right now, this is a non-recoverable error, so we just
            # propagate it upwards to the lease scheduler.
            # In the future, it may be possible to react to these
            # kinds of errors.
            raise

        rr.state = ResourceReservation.STATE_ACTIVE
        if rr.is_first():
            l.set_state(Lease.STATE_RESUMING)
            l.print_contents()
            self.logger.info("Resuming lease %i..." % (l.id))
        self.logger.debug("LEASE-%i End of handleStartResume" % l.id)


    def _handle_end_resume(self, l, rr):
        """ Handles the end of a ResumptionResourceReservation

        Arguments:
        l -- Lease the ResumptionResourceReservation belongs to
        rr -- The ResumptionResourceReservation

        """
        self.logger.debug("LEASE-%i Start of handleEndResume" % l.id)
        l.print_contents()
        # TODO: React to incomplete resume
        self.resourcepool.verify_resume(l, rr)
        rr.state = ResourceReservation.STATE_DONE
        if rr.is_last():
            l.set_state(Lease.STATE_RESUMED_READY)
            self.logger.info("Resumed lease %i" % (l.id))
        for vnode, pnode in rr.vmrr.nodes.items():
            self.resourcepool.remove_ramfile(pnode, l, vnode)
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndResume" % l.id)
1521
1522
1523
    def _handle_start_shutdown(self, l, rr):
1524
        """ Handles the start of a ShutdownResourceReservation
1525

1526
        Arguments:
1527
        l -- Lease the SuspensionResourceReservation belongs to
1528
        rr -- The SuspensionResourceReservation
1529
        """
1530
1531
        self.logger.debug("LEASE-%i Start of handleStartShutdown" % l.id)
1532
        l.print_contents()
1533
        rr.state = ResourceReservation.STATE_ACTIVE
1534
        try:
1535
            self.resourcepool.stop_vms(l, rr)
1536
        except EnactmentError, exc:
1537
            self.logger.error("Enactment error when shutting down VMs.")
1538
            # Right now, this is a non-recoverable error, so we just
1539
            # propagate it upwards to the lease scheduler
1540
            # In the future, it may be possible to react to these
1541
            # kind of errors.
1542
            raise
1543
1544
        l.print_contents()
1545
        self.logger.debug("LEASE-%i End of handleStartShutdown" % l.id)
1546
1547
1548
    def _handle_end_shutdown(self, l, rr):
1549
        """ Handles the end of a SuspensionResourceReservation
1550

1551
        Arguments:
1552
        l -- Lease the SuspensionResourceReservation belongs to
1553
        rr -- The SuspensionResourceReservation
1554

1555
        """
1556
        self.logger.debug("LEASE-%i Start of handleEndShutdown" % l.id)
1557
        l.print_contents()
1558
        rr.state = ResourceReservation.STATE_DONE
1559
        l.print_contents()
1560
        self.logger.debug("LEASE-%i End of handleEndShutdown" % l.id)
1561
        self.logger.info("Lease %i's VMs have shutdown." % (l.id))
1562
        raise NormalEndLeaseException


    def _handle_start_migrate(self, l, rr):
        """ Handles the start of a MemImageMigrationResourceReservation

        Arguments:
        l -- Lease the MemImageMigrationResourceReservation belongs to
        rr -- The MemImageMigrationResourceReservation

        """
        self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_ACTIVE
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
        self.logger.info("Migrating lease %i..." % (l.id))


    def _handle_end_migrate(self, l, rr):
        """ Handles the end of a MemImageMigrationResourceReservation

        Arguments:
        l -- Lease the MemImageMigrationResourceReservation belongs to
        rr -- The MemImageMigrationResourceReservation

        """
        self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
        l.print_contents()

        for vnode in rr.transfers:
            origin = rr.transfers[vnode][0]
            dest = rr.transfers[vnode][1]

            # Update RAM files
            self.resourcepool.remove_ramfile(origin, l, vnode)
            self.resourcepool.add_ramfile(dest, l, vnode, l.requested_resources[vnode].get_quantity(constants.RES_MEM))

        rr.state = ResourceReservation.STATE_DONE
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
        self.logger.info("Migrated lease %i." % (l.id))


class VMResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, nodes, res):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.nodes = nodes # { vnode -> pnode }
        self.pre_rrs = []
        self.post_rrs = []
        self.prematureend = None

    def update_start(self, time):
        self.start = time
        # ONLY for simulation
        self.lease._update_prematureend()

    def update_end(self, time):
        self.end = time
        # ONLY for simulation
        self.lease._update_prematureend()

    def get_first_start(self):
        if len(self.pre_rrs) == 0:
            return self.start
        else:
            return [rr for rr in self.pre_rrs if isinstance(rr, ResumptionResourceReservation)][0].start

    def get_final_end(self):
        if len(self.post_rrs) == 0:
            return self.end
        else:
            return self.post_rrs[-1].end

    def is_resuming(self):
        return len(self.pre_rrs) > 0 and reduce(operator.or_, [isinstance(rr, ResumptionResourceReservation) for rr in self.pre_rrs])

    def is_suspending(self):
        return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], SuspensionResourceReservation)

    def is_shutting_down(self):
        return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], ShutdownResourceReservation)

    def clear_rrs(self):
        for rr in self.pre_rrs:
            rr.clear_rrs()
        for rr in self.post_rrs:
            rr.clear_rrs()
        self.pre_rrs = None
        self.post_rrs = None

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        for resmrr in self.pre_rrs:
            resmrr.print_contents(loglevel)
            logger.log(loglevel, "--")
        logger.log(loglevel, "Type           : VM")
        logger.log(loglevel, "Nodes          : %s" % pretty_nodemap(self.nodes))
        if self.prematureend != None:
            logger.log(loglevel, "Premature end  : %s" % self.prematureend)
        ResourceReservation.print_contents(self, loglevel)
        for susprr in self.post_rrs:
            logger.log(loglevel, "--")
            susprr.print_contents(loglevel)
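
# The RRs attached to a VMResourceReservation form a timeline along these
# lines (a sketch; memory image migrations may also precede the resumptions):
#
#   pre_rrs                      (the VMRR)           post_rrs
#   [migrate][resume]..[resume] [VMs running] [suspend]..[suspend]
#                                         or  [shutdown]
#
# get_first_start() returns the start of the first resumption (or the VM
# start if there are no pre-RRs), and get_final_end() the end of the last
# post-RR (or the VM end if there are none).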


class SuspensionResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, res, vnodes, vmrr):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.vmrr = vmrr
        self.vnodes = vnodes

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        logger.log(loglevel, "Type           : SUSPEND")
        logger.log(loglevel, "Vnodes         : %s" % self.vnodes)
        ResourceReservation.print_contents(self, loglevel)

    def is_first(self):
        return (self == self.vmrr.post_rrs[0])

    def is_last(self):
        return (self == self.vmrr.post_rrs[-1])

    def clear_rrs(self):
        self.vmrr = None


class ResumptionResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, res, vnodes, vmrr):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.vmrr = vmrr
        self.vnodes = vnodes

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        logger.log(loglevel, "Type           : RESUME")
        logger.log(loglevel, "Vnodes         : %s" % self.vnodes)
        ResourceReservation.print_contents(self, loglevel)

    def is_first(self):
        resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
        return (self == resm_rrs[0])

    def is_last(self):
        resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
        return (self == resm_rrs[-1])

    def clear_rrs(self):
        self.vmrr = None


class ShutdownResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, res, vnodes, vmrr):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.vmrr = vmrr
        self.vnodes = vnodes

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        logger.log(loglevel, "Type           : SHUTDOWN")
        ResourceReservation.print_contents(self, loglevel)

    def clear_rrs(self):
        self.vmrr = None


class MemImageMigrationResourceReservation(MigrationResourceReservation):
    def __init__(self, lease, start, end, res, vmrr, transfers):
        MigrationResourceReservation.__init__(self, lease, start, end, res, vmrr, transfers)

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        logger.log(loglevel, "Type           : MEM IMAGE MIGRATION")
        logger.log(loglevel, "Transfers      : %s" % self.transfers)
        ResourceReservation.print_contents(self, loglevel)