# -------------------------------------------------------------------------- #
2
# Copyright 2006-2008, University of Chicago                                 #
3
# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19
"""This module provides the main classes for Haizea's VM Scheduler. All the
20
scheduling code that decides when and where a lease is scheduled is contained
21
in the VMScheduler class (except for the code that specifically decides
22
what physical machines each virtual machine is mapped to, which is factored out
23
into the "mapper" module). This module also provides the classes for the
24
reservations that will be placed in the slot table and correspond to VMs. 
25
"""
26

    
27
import haizea.common.constants as constants
28
from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, pretty_nodemap, get_config, get_clock, get_policy
29
from haizea.core.leases import Lease, Capacity
30
from haizea.core.scheduler.slottable import ResourceReservation, ResourceTuple
31
from haizea.core.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, EnactmentError, NotSchedulableException, InconsistentScheduleError, InconsistentLeaseStateError, MigrationResourceReservation
32
from operator import attrgetter, itemgetter
33
from mx.DateTime import TimeDelta
34

    
35
import logging
36

    
37

    
38
class VMScheduler(object):
39
    """The Haizea VM Scheduler
40
    
41
    This class is responsible for taking a lease and scheduling VMs to satisfy
42
    the requirements of that lease.
43
    """
44
    
45
    def __init__(self, slottable, resourcepool, mapper):
46
        """Constructor
47
        
48
        The constructor does little more than create the VM scheduler's
49
        attributes. However, it does expect (in the arguments) a fully-constructed 
50
        SlotTable, ResourcePool, and Mapper (these are constructed in the 
51
        Manager's constructor). 
52
        
53
        Arguments:
54
        slottable -- Slot table
55
        resourcepool -- Resource pool where enactment commands will be sent to
56
        mapper -- Mapper
57
        """        
58
        self.slottable = slottable
59
        self.resourcepool = resourcepool
60
        self.mapper = mapper
61
        self.logger = logging.getLogger("VMSCHED")
62
        
63
        # Register the handlers for the types of reservations used by
64
        # the VM scheduler
65
        self.handlers = {}
66
        self.handlers[VMResourceReservation] = ReservationEventHandler(
67
                                sched    = self,
68
                                on_start = VMScheduler._handle_start_vm,
69
                                on_end   = VMScheduler._handle_end_vm)
70

    
71
        self.handlers[ShutdownResourceReservation] = ReservationEventHandler(
72
                                sched    = self,
73
                                on_start = VMScheduler._handle_start_shutdown,
74
                                on_end   = VMScheduler._handle_end_shutdown)
75

    
76
        self.handlers[SuspensionResourceReservation] = ReservationEventHandler(
77
                                sched    = self,
78
                                on_start = VMScheduler._handle_start_suspend,
79
                                on_end   = VMScheduler._handle_end_suspend)
80

    
81
        self.handlers[ResumptionResourceReservation] = ReservationEventHandler(
82
                                sched    = self,
83
                                on_start = VMScheduler._handle_start_resume,
84
                                on_end   = VMScheduler._handle_end_resume)
85

    
86
        self.handlers[MemImageMigrationResourceReservation] = ReservationEventHandler(
87
                                sched    = self,
88
                                on_start = VMScheduler._handle_start_migrate,
89
                                on_end   = VMScheduler._handle_end_migrate)
90
        
91
        # When using backfilling, set the number of leases that can be
92
        # scheduled in the future.
93
        backfilling = get_config().get("backfilling")
94
        if backfilling == constants.BACKFILLING_OFF:
95
            self.max_in_future = 0
96
        elif backfilling == constants.BACKFILLING_AGGRESSIVE:
97
            self.max_in_future = 1
98
        elif backfilling == constants.BACKFILLING_CONSERVATIVE:
99
            self.max_in_future = -1 # Unlimited
100
        elif backfilling == constants.BACKFILLING_INTERMEDIATE:
101
            self.max_in_future = get_config().get("backfilling-reservations")
102
        self.future_leases = set()
103

    
104

    
105
    def schedule(self, lease, nexttime, earliest):
106
        """ The scheduling function
107
        
108
        This particular function doesn't do much except call __schedule_asap
109
        and __schedule_exact (which do all the work).
110
        
111
        Arguments:
112
        lease -- Lease to schedule
113
        nexttime -- The next time at which the scheduler can allocate resources.
114
        earliest -- The earliest possible starting times on each physical node
115
        """        
116
        if lease.get_type() == Lease.BEST_EFFORT:
117
            return self.__schedule_asap(lease, nexttime, earliest, allow_in_future = self.can_schedule_in_future())
118
        elif lease.get_type() == Lease.ADVANCE_RESERVATION:
119
            return self.__schedule_exact(lease, nexttime, earliest)
120
        elif lease.get_type() == Lease.IMMEDIATE:
121
            return self.__schedule_asap(lease, nexttime, earliest, allow_in_future = False)
122

    
123

    
124
    def estimate_migration_time(self, lease):
125
        """ Estimates the time required to migrate a lease's VMs
126

127
        This function conservatively estimates that all the VMs are going to
        be migrated to other nodes. Since transfers from different nodes can
        proceed in parallel, the bottleneck is the transfer from whatever
        node has the most memory to transfer.
        
        Note that this method only estimates the time to migrate the memory
        state files for the VMs. Migrating the software environment (which may
        or may not be a disk image) is the responsibility of the preparation
        scheduler, which has its own set of migration scheduling methods.
136

137
        Arguments:
138
        lease -- Lease that might be migrated
139
        """                
140
        migration = get_config().get("migration")
141
        if migration == constants.MIGRATE_YES:
142
            vmrr = lease.get_last_vmrr()
143
            mem_in_pnode = dict([(pnode,0) for pnode in set(vmrr.nodes.values())])
144
            for (vnode,pnode) in vmrr.nodes.items():
145
                mem = vmrr.resources_in_pnode[pnode].get_by_type(constants.RES_MEM)
146
                mem_in_pnode[pnode] += mem
147
            max_mem_to_transfer = max(mem_in_pnode.values())
148
            bandwidth = self.resourcepool.info.get_migration_bandwidth()
149
            return estimate_transfer_time(max_mem_to_transfer, bandwidth)
150
        elif migration == constants.MIGRATE_YES_NOTRANSFER:
151
            return TimeDelta(seconds=0)        
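    # Illustrative sketch (not part of the original code): for a hypothetical
    # lease with VM 1 (1024 MB) mapped to physical node 1 and VM 2 (2048 MB)
    # mapped to physical node 2, the loop above would yield
    #
    #   mem_in_pnode        = {1: 1024, 2: 2048}
    #   max_mem_to_transfer = 2048
    #
    # and the estimate would be estimate_transfer_time(2048, bandwidth), i.e.
    # only the busiest node matters, since transfers from different nodes are
    # assumed to proceed in parallel.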
152

    
153
    def schedule_migration(self, lease, vmrr, nexttime):
154
        """ Schedules migrations for a lease
155

156
        Arguments:
157
        lease -- Lease being migrated
158
        vmrr -- The VM reservation before which the migration will take place
159
        nexttime -- The next time at which the scheduler can allocate resources.
160
        """
161
        
162
        # Determine what migrations have to be done. We do this by looking at
163
        # the mapping in the previous VM RR and in the new VM RR
164
        last_vmrr = lease.get_last_vmrr()
165
        vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes])
166
        
167
        # Determine if we actually have to migrate
168
        mustmigrate = False
169
        for vnode in vnode_migrations:
170
            if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
171
                mustmigrate = True
172
                break
173
            
174
        if not mustmigrate:
175
            return []
176

    
177
        # If Haizea is configured to migrate without doing any transfers,
178
        # then we just return a nil-duration migration RR
179
        if get_config().get("migration") == constants.MIGRATE_YES_NOTRANSFER:
180
            start = nexttime
181
            end = nexttime
182
            res = {}
183
            migr_rr = MemImageMigrationResourceReservation(lease, start, end, res, vmrr, vnode_migrations)
184
            migr_rr.state = ResourceReservation.STATE_SCHEDULED
185
            return [migr_rr]
186

    
187
        # Figure out what migrations can be done simultaneously
188
        migrations = []
189
        while len(vnode_migrations) > 0:
190
            pnodes = set()
191
            migration = {}
192
            for vnode in vnode_migrations:
193
                origin = vnode_migrations[vnode][0]
194
                dest = vnode_migrations[vnode][1]
195
                if not origin in pnodes and not dest in pnodes:
196
                    migration[vnode] = vnode_migrations[vnode]
197
                    pnodes.add(origin)
198
                    pnodes.add(dest)
199
            for vnode in migration:
200
                del vnode_migrations[vnode]
201
            migrations.append(migration)
202
        
203
        # Create migration RRs
204
        start = max(last_vmrr.post_rrs[-1].end, nexttime)
205
        bandwidth = self.resourcepool.info.get_migration_bandwidth()
206
        migr_rrs = []
207
        for m in migrations:
208
            vnodes_to_migrate = m.keys()
209
            max_mem_to_migrate = max([lease.requested_resources[vnode].get_quantity(constants.RES_MEM) for vnode in vnodes_to_migrate])
210
            migr_time = estimate_transfer_time(max_mem_to_migrate, bandwidth)
211
            end = start + migr_time
212
            res = {}
213
            for (origin,dest) in m.values():
214
                resorigin = Capacity([constants.RES_NETOUT])
215
                resorigin.set_quantity(constants.RES_NETOUT, bandwidth)
216
                resdest = Capacity([constants.RES_NETIN])
217
                resdest.set_quantity(constants.RES_NETIN, bandwidth)
218
                res[origin] = self.slottable.create_resource_tuple_from_capacity(resorigin)
219
                res[dest] = self.slottable.create_resource_tuple_from_capacity(resdest)                
220
            migr_rr = MemImageMigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
221
            migr_rr.state = ResourceReservation.STATE_SCHEDULED
222
            migr_rrs.append(migr_rr)
223
            start = end
224
            
225
        return migr_rrs
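    # Illustrative sketch (not part of the original code): how the greedy
    # grouping above batches migrations. Suppose vnode_migrations were
    #
    #   {1: ('A', 'B'), 2: ('B', 'C'), 3: ('C', 'D')}
    #
    # with hypothetical physical nodes A-D. One possible outcome (it depends
    # on dictionary iteration order) is two rounds of simultaneous migrations:
    #
    #   migrations = [ {1: ('A', 'B'), 3: ('C', 'D')},   # A, B, C, D all distinct
    #                  {2: ('B', 'C')} ]                 # B and C were already busy
    #
    # Each round then becomes one MemImageMigrationResourceReservation, and the
    # rounds are scheduled back to back.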
226

    
227
    def cancel_vm(self, vmrr):
228
        """ Cancels a VM resource reservation
229

230
        Arguments:
231
        vmrr -- VM RR to be cancelled
232
        """         
233
        
234
        # If this VM RR is part of a lease that was scheduled in the future,
235
        # remove that lease from the set of future leases.
236
        if vmrr.lease in self.future_leases:
237
            self.future_leases.remove(vmrr.lease)
238

    
239
        # If there are any pre-RRs that are scheduled, remove them
240
        for rr in vmrr.pre_rrs:
241
            if rr.state == ResourceReservation.STATE_SCHEDULED:
242
                self.slottable.remove_reservation(rr)
243

    
244
        # If there are any post RRs, remove them
245
        for rr in vmrr.post_rrs:
246
            self.slottable.remove_reservation(rr)
247
        
248
        # Remove the reservation itself
249
        self.slottable.remove_reservation(vmrr)
250

    
251

    
252
    def can_suspend_at(self, lease, t):
253
        """ Determines if it is possible to suspend a lease before a given time
254

255
        Arguments:
256
        lease -- Lease that would be suspended
        t -- Time by which the lease's VMs would have to be suspended
258
        """                     
259
        # TODO: Make more general, should determine vmrr based on current time
260
        # This won't currently break, though, since the calling function 
261
        # operates on the last VM RR.
262
        vmrr = lease.get_last_vmrr()
263
        time_until_suspend = t - vmrr.start
264
        min_duration = self.__compute_scheduling_threshold(lease)
265
        can_suspend = time_until_suspend >= min_duration        
266
        return can_suspend
267
    
268
    
269
    def preempt_vm(self, vmrr, t):
270
        """ Preempts a VM reservation at a given time
271

272
        This method assumes that the lease is, in fact, preemptable,
273
        that the VMs are running at the given time, and that there is 
274
        enough time to suspend the VMs before the given time (all these
275
        checks are done in the lease scheduler).
276
        
277
        Arguments:
278
        vmrr -- VM RR to be preempted
279
        t -- Time by which the VM must be preempted
280
        """             
281
        
282
        # Save original start and end time of the vmrr
283
        old_start = vmrr.start
284
        old_end = vmrr.end
285
        
286
        # Schedule the VM suspension
287
        self.__schedule_suspension(vmrr, t)
288
        
289
        # Update the VMRR in the slot table
290
        self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
291
        
292
        # Add the suspension RRs to the VM's post-RRs
293
        for susprr in vmrr.post_rrs:
294
            self.slottable.add_reservation(susprr)
295
            
296
            
297
    def get_future_reschedulable_leases(self):
298
        """ Returns a list of future leases that are reschedulable.
299

300
        Currently, this list is just the best-effort leases scheduled
301
        in the future as determined by the backfilling algorithm.
302
        Advance reservation leases, by their nature, cannot be 
303
        rescheduled to find a "better" starting time.
304
        """             
305
        return list(self.future_leases)
306
    
307

    
308
    def can_schedule_in_future(self):
309
        """ Returns True if the backfilling algorithm would allow a lease
310
        to be scheduled in the future.
311

312
        """             
313
        if self.max_in_future == -1: # Unlimited
314
            return True
315
        else:
316
            return len(self.future_leases) < self.max_in_future
317

    
318
        
319
    def get_utilization(self, time):
320
        """ Computes resource utilization (currently just CPU-based)
321

322
        Arguments:
323
        time -- Time at which to determine utilization
324
        """         
325
        total = self.slottable.get_total_capacity(restype = constants.RES_CPU)
326
        util = {}
327
        reservations = self.slottable.get_reservations_at(time)
328
        for r in reservations:
329
            for node in r.resources_in_pnode:
330
                if isinstance(r, VMResourceReservation):
331
                    use = r.resources_in_pnode[node].get_by_type(constants.RES_CPU)
332
                    util[type(r)] = use + util.setdefault(type(r),0.0)
333
                elif isinstance(r, SuspensionResourceReservation) or isinstance(r, ResumptionResourceReservation) or isinstance(r, ShutdownResourceReservation):
334
                    use = r.vmrr.resources_in_pnode[node].get_by_type(constants.RES_CPU)
335
                    util[type(r)] = use + util.setdefault(type(r),0.0)
336
        util[None] = total - sum(util.values())
337
        for k in util:
338
            util[k] /= total
339
            
340
        return util              
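    # Illustrative sketch (not part of the original code): the dictionary
    # returned above maps reservation classes (plus None for unused capacity)
    # to fractions of the total CPU capacity, e.g. something like
    #
    #   {VMResourceReservation: 0.60,
    #    SuspensionResourceReservation: 0.05,
    #    None: 0.35}
    #
    # The fractions add up to 1.0, since the None entry is computed as whatever
    # capacity is left over.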
341
        
342

    
343
    def __schedule_exact(self, lease, nexttime, earliest):
344
        """ Schedules VMs that must start at an exact time
345
        
346
        This type of lease is "easy" to schedule because we know the exact
347
        start time, which means that's the only starting time we have to
348
        check. So, this method does little more than call the mapper.
349
        
350
        Arguments:
351
        lease -- Lease to schedule
352
        nexttime -- The next time at which the scheduler can allocate resources.
353
        earliest -- The earliest possible starting times on each physical node
354
        """             
355
        
356
        # Determine the start and end time
357
        start = lease.start.requested
358
        end = start + lease.duration.requested
359
        
360
        # Convert Capacity objects in lease object into ResourceTuples that
361
        # we can hand over to the mapper.
362
        requested_resources = dict([(k,self.slottable.create_resource_tuple_from_capacity(v)) for k,v in lease.requested_resources.items()])
363

    
364
        # Let the mapper do its magiv
365
        mapping, actualend, preemptions = self.mapper.map(lease, 
366
                                                          requested_resources,
367
                                                          start, 
368
                                                          end, 
369
                                                          strictend = True)
370
        
371
        # If no mapping was found, tell the lease scheduler about it
372
        if mapping == None:
373
            raise NotSchedulableException, "Not enough resources in specified interval"
374
        
375
        # Create VM resource reservations
376
        res = {}
377
        
378
        for (vnode,pnode) in mapping.items():
379
            vnode_res = requested_resources[vnode]
380
            if res.has_key(pnode):
381
                res[pnode].incr(vnode_res)
382
            else:
383
                res[pnode] = ResourceTuple.copy(vnode_res)
384
        
385
        vmrr = VMResourceReservation(lease, start, end, mapping, res)
386
        vmrr.state = ResourceReservation.STATE_SCHEDULED
387

    
388
        # Schedule shutdown for the VM
389
        self.__schedule_shutdown(vmrr)
390
        
391
        return vmrr, preemptions
392

    
393

    
394
    def __schedule_asap(self, lease, nexttime, earliest, allow_in_future = None):
395
        """ Schedules VMs as soon as possible
396
        
397
        This method is a bit more complex than __schedule_exact because
398
        we need to figure out what "as soon as possible" actually is.
399
        This involves attempting several mappings, at different points
400
        in time, before we can schedule the lease.
401
        
402
        This method will always check, at least, if the lease can be scheduled
403
        at the earliest possible moment at which the lease could be prepared
404
        (e.g., if the lease can't start until 1 hour in the future because that's
405
        the earliest possible time at which the disk images it requires can
406
        be transferred, then that's when the scheduler will check). Note, however,
407
        that this "earliest possible moment" is determined by the preparation
408
        scheduler.
409
        
410
        Additionally, if the lease can't be scheduled at the earliest
411
        possible moment, it can also check if the lease can be scheduled
412
        in the future. This partially implements a backfilling algorithm
413
        (the maximum number of future leases is stored in the max_in_future
414
        attribute of VMScheduler), the other part being implemented in the
415
        __process_queue method of LeaseScheduler.
416
        
417
        Note that, if the method is allowed to schedule in the future,
        and assuming that the lease doesn't request more resources than
        the site itself provides, this method will always schedule the VMs
        successfully (since there's always an empty spot somewhere in the future).
421
        
422
        
423
        Arguments:
424
        lease -- Lease to schedule
425
        nexttime -- The next time at which the scheduler can allocate resources.
426
        earliest -- The earliest possible starting times on each physical node
427
        allow_in_future -- Boolean indicating whether the scheduler is
428
        allowed to schedule the VMs in the future.
429
        """                
430
        
431

    
432

    
433
        #
434
        # STEP 1: PROLEGOMENA
435
        #
436
        
437
        lease_id = lease.id
438
        remaining_duration = lease.duration.get_remaining_duration()
439
        shutdown_time = self.__estimate_shutdown_time(lease)
440
        
441
        # We might be scheduling a suspended lease. If so, we will
442
        # also have to schedule its resumption. Right now, just 
443
        # figure out if this is such a lease.
444
        mustresume = (lease.get_state() in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED, Lease.STATE_SUSPENDED_SCHEDULED))
445

    
446
        # This is the minimum duration that we must be able to schedule.
447
        # See __compute_scheduling_threshold for more details.
448
        min_duration = self.__compute_scheduling_threshold(lease)
449
        
450

    
451
        #
452
        # STEP 2: FIND THE CHANGEPOINTS
453
        #
454

    
455
        # Find the changepoints, and the available nodes at each changepoint
456
        # We need to do this because the preparation scheduler may have
457
        # determined that some nodes might require more time to prepare
458
        # than others (e.g., if using disk image caching, some nodes
459
        # might have the required disk image predeployed, while others
460
        # may require transferring the image to that node).
461
        # 
462
        # The end result of this step is a list (cps) where each entry
463
        # is a (t,nodes) pair, where "t" is the time of the changepoint
464
        # and "nodes" is the set of nodes that are available at that time.
465
        
466
        if not mustresume:
467
            # If this is not a suspended lease, then the changepoints
468
            # are determined based on the "earliest" parameter.
469
            cps = [(node, e.time) for node, e in earliest.items()]
470
            cps.sort(key=itemgetter(1))
471
            curcp = None
472
            changepoints = []
473
            nodes = []
474
            for node, time in cps:
475
                nodes.append(node)
476
                if time != curcp:
477
                    changepoints.append([time, set(nodes)])
478
                    curcp = time
479
                else:
480
                    changepoints[-1][1] = set(nodes)
481
        else:
482
            # If the lease is suspended, we take into account that, if
483
            # migration is disabled, we can only schedule the lease
484
            # on the nodes it is currently scheduled on.
485
            if get_config().get("migration") == constants.MIGRATE_NO:
486
                vmrr = lease.get_last_vmrr()
487
                onlynodes = set(vmrr.nodes.values())
488
            else:
489
                onlynodes = None               
490
            changepoints = list(set([x.time for x in earliest.values()]))
491
            changepoints.sort()
492
            changepoints = [(x, onlynodes) for x in changepoints]
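        # Illustrative sketch (not part of the original code): at this point
        # changepoints is a time-ordered list of (time, nodes) entries, e.g.
        # something like
        #
        #   [ (13:00, set([1, 2, 3])),       # nodes 1-3 available at 13:00
        #     (13:05, set([1, 2, 3, 4])) ]   # node 4 only available at 13:05
        #
        # In the suspended-lease case, the nodes element is either the set of
        # nodes the lease currently occupies (if migration is disabled) or None
        # (meaning any node can be considered).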
493

    
494

    
495
        # If we can schedule VMs in the future,
496
        # we also consider future changepoints
497
        if allow_in_future:
498
            res = self.slottable.get_reservations_ending_after(changepoints[-1][0])
499
            # We really only care about changepoints where VMs end (which is
500
            # when resources become available)
501
            futurecp = [r.get_final_end() for r in res if isinstance(r, VMResourceReservation)]
502
            # Corner case: Sometimes we're right in the middle of a ShutdownResourceReservation, so it won't be
503
            # included in futurecp.
504
            futurecp += [r.end for r in res if isinstance(r, ShutdownResourceReservation) and not r.vmrr in res]
505
            if not mustresume:
506
                futurecp = [(p,None) for p in futurecp]
507
            else:
508
                futurecp = [(p,onlynodes) for p in futurecp]                
509
        else:
510
            futurecp = []
511
            
512

    
513
        #
514
        # STEP 3: FIND A MAPPING
515
        #
516
        
517
        # In this step we find a starting time and a mapping for the VMs,
518
        # which involves going through the changepoints in order and seeing
519
        # if we can find a mapping.
520
        # Most of the work is done in the __find_fit_at_points
521
        
522
        # If resuming, we also have to allocate enough time for the resumption
523
        if mustresume:
524
            duration = remaining_duration + self.__estimate_resume_time(lease)
525
        else:
526
            duration = remaining_duration
527

    
528
        duration += shutdown_time
529

    
530
        in_future = False
531

    
532
        # Convert Capacity objects in lease object into ResourceTuples that
533
        # we can hand over to the mapper.
534
        requested_resources = dict([(k,self.slottable.create_resource_tuple_from_capacity(v)) for k,v in lease.requested_resources.items()])
535

    
536
        # First, try to find a mapping assuming we can't schedule in the future
537
        start, end, mapping, preemptions = self.__find_fit_at_points(lease,
538
                                                                     requested_resources,
539
                                                                     changepoints, 
540
                                                                     duration, 
541
                                                                     min_duration)
542
        
543
        if start == None and not allow_in_future:
544
            # We did not find a suitable starting time. This can happen
            # if we're unable to schedule in the future.
            raise NotSchedulableException, "Could not find enough resources for this request"
547

    
548
        # If we haven't been able to fit the lease, check if we can
549
        # reserve it in the future
550
        if start == None and allow_in_future:
551
            start, end, mapping, preemptions = self.__find_fit_at_points(lease,
552
                                                                         requested_resources,
553
                                                                         futurecp, 
554
                                                                         duration, 
555
                                                                         min_duration
556
                                                                         )
557
            # TODO: The following will also raise an exception if a lease
558
            # makes a request that could *never* be satisfied with the
559
            # current resources.
560
            if start == None:
561
                raise InconsistentScheduleError, "Could not find a mapping in the future (this should not happen)"
562

    
563
            in_future = True
564

    
565
        #
566
        # STEP 4: CREATE RESERVATIONS
567
        #
568
        
569
        # At this point, the lease is feasible. We just need to create
570
        # the reservations for the VMs and, possibly, for the VM resumption,
571
        # suspension, and shutdown.    
572
        
573
        # VM resource reservation
574
        res = {}
575
        
576
        for (vnode,pnode) in mapping.items():
577
            vnode_res = requested_resources[vnode]
578
            if res.has_key(pnode):
579
                res[pnode].incr(vnode_res)
580
            else:
581
                res[pnode] = ResourceTuple.copy(vnode_res)
582

    
583
        vmrr = VMResourceReservation(lease, start, end, mapping, res)
584
        vmrr.state = ResourceReservation.STATE_SCHEDULED
585

    
586
        # VM resumption resource reservation
587
        if mustresume:
588
            self.__schedule_resumption(vmrr, start)
589

    
590
        # If the mapper couldn't find a mapping for the full duration
591
        # of the lease, then we need to schedule a suspension.
592
        mustsuspend = (vmrr.end - vmrr.start) < remaining_duration
593
        if mustsuspend:
594
            self.__schedule_suspension(vmrr, end)
595
        else:
596
            # Compensate for any overestimation
597
            if (vmrr.end - vmrr.start) > remaining_duration + shutdown_time:
598
                vmrr.end = vmrr.start + remaining_duration + shutdown_time
599
            self.__schedule_shutdown(vmrr)
600
        
601
        if in_future:
602
            self.future_leases.add(lease)
603

    
604
        susp_str = res_str = ""
605
        if mustresume:
606
            res_str = " (resuming)"
607
        if mustsuspend:
608
            susp_str = " (suspending)"
609
        self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mapping.values(), start, res_str, end, susp_str))
610

    
611
        return vmrr, preemptions
612

    
613

    
614
    def __find_fit_at_points(self, lease, requested_resources, changepoints, duration, min_duration):
615
        """ Tries to map a lease in a given list of points in time
616
        
617
        This method goes through a given list of points in time and tries
618
        to find the earliest time at which that lease can be allocated
619
        resources.
620
        
621
        Arguments:
622
        lease -- Lease to schedule
623
        requested_resources -- A dictionary of lease node -> ResourceTuple.
624
        changepoints -- The list of changepoints
625
        duration -- The amount of time requested
626
        min_duration -- The minimum amount of time that should be allocated
627
        
628
        Returns:
629
        start -- The time at which resources have been found for the lease
630
        actualend -- The time at which the resources won't be available. Note
631
        that this is not necessarily (start + duration) since the mapper
632
        might be unable to find enough resources for the full requested duration.
633
        mapping -- A mapping of lease nodes to physical nodes
634
        preemptions -- A list of leases that would have to be preempted to make this mapping possible
635
        (if no mapping is found, all these values are set to None)
636
        """                 
637
        found = False
638
        
639
        for time, onlynodes in changepoints:
640
            start = time
641
            end = start + duration
642
            self.logger.debug("Attempting to map from %s to %s" % (start, end))
643
            
644
            # If suspension is disabled, we will only accept mappings that go
645
            # from "start" strictly until "end".
646
            susptype = get_config().get("suspension")
647
            if susptype == constants.SUSPENSION_NONE or (lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL):
648
                strictend = True
649
            else:
650
                strictend = False
651

    
652
            # Let the mapper work its magic
653
            mapping, actualend, preemptions = self.mapper.map(lease, 
654
                                                              requested_resources,
655
                                                              start, 
656
                                                              end, 
657
                                                              strictend = strictend,
658
                                                              onlynodes = onlynodes)
659
            
660
            # We have a mapping; we still have to check if it satisfies
661
            # the minimum duration.
662
            if mapping != None:
663
                if actualend < end:
664
                    actualduration = actualend - start
665
                    if actualduration >= min_duration:
666
                        self.logger.debug("This lease can be scheduled from %s to %s (will require suspension)" % (start, actualend))
667
                        found = True
668
                        break
669
                    else:
670
                        self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (actualduration, min_duration))
671
                else:
672
                    self.logger.debug("This lease can be scheduled from %s to %s (full duration)" % (start, end))
673
                    found = True
674
                    break
675
        
676
        if found:
677
            return start, actualend, mapping, preemptions
678
        else:
679
            return None, None, None, None
680
    
681
    
682
    def __compute_susprem_times(self, vmrr, time, direction, exclusion, rate, override = None):
683
        """ Computes the times at which suspend/resume operations would have to start
684
        
685
        When suspending or resuming a VM, the VM's memory is dumped to a
686
        file on disk. To correctly estimate the time required to suspend
687
        a lease with multiple VMs, Haizea makes sure that no two 
688
        suspensions/resumptions happen at the same time (e.g., if eight
689
        memory files were being saved at the same time to disk, the disk's
690
        performance would be reduced in a way that is not as easy to estimate
691
        as if only one file were being saved at a time). Based on a number
692
        of parameters, this method estimates the times at which the 
693
        suspend/resume commands would have to be sent to guarantee this
694
        exclusion.
695
                    
696
        Arguments:
697
        vmrr -- The VM reservation that will be suspended/resumed
698
        time -- The time at which the suspend should end or the resume should start.
699
        direction -- DIRECTION_BACKWARD: start at "time" and compute the times going
700
        backward (for suspensions); DIRECTION_FORWARD: start at "time" and compute
        the times going forward (for resumptions).
702
        exclusion -- SUSPRES_EXCLUSION_GLOBAL (memory is saved to global filesystem)
703
        or SUSPRES_EXCLUSION_LOCAL (saved to local filesystem)
704
        rate -- The rate at which an individual VM is suspended/resumed
705
        override -- If specified, then instead of computing the time to 
706
        suspend/resume VM based on its memory and the "rate" parameter,
707
        use this override value.
708
        
709
        """         
710
        times = [] # (start, end, {pnode -> vnodes})
711
        enactment_overhead = get_config().get("enactment-overhead") 
712

    
713
        if exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
714
            # Global exclusion (which represents, e.g., reading/writing the memory image files
715
            # from a global file system) meaning no two suspensions/resumptions can happen at 
716
            # the same time in the entire resource pool.
717
            
718
            t = time
719
            t_prev = None
720
                
721
            for (vnode,pnode) in vmrr.nodes.items():
722
                if override == None:
723
                    mem = vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
724
                    op_time = self.__compute_suspend_resume_time(mem, rate)
725
                else:
726
                    op_time = override
727

    
728
                op_time += enactment_overhead
729
                    
730
                t_prev = t
731
                
732
                if direction == constants.DIRECTION_FORWARD:
733
                    t += op_time
734
                    times.append((t_prev, t, {pnode:[vnode]}))
735
                elif direction == constants.DIRECTION_BACKWARD:
736
                    t -= op_time
737
                    times.append((t, t_prev, {pnode:[vnode]}))
738

    
739
        elif exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
740
            # Local exclusion (which represents, e.g., reading the memory image files
741
            # from a local file system) means no two resumptions can happen at the same
742
            # time in the same physical node.
743
            pervnode_times = [] # (start, end, vnode)
744
            vnodes_in_pnode = {}
745
            for (vnode,pnode) in vmrr.nodes.items():
746
                vnodes_in_pnode.setdefault(pnode, []).append(vnode)
747
            for pnode in vnodes_in_pnode:
748
                t = time
749
                t_prev = None
750
                for vnode in vnodes_in_pnode[pnode]:
751
                    if override == None:
752
                        mem = vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
753
                        op_time = self.__compute_suspend_resume_time(mem, rate)
754
                    else:
755
                        op_time = override                    
756
                    
757
                    t_prev = t
758
                    
759
                    if direction == constants.DIRECTION_FORWARD:
760
                        t += op_time
761
                        pervnode_times.append((t_prev, t, vnode))
762
                    elif direction == constants.DIRECTION_BACKWARD:
763
                        t -= op_time
764
                        pervnode_times.append((t, t_prev, vnode))
765
            
766
            # Consolidate suspend/resume operations happening at the same time
767
            uniq_times = set([(start, end) for (start, end, vnode) in pervnode_times])
768
            for (start, end) in uniq_times:
769
                vnodes = [x[2] for x in pervnode_times if x[0] == start and x[1] == end]
770
                node_mappings = {}
771
                for vnode in vnodes:
772
                    pnode = vmrr.nodes[vnode]
773
                    node_mappings.setdefault(pnode, []).append(vnode)
774
                times.append([start,end,node_mappings])
775
        
776
            # Add the enactment overhead
777
            for t in times:
778
                num_vnodes = sum([len(vnodes) for vnodes in t[2].values()])
779
                overhead = TimeDelta(seconds = num_vnodes * enactment_overhead)
780
                if direction == constants.DIRECTION_FORWARD:
781
                    t[1] += overhead
782
                elif direction == constants.DIRECTION_BACKWARD:
783
                    t[0] -= overhead
784
                    
785
            # Fix overlaps
786
            if direction == constants.DIRECTION_FORWARD:
787
                times.sort(key=itemgetter(0))
788
            elif direction == constants.DIRECTION_BACKWARD:
789
                times.sort(key=itemgetter(1))
790
                times.reverse()
791
                
792
            prev_start = None
793
            prev_end = None
794
            for t in times:
795
                if prev_start != None:
796
                    start = t[0]
797
                    end = t[1]
798
                    if direction == constants.DIRECTION_FORWARD:
799
                        if start < prev_end:
800
                            diff = prev_end - start
801
                            t[0] += diff
802
                            t[1] += diff
803
                    elif direction == constants.DIRECTION_BACKWARD:
804
                        if end > prev_start:
805
                            diff = end - prev_start
806
                            t[0] -= diff
807
                            t[1] -= diff
808
                prev_start = t[0]
809
                prev_end = t[1]
810
        
811
        return times
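    # Illustrative sketch (not part of the original code): the list returned
    # above contains (start, end, {pnode: [vnodes]}) entries, one per
    # suspend/resume operation. For example, a backward (suspension)
    # computation ending at time T, with two VMs on two different physical
    # nodes, a hypothetical 30-second operation time and global exclusion,
    # would produce something like
    #
    #   [ (T - 30s, T,       {2: [2]}),
    #     (T - 60s, T - 30s, {1: [1]}) ]
    #
    # i.e. the individual operations are laid out so that no two overlap.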
812
    
813
    
814
    def __schedule_shutdown(self, vmrr):
815
        """ Schedules the shutdown of a VM reservation
816
                            
817
        Arguments:
818
        vmrr -- The VM reservation that will be shutdown
819
        
820
        """                 
821
        config = get_config()
822
        shutdown_time = self.__estimate_shutdown_time(vmrr.lease)
823

    
824
        start = vmrr.end - shutdown_time
825
        end = vmrr.end
826
        
827
        shutdown_rr = ShutdownResourceReservation(vmrr.lease, start, end, vmrr.resources_in_pnode, vmrr.nodes, vmrr)
828
        shutdown_rr.state = ResourceReservation.STATE_SCHEDULED
829
                
830
        vmrr.update_end(start)
831
        
832
        # If there are any post RRs, remove them
833
        for rr in vmrr.post_rrs:
834
            self.slottable.remove_reservation(rr)
835
        vmrr.post_rrs = []
836

    
837
        vmrr.post_rrs.append(shutdown_rr)
838

    
839

    
840
    def __schedule_suspension(self, vmrr, suspend_by):
841
        """ Schedules the suspension of a VM reservation
842
                         
843
        Most of the work is done in __compute_susprem_times. See that
844
        method's documentation for more details.
845
                            
846
        Arguments:
847
        vmrr -- The VM reservation that will be suspended
848
        suspend_by -- The time by which the VMs should be suspended.
849
        
850
        """            
851
        config = get_config()
852
        susp_exclusion = config.get("suspendresume-exclusion")
853
        override = get_config().get("override-suspend-time")
854
        rate = config.get("suspend-rate") 
855

    
856
        if suspend_by < vmrr.start or suspend_by > vmrr.end:
857
            raise InconsistentScheduleError, "Tried to schedule a suspension by %s, which is outside the VMRR's duration (%s-%s)" % (suspend_by, vmrr.start, vmrr.end)
858

    
859
        # Find the suspension times
860
        times = self.__compute_susprem_times(vmrr, suspend_by, constants.DIRECTION_BACKWARD, susp_exclusion, rate, override)
861
        
862
        # Create the suspension resource reservations
863
        suspend_rrs = []
864
        for (start, end, node_mappings) in times:
865
            suspres = {}
866
            all_vnodes = []
867
            for (pnode,vnodes) in node_mappings.items():
868
                num_vnodes = len(vnodes)
869
                r = Capacity([constants.RES_MEM,constants.RES_DISK])
870
                mem = 0
871
                for vnode in vnodes:
872
                    mem += vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
873
                r.set_quantity(constants.RES_MEM, mem * num_vnodes)
874
                r.set_quantity(constants.RES_DISK, mem * num_vnodes)
875
                suspres[pnode] = self.slottable.create_resource_tuple_from_capacity(r)          
876
                all_vnodes += vnodes     
877
                             
878
            susprr = SuspensionResourceReservation(vmrr.lease, start, end, suspres, all_vnodes, vmrr)
879
            susprr.state = ResourceReservation.STATE_SCHEDULED
880
            suspend_rrs.append(susprr)
881
                
882
        suspend_rrs.sort(key=attrgetter("start"))
883
            
884
        susp_start = suspend_rrs[0].start
885
        if susp_start < vmrr.start:
886
            raise InconsistentScheduleError, "Determined suspension should start at %s, before the VMRR's start (%s) -- Suspend time not being properly estimated?" % (susp_start, vmrr.start)
887
        
888
        vmrr.update_end(susp_start)
889
        
890
        # If there are any post RRs, remove them
891
        for rr in vmrr.post_rrs:
892
            self.slottable.remove_reservation(rr)
893
        vmrr.post_rrs = []
894

    
895
        # Add the suspension RRs to the VM RR
896
        for susprr in suspend_rrs:
897
            vmrr.post_rrs.append(susprr)       
898
            
899
            
900
    def __schedule_resumption(self, vmrr, resume_at):
901
        """ Schedules the resumption of a VM reservation
902
                         
903
        Most of the work is done in __compute_susprem_times. See that
904
        method's documentation for more details.
905
                            
906
        Arguments:
907
        vmrr -- The VM reservation that will be resumed
908
        resume_at -- The time at which the resumption should start
909
        
910
        """                 
911
        config = get_config()
912
        resm_exclusion = config.get("suspendresume-exclusion")        
913
        override = get_config().get("override-resume-time")
914
        rate = config.get("resume-rate") 
915

    
916
        if resume_at < vmrr.start or resume_at > vmrr.end:
917
            raise InconsistentScheduleError, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
918

    
919
        # Find the resumption times
920
        times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate, override)
921
        
922
        # Create the resumption resource reservations
923
        resume_rrs = []
924
        for (start, end, node_mappings) in times:
925
            resmres = {}
926
            all_vnodes = []
927
            for (pnode,vnodes) in node_mappings.items():
928
                num_vnodes = len(vnodes)
929
                r = Capacity([constants.RES_MEM,constants.RES_DISK])
930
                mem = 0
931
                for vnode in vnodes:
932
                    mem += vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
933
                r.set_quantity(constants.RES_MEM, mem * num_vnodes)
934
                r.set_quantity(constants.RES_DISK, mem * num_vnodes)
935
                resmres[pnode] = self.slottable.create_resource_tuple_from_capacity(r)
936
                all_vnodes += vnodes
937
            resmrr = ResumptionResourceReservation(vmrr.lease, start, end, resmres, all_vnodes, vmrr)
938
            resmrr.state = ResourceReservation.STATE_SCHEDULED
939
            resume_rrs.append(resmrr)
940
                
941
        resume_rrs.sort(key=attrgetter("start"))
942
            
943
        resm_end = resume_rrs[-1].end
944
        if resm_end > vmrr.end:
945
            raise InconsistentScheduleError, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
946
        
947
        vmrr.update_start(resm_end)
948
        
949
        # Add the resumption RRs to the VM RR
950
        for resmrr in resume_rrs:
951
            vmrr.pre_rrs.append(resmrr)        
952
           
953
           
954
    def __compute_suspend_resume_time(self, mem, rate):
955
        """ Compute the time to suspend/resume a single VM
956
                            
957
        Arguments:
958
        mem -- Amount of memory used by the VM
959
        rate -- The rate at which an individual VM is suspended/resumed
960
        
961
        """            
962
        time = float(mem) / rate
963
        time = round_datetime_delta(TimeDelta(seconds = time))
964
        return time
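    # Illustrative sketch (not part of the original code): for a hypothetical
    # VM with 1024 MB of memory and a suspend/resume rate of 32 MB/s, the
    # computation above gives
    #
    #   time = 1024 / 32.0 = 32.0  ->  round_datetime_delta(TimeDelta(seconds=32.0))
    #
    # i.e. roughly 32 seconds for that single VM.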
965
    
966
    
967
    def __estimate_suspend_time(self, lease):
968
        """ Estimate the time to suspend an entire lease
969
                            
970
        Most of the work is done in __estimate_suspend_resume_time. See
971
        that method's documentation for more details.
972
        
973
        Arguments:
974
        lease -- Lease that is going to be suspended
975
        
976
        """               
977
        rate = get_config().get("suspend-rate")
978
        override = get_config().get("override-suspend-time")
979
        if override != None:
980
            return override
981
        else:
982
            return self.__estimate_suspend_resume_time(lease, rate)
983

    
984

    
985
    def __estimate_resume_time(self, lease):
986
        """ Estimate the time to resume an entire lease
987
                            
988
        Most of the work is done in __estimate_suspend_resume_time. See
989
        that method's documentation for more details.
990
        
991
        Arguments:
992
        lease -- Lease that is going to be resumed
993
        
994
        """           
995
        rate = get_config().get("resume-rate") 
996
        override = get_config().get("override-resume-time")
997
        if override != None:
998
            return override
999
        else:
1000
            return self.__estimate_suspend_resume_time(lease, rate)    
1001
    
1002
    
1003
    def __estimate_suspend_resume_time(self, lease, rate):
1004
        """ Estimate the time to suspend/resume an entire lease
1005
                            
1006
        Note that, unlike __compute_suspend_resume_time, this estimates
1007
        the time to suspend/resume an entire lease (which may involve
1008
        suspending several VMs)
1009
        
1010
        Arguments:
1011
        lease -- Lease that is going to be suspended/resumed
1012
        rate -- The rate at which an individual VM is suspended/resumed
1013
        
1014
        """              
1015
        susp_exclusion = get_config().get("suspendresume-exclusion")        
1016
        enactment_overhead = get_config().get("enactment-overhead") 
1017
        mem = 0
1018
        for vnode in lease.requested_resources:
1019
            mem += lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
1020
        if susp_exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
1021
            return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
1022
        elif susp_exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
1023
            # Overestimating
1024
            return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
1025

    
1026

    
1027
    def __estimate_shutdown_time(self, lease):
1028
        """ Estimate the time to shutdown an entire lease
1029
                            
1030
        Arguments:
1031
        lease -- Lease that is going to be shutdown
1032
        
1033
        """            
1034
        enactment_overhead = get_config().get("enactment-overhead").seconds
1035
        return get_config().get("shutdown-time") + (enactment_overhead * lease.numnodes)
1036

    
1037

    
1038
    def __compute_scheduling_threshold(self, lease):
1039
        """ Compute the scheduling threshold (the 'minimum duration') of a lease
1040
        
1041
        To avoid thrashing, Haizea will not schedule a lease unless all overheads
1042
        can be correctly scheduled (which includes image transfers, suspensions, etc.).
1043
        However, this can still result in situations where a lease is prepared,
1044
        and then immediately suspended because of a blocking lease in the future.
1045
        The scheduling threshold is used to specify that a lease must
1046
        not be scheduled unless it is guaranteed to run for a minimum amount of
1047
        time (the rationale behind this is that you ideally don't want leases
1048
        to be scheduled if they're not going to be active for at least as much time
1049
        as was spent in overheads).
1050
        
1051
        An important part of computing this value is the "scheduling threshold factor".
1052
        The default value is 1, meaning that the lease will be active for at least
1053
        as much time T as was spent on overheads (e.g., if preparing the lease requires
1054
        60 seconds, and we know that it will have to be suspended, requiring 30 seconds,
1055
        Haizea won't schedule the lease unless it can run for at least 90 seconds).
        In other words, a scheduling factor of F requires a minimum duration of 
1057
        F*T. A value of 0 could lead to thrashing, since Haizea could end up with
1058
        situations where a lease starts and immediately gets suspended.         
1059
        
1060
        Arguments:
1061
        lease -- Lease for which we want to find the scheduling threshold
1062
        """
1063
        # TODO: Take into account other things like boot overhead, migration overhead, etc.
1064
        config = get_config()
1065
        threshold = config.get("force-scheduling-threshold")
1066
        if threshold != None:
1067
            # If there is a hard-coded threshold, use that
1068
            return threshold
1069
        else:
1070
            factor = config.get("scheduling-threshold-factor")
1071
            
1072
            # First, figure out the "safe duration" (the minimum duration
1073
            # so that we at least allocate enough time for all the
1074
            # overheads).
1075
            susp_overhead = self.__estimate_suspend_time(lease)
1076
            safe_duration = susp_overhead
1077
            
1078
            if lease.get_state() == Lease.STATE_SUSPENDED_QUEUED:
1079
                resm_overhead = self.__estimate_resume_time(lease)
1080
                safe_duration += resm_overhead
1081
            
1082
            # TODO: Incorporate other overheads into the minimum duration
1083
            min_duration = safe_duration
1084
            
1085
            # At the very least, we want to allocate enough time for the
1086
            # safe duration (otherwise, we'll end up with incorrect schedules,
1087
            # where a lease is scheduled to suspend, but isn't even allocated
1088
            # enough time to suspend). 
1089
            # The factor is assumed to be non-negative. i.e., a factor of 0
1090
            # means we only allocate enough time for potential suspend/resume
1091
            # operations, while a factor of 1 means the lease will get as much
1092
            # running time as spend on the runtime overheads involved in setting
1093
            # it up
1094
            threshold = safe_duration + (min_duration * factor)
1095
            return threshold
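    # Illustrative sketch (not part of the original code): for a hypothetical
    # lease whose suspension overhead is estimated at 30 seconds (and that does
    # not need to be resumed), with the default scheduling-threshold-factor of 1:
    #
    #   safe_duration = 30s
    #   min_duration  = 30s
    #   threshold     = 30s + (30s * 1) = 60s
    #
    # so the lease will only be scheduled if it can run for at least 60 seconds.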
1096

    
1097

    
1098
    #-------------------------------------------------------------------#
1099
    #                                                                   #
1100
    #                  SLOT TABLE EVENT HANDLERS                        #
1101
    #                                                                   #
1102
    #-------------------------------------------------------------------#
1103

    
1104
    def _handle_start_vm(self, l, rr):
1105
        """ Handles the start of a VMResourceReservation       
1106
        
1107
        Arguments:
1108
        l -- Lease the VMResourceReservation belongs to
1109
        rr -- The VMResourceReservation
1110
        """        
1111
        self.logger.debug("LEASE-%i Start of handleStartVM" % l.id)
1112
        l.print_contents()
1113
        lease_state = l.get_state()
1114
        if lease_state == Lease.STATE_READY:
1115
            l.set_state(Lease.STATE_ACTIVE)
1116
            rr.state = ResourceReservation.STATE_ACTIVE
1117
            now_time = get_clock().get_time()
1118
            l.start.actual = now_time
1119
            
1120
            try:
1121
                self.resourcepool.start_vms(l, rr)
1122
            except EnactmentError, exc:
1123
                self.logger.error("Enactment error when starting VMs.")
1124
                # Right now, this is a non-recoverable error, so we just
1125
                # propagate it upwards to the lease scheduler
1126
                # In the future, it may be possible to react to these
1127
                # kind of errors.
1128
                raise
1129
                
1130
        elif lease_state == Lease.STATE_RESUMED_READY:
1131
            l.set_state(Lease.STATE_ACTIVE)
1132
            rr.state = ResourceReservation.STATE_ACTIVE
1133
            # No enactment to do here, since all the suspend/resume actions are
1134
            # handled during the suspend/resume RRs
1135
        else:
1136
            raise InconsistentLeaseStateError(l, doing = "starting a VM")
1137
        
1138
        # If this was a future reservation (as determined by backfilling),
1139
        # remove that status, since the future is now.
1140
        if rr.lease in self.future_leases:
1141
            self.future_leases.remove(l)
1142
        
1143
        l.print_contents()
1144
        self.logger.debug("LEASE-%i End of handleStartVM" % l.id)
1145
        self.logger.info("Started VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))
1146

    
1147

    
1148
    def _handle_end_vm(self, l, rr):
        """ Handles the end of a VMResourceReservation
        
        Arguments:
        l -- Lease the VMResourceReservation belongs to
        rr -- The VMResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleEndVM" % l.id)
        self.logger.vdebug("LEASE-%i Before:" % l.id)
        l.print_contents()
        now_time = round_datetime(get_clock().get_time())
        diff = now_time - rr.start
        # Record how much of the lease's duration was actually satisfied by
        # this reservation.
        l.duration.accumulate_duration(diff)
        rr.state = ResourceReservation.STATE_DONE
                
        self.logger.vdebug("LEASE-%i After:" % l.id)
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndVM" % l.id)
        self.logger.info("Stopped VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))

    def _handle_unscheduled_end_vm(self, l, vmrr):
        """ Handles the unexpected end of a VMResourceReservation
        
        Arguments:
        l -- Lease the VMResourceReservation belongs to
        vmrr -- The VMResourceReservation
        """
        
        self.logger.info("LEASE-%i The VM has ended prematurely." % l.id)
        for rr in vmrr.post_rrs:
            self.slottable.remove_reservation(rr)
        vmrr.post_rrs = []
        vmrr.end = get_clock().get_time()
        self._handle_end_vm(l, vmrr)

    def _handle_start_suspend(self, l, rr):
        """ Handles the start of a SuspensionResourceReservation
        
        Arguments:
        l -- Lease the SuspensionResourceReservation belongs to
        rr -- The SuspensionResourceReservation
        
        """
        self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_ACTIVE
        
        try:
            self.resourcepool.suspend_vms(l, rr)
        except EnactmentError, exc:
            self.logger.error("Enactment error when suspending VMs.")
            # Right now, this is a non-recoverable error, so we just
            # propagate it upwards to the lease scheduler.
            # In the future, it may be possible to react to these
            # kinds of errors.
            raise

        if rr.is_first():
            l.set_state(Lease.STATE_SUSPENDING)
            l.print_contents()
            self.logger.info("Suspending lease %i..." % (l.id))
        self.logger.debug("LEASE-%i End of handleStartSuspend" % l.id)

    def _handle_end_suspend(self, l, rr):
        """ Handles the end of a SuspensionResourceReservation
        
        Arguments:
        l -- Lease the SuspensionResourceReservation belongs to
        rr -- The SuspensionResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleEndSuspend" % l.id)
        l.print_contents()
        # TODO: React to incomplete suspend
        self.resourcepool.verify_suspend(l, rr)
        rr.state = ResourceReservation.STATE_DONE
        if rr.is_last():
            l.set_state(Lease.STATE_SUSPENDED_PENDING)
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndSuspend" % l.id)
        self.logger.info("Lease %i suspended." % (l.id))
        
        if l.get_state() == Lease.STATE_SUSPENDED_PENDING:
            raise RescheduleLeaseException

    def _handle_start_resume(self, l, rr):
        """ Handles the start of a ResumptionResourceReservation
        
        Arguments:
        l -- Lease the ResumptionResourceReservation belongs to
        rr -- The ResumptionResourceReservation
        
        """
        self.logger.debug("LEASE-%i Start of handleStartResume" % l.id)
        l.print_contents()
        
        try:
            self.resourcepool.resume_vms(l, rr)
        except EnactmentError, exc:
            self.logger.error("Enactment error when resuming VMs.")
            # Right now, this is a non-recoverable error, so we just
            # propagate it upwards to the lease scheduler.
            # In the future, it may be possible to react to these
            # kinds of errors.
            raise
                    
        rr.state = ResourceReservation.STATE_ACTIVE
        if rr.is_first():
            l.set_state(Lease.STATE_RESUMING)
            l.print_contents()
            self.logger.info("Resuming lease %i..." % (l.id))
        self.logger.debug("LEASE-%i End of handleStartResume" % l.id)

    def _handle_end_resume(self, l, rr):
        """ Handles the end of a ResumptionResourceReservation
        
        Arguments:
        l -- Lease the ResumptionResourceReservation belongs to
        rr -- The ResumptionResourceReservation
        
        """
        self.logger.debug("LEASE-%i Start of handleEndResume" % l.id)
        l.print_contents()
        # TODO: React to incomplete resume
        self.resourcepool.verify_resume(l, rr)
        rr.state = ResourceReservation.STATE_DONE
        if rr.is_last():
            l.set_state(Lease.STATE_RESUMED_READY)
            self.logger.info("Resumed lease %i" % (l.id))
        for vnode, pnode in rr.vmrr.nodes.items():
            self.resourcepool.remove_ramfile(pnode, l.id, vnode)
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndResume" % l.id)

    def _handle_start_shutdown(self, l, rr):
        """ Handles the start of a ShutdownResourceReservation
        
        Arguments:
        l -- Lease the ShutdownResourceReservation belongs to
        rr -- The ShutdownResourceReservation
        """
        
        self.logger.debug("LEASE-%i Start of handleStartShutdown" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_ACTIVE
        try:
            self.resourcepool.stop_vms(l, rr)
        except EnactmentError, exc:
            self.logger.error("Enactment error when shutting down VMs.")
            # Right now, this is a non-recoverable error, so we just
            # propagate it upwards to the lease scheduler.
            # In the future, it may be possible to react to these
            # kinds of errors.
            raise
        
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleStartShutdown" % l.id)

    def _handle_end_shutdown(self, l, rr):
        """ Handles the end of a ShutdownResourceReservation
        
        Arguments:
        l -- Lease the ShutdownResourceReservation belongs to
        rr -- The ShutdownResourceReservation
        
        """
        self.logger.debug("LEASE-%i Start of handleEndShutdown" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_DONE
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndShutdown" % l.id)
        self.logger.info("Lease %i's VMs have shut down." % (l.id))
        raise NormalEndLeaseException

    def _handle_start_migrate(self, l, rr):
        """ Handles the start of a MemImageMigrationResourceReservation
        
        Arguments:
        l -- Lease the MemImageMigrationResourceReservation belongs to
        rr -- The MemImageMigrationResourceReservation
        
        """
        self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_ACTIVE
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
        self.logger.info("Migrating lease %i..." % (l.id))

    def _handle_end_migrate(self, l, rr):
        """ Handles the end of a MemImageMigrationResourceReservation
        
        Arguments:
        l -- Lease the MemImageMigrationResourceReservation belongs to
        rr -- The MemImageMigrationResourceReservation
        
        """
        self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
        l.print_contents()

        # rr.transfers maps each vnode to an (origin pnode, destination pnode)
        # pair for the memory image transfer.
        for vnode in rr.transfers:
            origin = rr.transfers[vnode][0]
            dest = rr.transfers[vnode][1]
            
            # Update RAM files
            self.resourcepool.remove_ramfile(origin, l.id, vnode)
            self.resourcepool.add_ramfile(dest, l.id, vnode, l.requested_resources[vnode].get_quantity(constants.RES_MEM))
        
        rr.state = ResourceReservation.STATE_DONE
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
        self.logger.info("Migrated lease %i..." % (l.id))


class VMResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, nodes, res):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.nodes = nodes # { vnode -> pnode }
        self.pre_rrs = []
        self.post_rrs = []

        # ONLY for simulation
        self.__update_prematureend()

    def update_start(self, time):
        self.start = time
        # ONLY for simulation
        self.__update_prematureend()

    def update_end(self, time):
        self.end = time
        # ONLY for simulation
        self.__update_prematureend()

    # ONLY for simulation
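    # prematureend models the simulation-only case where the lease's actual
    # (known) duration runs out before this reservation ends. For example, if
    # the lease has 30 minutes of known duration left but this VMRR spans 60
    # minutes, the VMs are taken to end 30 minutes into the reservation.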
    def __update_prematureend(self):
        if self.lease.duration.known != None:
            remdur = self.lease.duration.get_remaining_known_duration()
            rrdur = self.end - self.start
            if remdur < rrdur:
                self.prematureend = self.start + remdur
                # Kludgy, but this corner case actually does happen
                # (because of preemptions, it may turn out that
                # the premature end time coincides with the
                # starting time of the VMRR)
                if self.prematureend == self.start:
                    self.prematureend += 1
            else:
                self.prematureend = None
        else:
            self.prematureend = None

    def get_final_end(self):
        if len(self.post_rrs) == 0:
            return self.end
        else:
            return self.post_rrs[-1].end

    def is_suspending(self):
        return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], SuspensionResourceReservation)

    def is_shutting_down(self):
        return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], ShutdownResourceReservation)
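
    # Note: is_suspending and is_shutting_down assume that post_rrs is kept in
    # chronological order, so the first post-reservation (a suspension or a
    # shutdown RR) determines how this VM reservation ends, and the last one
    # marks its final end time (see get_final_end).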

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        for resmrr in self.pre_rrs:
            resmrr.print_contents(loglevel)
            self.logger.log(loglevel, "--")
        self.logger.log(loglevel, "Type           : VM")
        self.logger.log(loglevel, "Nodes          : %s" % pretty_nodemap(self.nodes))
        if self.prematureend != None:
            self.logger.log(loglevel, "Premature end  : %s" % self.prematureend)
        ResourceReservation.print_contents(self, loglevel)
        for susprr in self.post_rrs:
            self.logger.log(loglevel, "--")
            susprr.print_contents(loglevel)


class SuspensionResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, res, vnodes, vmrr):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.vmrr = vmrr
        self.vnodes = vnodes

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        self.logger.log(loglevel, "Type           : SUSPEND")
        self.logger.log(loglevel, "Vnodes         : %s" % self.vnodes)
        ResourceReservation.print_contents(self, loglevel)

    def is_first(self):
        return (self == self.vmrr.post_rrs[0])

    def is_last(self):
        return (self == self.vmrr.post_rrs[-1])


class ResumptionResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, res, vnodes, vmrr):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.vmrr = vmrr
        self.vnodes = vnodes

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        self.logger.log(loglevel, "Type           : RESUME")
        self.logger.log(loglevel, "Vnodes         : %s" % self.vnodes)
        ResourceReservation.print_contents(self, loglevel)

    def is_first(self):
        resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
        return (self == resm_rrs[0])

    def is_last(self):
        resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
        return (self == resm_rrs[-1])


class ShutdownResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, res, vnodes, vmrr):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.vmrr = vmrr
        self.vnodes = vnodes

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        self.logger.log(loglevel, "Type           : SHUTDOWN")
        ResourceReservation.print_contents(self, loglevel)


class MemImageMigrationResourceReservation(MigrationResourceReservation):
    def __init__(self, lease, start, end, res, vmrr, transfers):
        MigrationResourceReservation.__init__(self, lease, start, end, res, vmrr, transfers)

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        self.logger.log(loglevel, "Type           : MEM IMAGE MIGRATION")
        self.logger.log(loglevel, "Transfers      : %s" % self.transfers)
        ResourceReservation.print_contents(self, loglevel)