Project

General

Profile

root / trunk / src / haizea / core / scheduler / vm_scheduler.py @ 676

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2009, University of Chicago                                 #
3
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19
"""This module provides the main classes for Haizea's VM Scheduler. All the
20
scheduling code that decides when and where a lease is scheduled is contained
21
in the VMScheduler class (except for the code that specifically decides
22
what physical machines each virtual machine is mapped to, which is factored out
23
into the "mapper" module). This module also provides the classes for the
24
reservations that will be placed in the slot table and correspond to VMs. 
25
"""
26

    
27
import haizea.common.constants as constants
28
from haizea.common.utils import round_datetime_delta, round_datetime, estimate_transfer_time, pretty_nodemap, get_config, get_clock, get_persistence
29
from haizea.core.leases import Lease, Capacity
30
from haizea.core.scheduler.slottable import ResourceReservation, ResourceTuple
31
from haizea.core.scheduler import ReservationEventHandler, RescheduleLeaseException, NormalEndLeaseException, EnactmentError, NotSchedulableException, InconsistentScheduleError, InconsistentLeaseStateError, MigrationResourceReservation
32
from operator import attrgetter, itemgetter
33
from mx.DateTime import TimeDelta
34

    
35
import logging
36

    
37

    
38
class VMScheduler(object):
39
    """The Haizea VM Scheduler
40
    
41
    This class is responsible for taking a lease and scheduling VMs to satisfy
42
    the requirements of that lease.
43
    """
44
    
45
    def __init__(self, slottable, resourcepool, mapper, max_in_future):
46
        """Constructor
47
        
48
        The constructor does little more than create the VM scheduler's
49
        attributes. However, it does expect (in the arguments) a fully-constructed 
50
        SlotTable, ResourcePool, and Mapper (these are constructed in the 
51
        Manager's constructor). 
52
        
53
        Arguments:
54
        slottable -- Slot table
55
        resourcepool -- Resource pool where enactment commands will be sent to
56
        mapper -- Mapper
57
        """        
58
        self.slottable = slottable
59
        self.resourcepool = resourcepool
60
        self.mapper = mapper
61
        self.logger = logging.getLogger("VMSCHED")
62
        
63
        # Register the handlers for the types of reservations used by
64
        # the VM scheduler
65
        self.handlers = {}
66
        self.handlers[VMResourceReservation] = ReservationEventHandler(
67
                                sched    = self,
68
                                on_start = VMScheduler._handle_start_vm,
69
                                on_end   = VMScheduler._handle_end_vm)
70

    
71
        self.handlers[ShutdownResourceReservation] = ReservationEventHandler(
72
                                sched    = self,
73
                                on_start = VMScheduler._handle_start_shutdown,
74
                                on_end   = VMScheduler._handle_end_shutdown)
75

    
76
        self.handlers[SuspensionResourceReservation] = ReservationEventHandler(
77
                                sched    = self,
78
                                on_start = VMScheduler._handle_start_suspend,
79
                                on_end   = VMScheduler._handle_end_suspend)
80

    
81
        self.handlers[ResumptionResourceReservation] = ReservationEventHandler(
82
                                sched    = self,
83
                                on_start = VMScheduler._handle_start_resume,
84
                                on_end   = VMScheduler._handle_end_resume)
85

    
86
        self.handlers[MemImageMigrationResourceReservation] = ReservationEventHandler(
87
                                sched    = self,
88
                                on_start = VMScheduler._handle_start_migrate,
89
                                on_end   = VMScheduler._handle_end_migrate)
90
        
91
        self.max_in_future = max_in_future
92
        
93
        self.future_leases = set()
94

    
95

    
96
    def schedule(self, lease, nexttime, earliest):
97
        """ The scheduling function
98
        
99
        This particular function doesn't do much except call __schedule_asap
100
        and __schedule_exact (which do all the work).
101
        
102
        Arguments:
103
        lease -- Lease to schedule
104
        nexttime -- The next time at which the scheduler can allocate resources.
105
        earliest -- The earliest possible starting times on each physical node
106
        """        
107
        if lease.get_type() == Lease.BEST_EFFORT:
108
            return self.__schedule_asap(lease, nexttime, earliest, allow_in_future = self.can_schedule_in_future())
109
        elif lease.get_type() == Lease.ADVANCE_RESERVATION:
110
            return self.__schedule_exact(lease, nexttime, earliest)
111
        elif lease.get_type() == Lease.IMMEDIATE:
112
            return self.__schedule_asap(lease, nexttime, earliest, allow_in_future = False)
113

    
114

    
115
    def estimate_migration_time(self, lease):
116
        """ Estimates the time required to migrate a lease's VMs
117

118
        This function conservatively estimates that all the VMs are going to
119
        be migrated to other nodes. Since all the transfers are intra-node,
120
        the bottleneck is the transfer from whatever node has the most
121
        memory to transfer.
122
        
123
        Note that this method only estimates the time to migrate the memory
124
        state files for the VMs. Migrating the software environment (which may
125
        or may not be a disk image) is the responsibility of the preparation
126
        scheduler, which has it's own set of migration scheduling methods.
127

128
        Arguments:
129
        lease -- Lease that might be migrated
130
        """                
131
        migration = get_config().get("migration")
132
        if migration == constants.MIGRATE_YES:
133
            vmrr = lease.get_last_vmrr()
134
            mem_in_pnode = dict([(pnode,0) for pnode in set(vmrr.nodes.values())])
135
            for pnode in vmrr.nodes.values():
136
                mem = vmrr.resources_in_pnode[pnode].get_by_type(constants.RES_MEM)
137
                mem_in_pnode[pnode] += mem
138
            max_mem_to_transfer = max(mem_in_pnode.values())
139
            bandwidth = self.resourcepool.info.get_migration_bandwidth()
140
            return estimate_transfer_time(max_mem_to_transfer, bandwidth)
141
        elif migration == constants.MIGRATE_YES_NOTRANSFER:
142
            return TimeDelta(seconds=0)        
143

    
144
    def schedule_migration(self, lease, vmrr, nexttime):
145
        """ Schedules migrations for a lease
146

147
        Arguments:
148
        lease -- Lease being migrated
149
        vmrr -- The VM reservation before which the migration will take place
150
        nexttime -- The next time at which the scheduler can allocate resources.
151
        """
152
        
153
        # Determine what migrations have to be done. We do this by looking at
154
        # the mapping in the previous VM RR and in the new VM RR
155
        last_vmrr = lease.get_last_vmrr()
156
        vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes])
157
        
158
        # Determine if we actually have to migrate
159
        mustmigrate = False
160
        for vnode in vnode_migrations:
161
            if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
162
                mustmigrate = True
163
                break
164
            
165
        if not mustmigrate:
166
            return []
167

    
168
        # If Haizea is configured to migrate without doing any transfers,
169
        # then we just return a nil-duration migration RR
170
        if get_config().get("migration") == constants.MIGRATE_YES_NOTRANSFER:
171
            start = nexttime
172
            end = nexttime
173
            res = {}
174
            migr_rr = MemImageMigrationResourceReservation(lease, start, end, res, vmrr, vnode_migrations)
175
            migr_rr.state = ResourceReservation.STATE_SCHEDULED
176
            return [migr_rr]
177

    
178
        # Figure out what migrations can be done simultaneously
179
        migrations = []
180
        while len(vnode_migrations) > 0:
181
            pnodes = set()
182
            migration = {}
183
            for vnode in vnode_migrations:
184
                origin = vnode_migrations[vnode][0]
185
                dest = vnode_migrations[vnode][1]
186
                if not origin in pnodes and not dest in pnodes:
187
                    migration[vnode] = vnode_migrations[vnode]
188
                    pnodes.add(origin)
189
                    pnodes.add(dest)
190
            for vnode in migration:
191
                del vnode_migrations[vnode]
192
            migrations.append(migration)
193
        
194
        # Create migration RRs
195
        start = max(last_vmrr.post_rrs[-1].end, nexttime)
196
        bandwidth = self.resourcepool.info.get_migration_bandwidth()
197
        migr_rrs = []
198
        for m in migrations:
199
            vnodes_to_migrate = m.keys()
200
            max_mem_to_migrate = max([lease.requested_resources[vnode].get_quantity(constants.RES_MEM) for vnode in vnodes_to_migrate])
201
            migr_time = estimate_transfer_time(max_mem_to_migrate, bandwidth)
202
            end = start + migr_time
203
            res = {}
204
            for (origin,dest) in m.values():
205
                resorigin = Capacity([constants.RES_NETOUT])
206
                resorigin.set_quantity(constants.RES_NETOUT, bandwidth)
207
                resdest = Capacity([constants.RES_NETIN])
208
                resdest.set_quantity(constants.RES_NETIN, bandwidth)
209
                res[origin] = self.slottable.create_resource_tuple_from_capacity(resorigin)
210
                res[dest] = self.slottable.create_resource_tuple_from_capacity(resdest)                
211
            migr_rr = MemImageMigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
212
            migr_rr.state = ResourceReservation.STATE_SCHEDULED
213
            migr_rrs.append(migr_rr)
214
            start = end
215
            
216
        return migr_rrs
217

    
218
    def cancel_vm(self, vmrr):
219
        """ Cancels a VM resource reservation
220

221
        Arguments:
222
        vmrr -- VM RR to be cancelled
223
        """         
224
        
225
        # If this VM RR is part of a lease that was scheduled in the future,
226
        # remove that lease from the set of future leases.
227
        if vmrr.lease in self.future_leases:
228
            self.future_leases.remove(vmrr.lease)
229
            get_persistence().persist_future_leases(self.future_leases)
230

    
231
        # If there are any pre-RRs that are scheduled, remove them
232
        for rr in vmrr.pre_rrs:
233
            if rr.state == ResourceReservation.STATE_SCHEDULED:
234
                self.slottable.remove_reservation(rr)
235

    
236
        # If there are any post RRs, remove them
237
        for rr in vmrr.post_rrs:
238
            self.slottable.remove_reservation(rr)
239
        
240
        # Remove the reservation itself
241
        self.slottable.remove_reservation(vmrr)
242

    
243

    
244
    def can_suspend_at(self, lease, t):
245
        """ Determines if it is possible to suspend a lease before a given time
246

247
        Arguments:
248
        vmrr -- VM RR to be preempted
249
        t -- Time by which the VM must be preempted
250
        """                     
251
        # TODO: Make more general, should determine vmrr based on current time
252
        # This won't currently break, though, since the calling function 
253
        # operates on the last VM RR.
254
        vmrr = lease.get_last_vmrr()
255
        time_until_suspend = t - vmrr.start
256
        min_duration = self.__compute_scheduling_threshold(lease)
257
        can_suspend = time_until_suspend >= min_duration        
258
        return can_suspend
259
    
260
    
261
    def preempt_vm(self, vmrr, t):
262
        """ Preempts a VM reservation at a given time
263

264
        This method assumes that the lease is, in fact, preemptable,
265
        that the VMs are running at the given time, and that there is 
266
        enough time to suspend the VMs before the given time (all these
267
        checks are done in the lease scheduler).
268
        
269
        Arguments:
270
        vmrr -- VM RR to be preempted
271
        t -- Time by which the VM must be preempted
272
        """             
273
        
274
        # Save original start and end time of the vmrr
275
        old_start = vmrr.start
276
        old_end = vmrr.end
277
        
278
        # Schedule the VM suspension
279
        self.__schedule_suspension(vmrr, t)
280
        
281
        # Update the VMRR in the slot table
282
        self.slottable.update_reservation_with_key_change(vmrr, old_start, old_end)
283
        
284
        # Add the suspension RRs to the VM's post-RRs
285
        for susprr in vmrr.post_rrs:
286
            self.slottable.add_reservation(susprr)
287
            
288
            
289
    def get_future_reschedulable_leases(self):
290
        """ Returns a list of future leases that are reschedulable.
291

292
        Currently, this list is just the best-effort leases scheduled
293
        in the future as determined by the backfilling algorithm.
294
        Advance reservation leases, by their nature, cannot be 
295
        rescheduled to find a "better" starting time.
296
        """             
297
        return list(self.future_leases)
298
    
299

    
300
    def can_schedule_in_future(self):
301
        """ Returns True if the backfilling algorithm would allow a lease
302
        to be scheduled in the future.
303

304
        """             
305
        if self.max_in_future == -1: # Unlimited
306
            return True
307
        else:
308
            return len(self.future_leases) < self.max_in_future
309

    
310
        
311
    def get_utilization(self, time):
312
        """ Computes resource utilization (currently just CPU-based)
313
        
314
        This utilization information shows what 
315
        portion of the physical resources is used by each type of reservation 
316
        (e.g., 70% are running a VM, 5% are doing suspensions, etc.)
317

318
        Arguments:
319
        time -- Time at which to determine utilization
320
        """         
321
        total = self.slottable.get_total_capacity(restype = constants.RES_CPU)
322
        util = {}
323
        reservations = self.slottable.get_reservations_at(time)
324
        for r in reservations:
325
            for node in r.resources_in_pnode:
326
                if isinstance(r, VMResourceReservation):
327
                    use = r.resources_in_pnode[node].get_by_type(constants.RES_CPU)
328
                    util[type(r)] = use + util.setdefault(type(r),0.0)
329
                elif isinstance(r, SuspensionResourceReservation) or isinstance(r, ResumptionResourceReservation) or isinstance(r, ShutdownResourceReservation):
330
                    use = r.vmrr.resources_in_pnode[node].get_by_type(constants.RES_CPU)
331
                    util[type(r)] = use + util.setdefault(type(r),0.0)
332
        util[None] = total - sum(util.values())
333
        
334
        if total != 0:
335
            for k in util:
336
                util[k] /= total
337
            
338
        return util              
339
        
340

    
341
    def __schedule_exact(self, lease, nexttime, earliest):
342
        """ Schedules VMs that must start at an exact time
343
        
344
        This type of lease is "easy" to schedule because we know the exact
345
        start time, which means that's the only starting time we have to
346
        check. So, this method does little more than call the mapper.
347
        
348
        Arguments:
349
        lease -- Lease to schedule
350
        nexttime -- The next time at which the scheduler can allocate resources.
351
        earliest -- The earliest possible starting times on each physical node
352
        """             
353
        
354
        # Determine the start and end time
355
        start = lease.start.requested
356
        end = start + lease.duration.requested
357
        
358
        # Convert Capacity objects in lease object into ResourceTuples that
359
        # we can hand over to the mapper.
360
        requested_resources = dict([(k,self.slottable.create_resource_tuple_from_capacity(v)) for k,v in lease.requested_resources.items()])
361

    
362
        # Let the mapper do its magiv
363
        mapping, actualend, preemptions = self.mapper.map(lease, 
364
                                                          requested_resources,
365
                                                          start, 
366
                                                          end, 
367
                                                          strictend = True)
368
        
369
        # If no mapping was found, tell the lease scheduler about it
370
        if mapping == None:
371
            raise NotSchedulableException, "Not enough resources in specified interval"
372
        
373
        # Create VM resource reservations
374
        res = {}
375
        
376
        for (vnode,pnode) in mapping.items():
377
            vnode_res = requested_resources[vnode]
378
            if res.has_key(pnode):
379
                res[pnode].incr(vnode_res)
380
            else:
381
                res[pnode] = ResourceTuple.copy(vnode_res)
382
        
383
        vmrr = VMResourceReservation(lease, start, end, mapping, res)
384
        vmrr.state = ResourceReservation.STATE_SCHEDULED
385

    
386
        # Schedule shutdown for the VM
387
        self.__schedule_shutdown(vmrr)
388
        
389
        return vmrr, preemptions
390

    
391

    
392
    def __schedule_asap(self, lease, nexttime, earliest, allow_in_future = None):
393
        """ Schedules VMs as soon as possible
394
        
395
        This method is a bit more complex that __schedule_exact because
396
        we need to figure out what "as soon as possible" actually is.
397
        This involves attempting several mappings, at different points
398
        in time, before we can schedule the lease.
399
        
400
        This method will always check, at least, if the lease can be scheduled
401
        at the earliest possible moment at which the lease could be prepared
402
        (e.g., if the lease can't start until 1 hour in the future because that's
403
        the earliest possible time at which the disk images it requires can
404
        be transferred, then that's when the scheduler will check). Note, however,
405
        that this "earliest possible moment" is determined by the preparation
406
        scheduler.
407
        
408
        Additionally, if the lease can't be scheduled at the earliest
409
        possible moment, it can also check if the lease can be scheduled
410
        in the future. This partially implements a backfilling algorithm
411
        (the maximum number of future leases is stored in the max_in_future
412
        attribute of VMScheduler), the other part being implemented in the
413
        __process_queue method of LeaseScheduler.
414
        
415
        Note that, if the method is allowed to scheduled in the future,
416
        and assuming that the lease doesn't request more resources than
417
        the site itself, this method will always schedule the VMs succesfully
418
        (since there's always an empty spot somewhere in the future).
419
        
420
        
421
        Arguments:
422
        lease -- Lease to schedule
423
        nexttime -- The next time at which the scheduler can allocate resources.
424
        earliest -- The earliest possible starting times on each physical node
425
        allow_in_future -- Boolean indicating whether the scheduler is
426
        allowed to schedule the VMs in the future.
427
        """                
428
        
429

    
430

    
431
        #
432
        # STEP 1: PROLEGOMENA
433
        #
434
        
435
        lease_id = lease.id
436
        remaining_duration = lease.duration.get_remaining_duration()
437
        shutdown_time = self.__estimate_shutdown_time(lease)
438
        
439
        # We might be scheduling a suspended lease. If so, we will
440
        # also have to schedule its resumption. Right now, just 
441
        # figure out if this is such a lease.
442
        mustresume = (lease.get_state() in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED, Lease.STATE_SUSPENDED_SCHEDULED))
443

    
444
        # This is the minimum duration that we must be able to schedule.
445
        # See __compute_scheduling_threshold for more details.
446
        min_duration = self.__compute_scheduling_threshold(lease)
447
        
448

    
449
        #
450
        # STEP 2: FIND THE CHANGEPOINTS
451
        #
452

    
453
        # Find the changepoints, and the available nodes at each changepoint
454
        # We need to do this because the preparation scheduler may have
455
        # determined that some nodes might require more time to prepare
456
        # than others (e.g., if using disk image caching, some nodes
457
        # might have the required disk image predeployed, while others
458
        # may require transferring the image to that node).
459
        # 
460
        # The end result of this step is a list (cps) where each entry
461
        # is a (t,nodes) pair, where "t" is the time of the changepoint
462
        # and "nodes" is the set of nodes that are available at that time.
463
        
464
        if not mustresume:
465
            # If this is not a suspended lease, then the changepoints
466
            # are determined based on the "earliest" parameter.
467
            cps = [(node, e.time) for node, e in earliest.items()]
468
            cps.sort(key=itemgetter(1))
469
            curcp = None
470
            changepoints = []
471
            nodes = []
472
            for node, time in cps:
473
                nodes.append(node)
474
                if time != curcp:
475
                    changepoints.append([time, set(nodes)])
476
                    curcp = time
477
                else:
478
                    changepoints[-1][1] = set(nodes)
479
        else:
480
            # If the lease is suspended, we take into account that, if
481
            # migration is disabled, we can only schedule the lease
482
            # on the nodes it is currently scheduled on.
483
            if get_config().get("migration") == constants.MIGRATE_NO:
484
                vmrr = lease.get_last_vmrr()
485
                onlynodes = set(vmrr.nodes.values())
486
            else:
487
                onlynodes = None               
488
            changepoints = list(set([x.time for x in earliest.values()]))
489
            changepoints.sort()
490
            changepoints = [(x, onlynodes) for x in changepoints]
491

    
492

    
493
        # If we can schedule VMs in the future,
494
        # we also consider future changepoints
495
        if allow_in_future:
496
            res = self.slottable.get_reservations_ending_after(changepoints[-1][0])
497
            # We really only care about changepoints where VMs end (which is
498
            # when resources become available)
499
            futurecp = [r.get_final_end() for r in res if isinstance(r, VMResourceReservation)]
500
            # Corner case: Sometimes we're right in the middle of a ShutdownReservation, so it won't be
501
            # included in futurecp.
502
            futurecp += [r.end for r in res if isinstance(r, ShutdownResourceReservation) and not r.vmrr in res]
503
            if not mustresume:
504
                futurecp = [(p,None) for p in futurecp]
505
            else:
506
                futurecp = [(p,onlynodes) for p in futurecp]                
507
        else:
508
            futurecp = []
509
            
510

    
511
        #
512
        # STEP 3: FIND A MAPPING
513
        #
514
        
515
        # In this step we find a starting time and a mapping for the VMs,
516
        # which involves going through the changepoints in order and seeing
517
        # if we can find a mapping.
518
        # Most of the work is done in the __find_fit_at_points
519
        
520
        # If resuming, we also have to allocate enough time for the resumption
521
        if mustresume:
522
            duration = remaining_duration + self.__estimate_resume_time(lease)
523
        else:
524
            duration = remaining_duration
525

    
526
        duration += shutdown_time
527

    
528
        in_future = False
529

    
530
        # Convert Capacity objects in lease object into ResourceTuples that
531
        # we can hand over to the mapper.
532
        requested_resources = dict([(k,self.slottable.create_resource_tuple_from_capacity(v)) for k,v in lease.requested_resources.items()])
533

    
534
        # First, try to find a mapping assuming we can't schedule in the future
535
        start, end, mapping, preemptions = self.__find_fit_at_points(lease,
536
                                                                     requested_resources,
537
                                                                     changepoints, 
538
                                                                     duration, 
539
                                                                     min_duration)
540
        
541
        if start == None and not allow_in_future:
542
            # We did not find a suitable starting time. This can happen
543
            # if we're unable to schedule in the future
544
            raise NotSchedulableException, "Could not find enough resources for this request"
545

    
546
        # If we haven't been able to fit the lease, check if we can
547
        # reserve it in the future
548
        if start == None and allow_in_future:
549
            start, end, mapping, preemptions = self.__find_fit_at_points(lease,
550
                                                                         requested_resources,
551
                                                                         futurecp, 
552
                                                                         duration, 
553
                                                                         min_duration
554
                                                                         )
555
            # TODO: The following will also raise an exception if a lease
556
            # makes a request that could *never* be satisfied with the
557
            # current resources.
558
            if start == None:
559
                raise InconsistentScheduleError, "Could not find a mapping in the future (this should not happen)"
560

    
561
            in_future = True
562

    
563
        #
564
        # STEP 4: CREATE RESERVATIONS
565
        #
566
        
567
        # At this point, the lease is feasible. We just need to create
568
        # the reservations for the VMs and, possibly, for the VM resumption,
569
        # suspension, and shutdown.    
570
        
571
        # VM resource reservation
572
        res = {}
573
        
574
        for (vnode,pnode) in mapping.items():
575
            vnode_res = requested_resources[vnode]
576
            if res.has_key(pnode):
577
                res[pnode].incr(vnode_res)
578
            else:
579
                res[pnode] = ResourceTuple.copy(vnode_res)
580

    
581
        vmrr = VMResourceReservation(lease, start, end, mapping, res)
582
        vmrr.state = ResourceReservation.STATE_SCHEDULED
583

    
584
        # VM resumption resource reservation
585
        if mustresume:
586
            self.__schedule_resumption(vmrr, start)
587

    
588
        # If the mapper couldn't find a mapping for the full duration
589
        # of the lease, then we need to schedule a suspension.
590
        mustsuspend = (vmrr.end - vmrr.start) < remaining_duration
591
        if mustsuspend:
592
            self.__schedule_suspension(vmrr, end)
593
        else:
594
            # Compensate for any overestimation
595
            if (vmrr.end - vmrr.start) > remaining_duration + shutdown_time:
596
                vmrr.end = vmrr.start + remaining_duration + shutdown_time
597
            self.__schedule_shutdown(vmrr)
598
        
599
        if in_future:
600
            self.future_leases.add(lease)
601
            get_persistence().persist_future_leases(self.future_leases)
602

    
603
        susp_str = res_str = ""
604
        if mustresume:
605
            res_str = " (resuming)"
606
        if mustsuspend:
607
            susp_str = " (suspending)"
608
        self.logger.info("Lease #%i has been scheduled on nodes %s from %s%s to %s%s" % (lease.id, mapping.values(), start, res_str, end, susp_str))
609

    
610
        return vmrr, preemptions
611

    
612

    
613
    def __find_fit_at_points(self, lease, requested_resources, changepoints, duration, min_duration):
614
        """ Tries to map a lease in a given list of points in time
615
        
616
        This method goes through a given list of points in time and tries
617
        to find the earliest time at which that lease can be allocated
618
        resources.
619
        
620
        Arguments:
621
        lease -- Lease to schedule
622
        requested_resources -- A dictionary of lease node -> ResourceTuple.
623
        changepoints -- The list of changepoints
624
        duration -- The amount of time requested
625
        min_duration -- The minimum amount of time that should be allocated
626
        
627
        Returns:
628
        start -- The time at which resources have been found for the lease
629
        actualend -- The time at which the resources won't be available. Note
630
        that this is not necessarily (start + duration) since the mapper
631
        might be unable to find enough resources for the full requested duration.
632
        mapping -- A mapping of lease nodes to physical nodes
633
        preemptions -- A list of 
634
        (if no mapping is found, all these values are set to None)
635
        """                 
636
        found = False
637
        
638
        for time, onlynodes in changepoints:
639
            start = time
640
            end = start + duration
641
            self.logger.debug("Attempting to map from %s to %s" % (start, end))
642
            
643
            # If suspension is disabled, we will only accept mappings that go
644
            # from "start" strictly until "end".
645
            susptype = get_config().get("suspension")
646
            if susptype == constants.SUSPENSION_NONE or (lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL):
647
                strictend = True
648
            else:
649
                strictend = False
650

    
651
            # Let the mapper work its magic
652
            mapping, actualend, preemptions = self.mapper.map(lease, 
653
                                                              requested_resources,
654
                                                              start, 
655
                                                              end, 
656
                                                              strictend = strictend,
657
                                                              onlynodes = onlynodes)
658
            
659
            # We have a mapping; we still have to check if it satisfies
660
            # the minimum duration.
661
            if mapping != None:
662
                if actualend < end:
663
                    actualduration = actualend - start
664
                    if actualduration >= min_duration:
665
                        self.logger.debug("This lease can be scheduled from %s to %s (will require suspension)" % (start, actualend))
666
                        found = True
667
                        break
668
                    else:
669
                        self.logger.debug("This starting time does not allow for the requested minimum duration (%s < %s)" % (actualduration, min_duration))
670
                else:
671
                    self.logger.debug("This lease can be scheduled from %s to %s (full duration)" % (start, end))
672
                    found = True
673
                    break
674
        
675
        if found:
676
            return start, actualend, mapping, preemptions
677
        else:
678
            return None, None, None, None
679
    
680
    
681
    def __compute_susprem_times(self, vmrr, time, direction, exclusion, rate, override = None):
682
        """ Computes the times at which suspend/resume operations would have to start
683
        
684
        When suspending or resuming a VM, the VM's memory is dumped to a
685
        file on disk. To correctly estimate the time required to suspend
686
        a lease with multiple VMs, Haizea makes sure that no two 
687
        suspensions/resumptions happen at the same time (e.g., if eight
688
        memory files were being saved at the same time to disk, the disk's
689
        performance would be reduced in a way that is not as easy to estimate
690
        as if only one file were being saved at a time). Based on a number
691
        of parameters, this method estimates the times at which the 
692
        suspend/resume commands would have to be sent to guarantee this
693
        exclusion.
694
                    
695
        Arguments:
696
        vmrr -- The VM reservation that will be suspended/resumed
697
        time -- The time at which the suspend should end or the resume should start.
698
        direction -- DIRECTION_BACKWARD: start at "time" and compute the times going
699
        backward (for suspensions) DIRECTION_FORWARD: start at time "time" and compute
700
        the times going forward.
701
        exclusion -- SUSPRES_EXCLUSION_GLOBAL (memory is saved to global filesystem)
702
        or SUSPRES_EXCLUSION_LOCAL (saved to local filesystem)
703
        rate -- The rate at which an individual VM is suspended/resumed
704
        override -- If specified, then instead of computing the time to 
705
        suspend/resume VM based on its memory and the "rate" parameter,
706
        use this override value.
707
        
708
        """         
709
        times = [] # (start, end, {pnode -> vnodes})
710
        enactment_overhead = get_config().get("enactment-overhead") 
711

    
712
        if exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
713
            # Global exclusion (which represents, e.g., reading/writing the memory image files
714
            # from a global file system) meaning no two suspensions/resumptions can happen at 
715
            # the same time in the entire resource pool.
716
            
717
            t = time
718
            t_prev = None
719
                
720
            for (vnode,pnode) in vmrr.nodes.items():
721
                if override == None:
722
                    mem = vmrr.lease.requested_resources.get_by_type(constants.RES_MEM)
723
                    op_time = self.__compute_suspend_resume_time(mem, rate)
724
                else:
725
                    op_time = override
726

    
727
                op_time += enactment_overhead
728
                    
729
                t_prev = t
730
                
731
                if direction == constants.DIRECTION_FORWARD:
732
                    t += op_time
733
                    times.append((t_prev, t, {pnode:[vnode]}))
734
                elif direction == constants.DIRECTION_BACKWARD:
735
                    t -= op_time
736
                    times.append((t, t_prev, {pnode:[vnode]}))
737

    
738
        elif exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
739
            # Local exclusion (which represents, e.g., reading the memory image files
740
            # from a local file system) means no two resumptions can happen at the same
741
            # time in the same physical node.
742
            pervnode_times = [] # (start, end, vnode)
743
            vnodes_in_pnode = {}
744
            for (vnode,pnode) in vmrr.nodes.items():
745
                vnodes_in_pnode.setdefault(pnode, []).append(vnode)
746
            for pnode in vnodes_in_pnode:
747
                t = time
748
                t_prev = None
749
                for vnode in vnodes_in_pnode[pnode]:
750
                    if override == None:
751
                        mem = vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
752
                        op_time = self.__compute_suspend_resume_time(mem, rate)
753
                    else:
754
                        op_time = override                    
755
                    
756
                    t_prev = t
757
                    
758
                    if direction == constants.DIRECTION_FORWARD:
759
                        t += op_time
760
                        pervnode_times.append((t_prev, t, vnode))
761
                    elif direction == constants.DIRECTION_BACKWARD:
762
                        t -= op_time
763
                        pervnode_times.append((t, t_prev, vnode))
764
            
765
            # Consolidate suspend/resume operations happening at the same time
766
            uniq_times = set([(start, end) for (start, end, vnode) in pervnode_times])
767
            for (start, end) in uniq_times:
768
                vnodes = [x[2] for x in pervnode_times if x[0] == start and x[1] == end]
769
                node_mappings = {}
770
                for vnode in vnodes:
771
                    pnode = vmrr.nodes[vnode]
772
                    node_mappings.setdefault(pnode, []).append(vnode)
773
                times.append([start,end,node_mappings])
774
        
775
            # Add the enactment overhead
776
            for t in times:
777
                num_vnodes = sum([len(vnodes) for vnodes in t[2].values()])
778
                overhead = TimeDelta(seconds = num_vnodes * enactment_overhead)
779
                if direction == constants.DIRECTION_FORWARD:
780
                    t[1] += overhead
781
                elif direction == constants.DIRECTION_BACKWARD:
782
                    t[0] -= overhead
783
                    
784
            # Fix overlaps
785
            if direction == constants.DIRECTION_FORWARD:
786
                times.sort(key=itemgetter(0))
787
            elif direction == constants.DIRECTION_BACKWARD:
788
                times.sort(key=itemgetter(1))
789
                times.reverse()
790
                
791
            prev_start = None
792
            prev_end = None
793
            for t in times:
794
                if prev_start != None:
795
                    start = t[0]
796
                    end = t[1]
797
                    if direction == constants.DIRECTION_FORWARD:
798
                        if start < prev_end:
799
                            diff = prev_end - start
800
                            t[0] += diff
801
                            t[1] += diff
802
                    elif direction == constants.DIRECTION_BACKWARD:
803
                        if end > prev_start:
804
                            diff = end - prev_start
805
                            t[0] -= diff
806
                            t[1] -= diff
807
                prev_start = t[0]
808
                prev_end = t[1]
809
        
810
        return times
811
    
812
    
813
    def __schedule_shutdown(self, vmrr):
814
        """ Schedules the shutdown of a VM reservation
815
                            
816
        Arguments:
817
        vmrr -- The VM reservation that will be shutdown
818
        
819
        """                 
820
        shutdown_time = self.__estimate_shutdown_time(vmrr.lease)
821

    
822
        start = vmrr.end - shutdown_time
823
        end = vmrr.end
824
        
825
        shutdown_rr = ShutdownResourceReservation(vmrr.lease, start, end, vmrr.resources_in_pnode, vmrr.nodes, vmrr)
826
        shutdown_rr.state = ResourceReservation.STATE_SCHEDULED
827
                
828
        vmrr.update_end(start)
829
        
830
        # If there are any post RRs, remove them
831
        for rr in vmrr.post_rrs:
832
            self.slottable.remove_reservation(rr)
833
        vmrr.post_rrs = []
834

    
835
        vmrr.post_rrs.append(shutdown_rr)
836

    
837

    
838
    def __schedule_suspension(self, vmrr, suspend_by):
839
        """ Schedules the suspension of a VM reservation
840
                         
841
        Most of the work is done in __compute_susprem_times. See that
842
        method's documentation for more details.
843
                            
844
        Arguments:
845
        vmrr -- The VM reservation that will be suspended
846
        suspend_by -- The time by which the VMs should be suspended.
847
        
848
        """            
849
        config = get_config()
850
        susp_exclusion = config.get("suspendresume-exclusion")
851
        override = get_config().get("override-suspend-time")
852
        rate = config.get("suspend-rate") 
853

    
854
        if suspend_by < vmrr.start or suspend_by > vmrr.end:
855
            raise InconsistentScheduleError, "Tried to schedule a suspension by %s, which is outside the VMRR's duration (%s-%s)" % (suspend_by, vmrr.start, vmrr.end)
856

    
857
        # Find the suspension times
858
        times = self.__compute_susprem_times(vmrr, suspend_by, constants.DIRECTION_BACKWARD, susp_exclusion, rate, override)
859
        
860
        # Create the suspension resource reservations
861
        suspend_rrs = []
862
        for (start, end, node_mappings) in times:
863
            suspres = {}
864
            all_vnodes = []
865
            for (pnode,vnodes) in node_mappings.items():
866
                num_vnodes = len(vnodes)
867
                r = Capacity([constants.RES_MEM,constants.RES_DISK])
868
                mem = 0
869
                for vnode in vnodes:
870
                    mem += vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
871
                r.set_quantity(constants.RES_MEM, mem * num_vnodes)
872
                r.set_quantity(constants.RES_DISK, mem * num_vnodes)
873
                suspres[pnode] = self.slottable.create_resource_tuple_from_capacity(r)          
874
                all_vnodes += vnodes     
875
                             
876
            susprr = SuspensionResourceReservation(vmrr.lease, start, end, suspres, all_vnodes, vmrr)
877
            susprr.state = ResourceReservation.STATE_SCHEDULED
878
            suspend_rrs.append(susprr)
879
                
880
        suspend_rrs.sort(key=attrgetter("start"))
881
            
882
        susp_start = suspend_rrs[0].start
883
        if susp_start < vmrr.start:
884
            raise InconsistentScheduleError, "Determined suspension should start at %s, before the VMRR's start (%s) -- Suspend time not being properly estimated?" % (susp_start, vmrr.start)
885
        
886
        vmrr.update_end(susp_start)
887
        
888
        # If there are any post RRs, remove them
889
        for rr in vmrr.post_rrs:
890
            self.slottable.remove_reservation(rr)
891
        vmrr.post_rrs = []
892

    
893
        # Add the suspension RRs to the VM RR
894
        for susprr in suspend_rrs:
895
            vmrr.post_rrs.append(susprr)       
896
            
897
            
898
    def __schedule_resumption(self, vmrr, resume_at):
899
        """ Schedules the resumption of a VM reservation
900
                         
901
        Most of the work is done in __compute_susprem_times. See that
902
        method's documentation for more details.
903
                            
904
        Arguments:
905
        vmrr -- The VM reservation that will be resumed
906
        resume_at -- The time at which the resumption should start
907
        
908
        """                 
909
        config = get_config()
910
        resm_exclusion = config.get("suspendresume-exclusion")        
911
        override = get_config().get("override-resume-time")
912
        rate = config.get("resume-rate") 
913

    
914
        if resume_at < vmrr.start or resume_at > vmrr.end:
915
            raise InconsistentScheduleError, "Tried to schedule a resumption at %s, which is outside the VMRR's duration (%s-%s)" % (resume_at, vmrr.start, vmrr.end)
916

    
917
        # Find the resumption times
918
        times = self.__compute_susprem_times(vmrr, resume_at, constants.DIRECTION_FORWARD, resm_exclusion, rate, override)
919
        
920
        # Create the resumption resource reservations
921
        resume_rrs = []
922
        for (start, end, node_mappings) in times:
923
            resmres = {}
924
            all_vnodes = []
925
            for (pnode,vnodes) in node_mappings.items():
926
                num_vnodes = len(vnodes)
927
                r = Capacity([constants.RES_MEM,constants.RES_DISK])
928
                mem = 0
929
                for vnode in vnodes:
930
                    mem += vmrr.lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
931
                r.set_quantity(constants.RES_MEM, mem * num_vnodes)
932
                r.set_quantity(constants.RES_DISK, mem * num_vnodes)
933
                resmres[pnode] = self.slottable.create_resource_tuple_from_capacity(r)
934
                all_vnodes += vnodes
935
            resmrr = ResumptionResourceReservation(vmrr.lease, start, end, resmres, all_vnodes, vmrr)
936
            resmrr.state = ResourceReservation.STATE_SCHEDULED
937
            resume_rrs.append(resmrr)
938
                
939
        resume_rrs.sort(key=attrgetter("start"))
940
            
941
        resm_end = resume_rrs[-1].end
942
        if resm_end > vmrr.end:
943
            raise InconsistentScheduleError, "Determined resumption would end at %s, after the VMRR's end (%s) -- Resume time not being properly estimated?" % (resm_end, vmrr.end)
944
        
945
        vmrr.update_start(resm_end)
946
        
947
        # Add the resumption RRs to the VM RR
948
        for resmrr in resume_rrs:
949
            vmrr.pre_rrs.append(resmrr)        
950
           
951
           
952
    def __compute_suspend_resume_time(self, mem, rate):
953
        """ Compute the time to suspend/resume a single VM
954
                            
955
        Arguments:
956
        mem -- Amount of memory used by the VM
957
        rate -- The rate at which an individual VM is suspended/resumed
958
        
959
        """            
960
        time = float(mem) / rate
961
        time = round_datetime_delta(TimeDelta(seconds = time))
962
        return time
963
    
964
    
965
    def __estimate_suspend_time(self, lease):
966
        """ Estimate the time to suspend an entire lease
967
                            
968
        Most of the work is done in __estimate_suspend_resume_time. See
969
        that method's documentation for more details.
970
        
971
        Arguments:
972
        lease -- Lease that is going to be suspended
973
        
974
        """               
975
        rate = get_config().get("suspend-rate")
976
        override = get_config().get("override-suspend-time")
977
        if override != None:
978
            return override
979
        else:
980
            return self.__estimate_suspend_resume_time(lease, rate)
981

    
982

    
983
    def __estimate_resume_time(self, lease):
984
        """ Estimate the time to resume an entire lease
985
                            
986
        Most of the work is done in __estimate_suspend_resume_time. See
987
        that method's documentation for more details.
988
        
989
        Arguments:
990
        lease -- Lease that is going to be resumed
991
        
992
        """           
993
        rate = get_config().get("resume-rate") 
994
        override = get_config().get("override-resume-time")
995
        if override != None:
996
            return override
997
        else:
998
            return self.__estimate_suspend_resume_time(lease, rate)    
999
    
1000
    
1001
    def __estimate_suspend_resume_time(self, lease, rate):
1002
        """ Estimate the time to suspend/resume an entire lease
1003
                            
1004
        Note that, unlike __compute_suspend_resume_time, this estimates
1005
        the time to suspend/resume an entire lease (which may involve
1006
        suspending several VMs)
1007
        
1008
        Arguments:
1009
        lease -- Lease that is going to be suspended/resumed
1010
        rate -- The rate at which an individual VM is suspended/resumed
1011
        
1012
        """              
1013
        susp_exclusion = get_config().get("suspendresume-exclusion")        
1014
        enactment_overhead = get_config().get("enactment-overhead") 
1015
        mem = 0
1016
        for vnode in lease.requested_resources:
1017
            mem += lease.requested_resources[vnode].get_quantity(constants.RES_MEM)
1018
        if susp_exclusion == constants.SUSPRES_EXCLUSION_GLOBAL:
1019
            return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
1020
        elif susp_exclusion == constants.SUSPRES_EXCLUSION_LOCAL:
1021
            # Overestimating
1022
            return lease.numnodes * (self.__compute_suspend_resume_time(mem, rate) + enactment_overhead)
1023

    
1024

    
1025
    def __estimate_shutdown_time(self, lease):
1026
        """ Estimate the time to shutdown an entire lease
1027
                            
1028
        Arguments:
1029
        lease -- Lease that is going to be shutdown
1030
        
1031
        """            
1032
        enactment_overhead = get_config().get("enactment-overhead").seconds
1033
        return get_config().get("shutdown-time") + (enactment_overhead * lease.numnodes)
1034

    
1035

    
1036
    def __compute_scheduling_threshold(self, lease):
1037
        """ Compute the scheduling threshold (the 'minimum duration') of a lease
1038
        
1039
        To avoid thrashing, Haizea will not schedule a lease unless all overheads
1040
        can be correctly scheduled (which includes image transfers, suspensions, etc.).
1041
        However, this can still result in situations where a lease is prepared,
1042
        and then immediately suspended because of a blocking lease in the future.
1043
        The scheduling threshold is used to specify that a lease must
1044
        not be scheduled unless it is guaranteed to run for a minimum amount of
1045
        time (the rationale behind this is that you ideally don't want leases
1046
        to be scheduled if they're not going to be active for at least as much time
1047
        as was spent in overheads).
1048
        
1049
        An important part of computing this value is the "scheduling threshold factor".
1050
        The default value is 1, meaning that the lease will be active for at least
1051
        as much time T as was spent on overheads (e.g., if preparing the lease requires
1052
        60 seconds, and we know that it will have to be suspended, requiring 30 seconds,
1053
        Haizea won't schedule the lease unless it can run for at least 90 minutes).
1054
        In other words, a scheduling factor of F required a minimum duration of 
1055
        F*T. A value of 0 could lead to thrashing, since Haizea could end up with
1056
        situations where a lease starts and immediately gets suspended.         
1057
        
1058
        Arguments:
1059
        lease -- Lease for which we want to find the scheduling threshold
1060
        """
1061
        # TODO: Take into account other things like boot overhead, migration overhead, etc.
1062
        config = get_config()
1063
        threshold = config.get("force-scheduling-threshold")
1064
        if threshold != None:
1065
            # If there is a hard-coded threshold, use that
1066
            return threshold
1067
        else:
1068
            factor = config.get("scheduling-threshold-factor")
1069
            
1070
            # First, figure out the "safe duration" (the minimum duration
1071
            # so that we at least allocate enough time for all the
1072
            # overheads).
1073
            susp_overhead = self.__estimate_suspend_time(lease)
1074
            safe_duration = susp_overhead
1075
            
1076
            if lease.get_state() == Lease.STATE_SUSPENDED_QUEUED:
1077
                resm_overhead = self.__estimate_resume_time(lease)
1078
                safe_duration += resm_overhead
1079
            
1080
            # TODO: Incorporate other overheads into the minimum duration
1081
            min_duration = safe_duration
1082
            
1083
            # At the very least, we want to allocate enough time for the
1084
            # safe duration (otherwise, we'll end up with incorrect schedules,
1085
            # where a lease is scheduled to suspend, but isn't even allocated
1086
            # enough time to suspend). 
1087
            # The factor is assumed to be non-negative. i.e., a factor of 0
1088
            # means we only allocate enough time for potential suspend/resume
1089
            # operations, while a factor of 1 means the lease will get as much
1090
            # running time as spend on the runtime overheads involved in setting
1091
            # it up
1092
            threshold = safe_duration + (min_duration * factor)
1093
            return threshold
1094

    
1095

    
1096
    #-------------------------------------------------------------------#
1097
    #                                                                   #
1098
    #                  SLOT TABLE EVENT HANDLERS                        #
1099
    #                                                                   #
1100
    #-------------------------------------------------------------------#
1101

    
1102
    def _handle_start_vm(self, l, rr):
        """ Handles the start of a VMResourceReservation

        Arguments:
        l -- Lease the VMResourceReservation belongs to
        rr -- The VMResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleStartVM" % l.id)
        l.print_contents()
        lease_state = l.get_state()
        if lease_state == Lease.STATE_READY:
            l.set_state(Lease.STATE_ACTIVE)
            rr.state = ResourceReservation.STATE_ACTIVE
            now_time = get_clock().get_time()
            l.start.actual = now_time

            try:
                self.resourcepool.start_vms(l, rr)
            except EnactmentError, exc:
                self.logger.error("Enactment error when starting VMs.")
                # Right now, this is a non-recoverable error, so we just
                # propagate it upwards to the lease scheduler.
                # In the future, it may be possible to react to these
                # kinds of errors.
                raise

        elif lease_state == Lease.STATE_RESUMED_READY:
            l.set_state(Lease.STATE_ACTIVE)
            rr.state = ResourceReservation.STATE_ACTIVE
            # No enactment to do here, since all the suspend/resume actions are
            # handled during the suspend/resume RRs
        else:
            raise InconsistentLeaseStateError(l, doing = "starting a VM")

        # If this was a future reservation (as determined by backfilling),
        # remove that status, since the future is now.
        if rr.lease in self.future_leases:
            self.future_leases.remove(l)
            get_persistence().persist_future_leases(self.future_leases)

        l.print_contents()
        self.logger.debug("LEASE-%i End of handleStartVM" % l.id)
        self.logger.info("Started VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))

    def _handle_end_vm(self, l, rr):
        """ Handles the end of a VMResourceReservation

        Arguments:
        l -- Lease the VMResourceReservation belongs to
        rr -- The VMResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleEndVM" % l.id)
        self.logger.vdebug("LEASE-%i Before:" % l.id)
        l.print_contents()
        now_time = round_datetime(get_clock().get_time())
        diff = now_time - rr.start
        l.duration.accumulate_duration(diff)
        rr.state = ResourceReservation.STATE_DONE

        self.logger.vdebug("LEASE-%i After:" % l.id)
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndVM" % l.id)
        self.logger.info("Stopped VMs for lease %i on nodes %s" % (l.id, rr.nodes.values()))

    def _handle_unscheduled_end_vm(self, l, vmrr):
        """ Handles the unexpected end of a VMResourceReservation

        Arguments:
        l -- Lease the VMResourceReservation belongs to
        vmrr -- The VMResourceReservation
        """
        self.logger.info("LEASE-%i The VM has ended prematurely." % l.id)
        for rr in vmrr.post_rrs:
            self.slottable.remove_reservation(rr)
        vmrr.post_rrs = []
        vmrr.end = get_clock().get_time()
        self._handle_end_vm(l, vmrr)

    def _handle_start_suspend(self, l, rr):
        """ Handles the start of a SuspensionResourceReservation

        Arguments:
        l -- Lease the SuspensionResourceReservation belongs to
        rr -- The SuspensionResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleStartSuspend" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_ACTIVE

        try:
            self.resourcepool.suspend_vms(l, rr)
        except EnactmentError, exc:
            self.logger.error("Enactment error when suspending VMs.")
            # Right now, this is a non-recoverable error, so we just
            # propagate it upwards to the lease scheduler.
            # In the future, it may be possible to react to these
            # kinds of errors.
            raise

        if rr.is_first():
            l.set_state(Lease.STATE_SUSPENDING)
            l.print_contents()
            self.logger.info("Suspending lease %i..." % (l.id))
        self.logger.debug("LEASE-%i End of handleStartSuspend" % l.id)

    def _handle_end_suspend(self, l, rr):
        """ Handles the end of a SuspensionResourceReservation

        Arguments:
        l -- Lease the SuspensionResourceReservation belongs to
        rr -- The SuspensionResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleEndSuspend" % l.id)
        l.print_contents()
        # TODO: React to incomplete suspend
        self.resourcepool.verify_suspend(l, rr)
        rr.state = ResourceReservation.STATE_DONE
        if rr.is_last():
            l.set_state(Lease.STATE_SUSPENDED_PENDING)
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndSuspend" % l.id)
        self.logger.info("Lease %i suspended." % (l.id))

        if l.get_state() == Lease.STATE_SUSPENDED_PENDING:
            raise RescheduleLeaseException
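        # Note: raising RescheduleLeaseException here is assumed to tell the
        # caller (the lease scheduler) that the now-suspended lease has to be
        # put back in the queue and scheduled again at a later time.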

    def _handle_start_resume(self, l, rr):
        """ Handles the start of a ResumptionResourceReservation

        Arguments:
        l -- Lease the ResumptionResourceReservation belongs to
        rr -- The ResumptionResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleStartResume" % l.id)
        l.print_contents()

        try:
            self.resourcepool.resume_vms(l, rr)
        except EnactmentError, exc:
            self.logger.error("Enactment error when resuming VMs.")
            # Right now, this is a non-recoverable error, so we just
            # propagate it upwards to the lease scheduler.
            # In the future, it may be possible to react to these
            # kinds of errors.
            raise

        rr.state = ResourceReservation.STATE_ACTIVE
        if rr.is_first():
            l.set_state(Lease.STATE_RESUMING)
            l.print_contents()
            self.logger.info("Resuming lease %i..." % (l.id))
        self.logger.debug("LEASE-%i End of handleStartResume" % l.id)

    def _handle_end_resume(self, l, rr):
        """ Handles the end of a ResumptionResourceReservation

        Arguments:
        l -- Lease the ResumptionResourceReservation belongs to
        rr -- The ResumptionResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleEndResume" % l.id)
        l.print_contents()
        # TODO: React to incomplete resume
        self.resourcepool.verify_resume(l, rr)
        rr.state = ResourceReservation.STATE_DONE
        if rr.is_last():
            l.set_state(Lease.STATE_RESUMED_READY)
            self.logger.info("Resumed lease %i" % (l.id))
        for vnode, pnode in rr.vmrr.nodes.items():
            self.resourcepool.remove_ramfile(pnode, l.id, vnode)
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndResume" % l.id)

    def _handle_start_shutdown(self, l, rr):
        """ Handles the start of a ShutdownResourceReservation

        Arguments:
        l -- Lease the ShutdownResourceReservation belongs to
        rr -- The ShutdownResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleStartShutdown" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_ACTIVE
        try:
            self.resourcepool.stop_vms(l, rr)
        except EnactmentError, exc:
            self.logger.error("Enactment error when shutting down VMs.")
            # Right now, this is a non-recoverable error, so we just
            # propagate it upwards to the lease scheduler.
            # In the future, it may be possible to react to these
            # kinds of errors.
            raise

        l.print_contents()
        self.logger.debug("LEASE-%i End of handleStartShutdown" % l.id)

    def _handle_end_shutdown(self, l, rr):
        """ Handles the end of a ShutdownResourceReservation

        Arguments:
        l -- Lease the ShutdownResourceReservation belongs to
        rr -- The ShutdownResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleEndShutdown" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_DONE
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndShutdown" % l.id)
        self.logger.info("Lease %i's VMs have shut down." % (l.id))
        raise NormalEndLeaseException
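        # Note: NormalEndLeaseException is assumed to be caught by the caller
        # (the lease scheduler) as the signal that the lease has reached its
        # normal end and can be cleaned up.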

    def _handle_start_migrate(self, l, rr):
        """ Handles the start of a MemImageMigrationResourceReservation

        Arguments:
        l -- Lease the MemImageMigrationResourceReservation belongs to
        rr -- The MemImageMigrationResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
        l.print_contents()
        rr.state = ResourceReservation.STATE_ACTIVE
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
        self.logger.info("Migrating lease %i..." % (l.id))

    def _handle_end_migrate(self, l, rr):
        """ Handles the end of a MemImageMigrationResourceReservation

        Arguments:
        l -- Lease the MemImageMigrationResourceReservation belongs to
        rr -- The MemImageMigrationResourceReservation
        """
        self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
        l.print_contents()

        for vnode in rr.transfers:
            origin = rr.transfers[vnode][0]
            dest = rr.transfers[vnode][1]

            # Update RAM files
            self.resourcepool.remove_ramfile(origin, l.id, vnode)
            self.resourcepool.add_ramfile(dest, l.id, vnode, l.requested_resources[vnode].get_quantity(constants.RES_MEM))

        rr.state = ResourceReservation.STATE_DONE
        l.print_contents()
        self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
        self.logger.info("Migrated lease %i..." % (l.id))


class VMResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, nodes, res):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.nodes = nodes # { vnode -> pnode }
        self.pre_rrs = []
        self.post_rrs = []

        # ONLY for simulation
        self.__update_prematureend()
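        # Note: pre_rrs holds reservations that must happen before the VMs run
        # (resumptions and, presumably, memory-image migrations), while
        # post_rrs holds those that follow them (suspensions or shutdowns);
        # see is_suspending() and is_shutting_down() below.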

    def update_start(self, time):
        self.start = time
        # ONLY for simulation
        self.__update_prematureend()

    def update_end(self, time):
        self.end = time
        # ONLY for simulation
        self.__update_prematureend()

    # ONLY for simulation
    def __update_prematureend(self):
        if self.lease.duration.known != None:
            remdur = self.lease.duration.get_remaining_known_duration()
            rrdur = self.end - self.start
            if remdur < rrdur:
                self.prematureend = self.start + remdur
                # Kludgy, but this corner case actually does happen
                # (because of preemptions, it may turn out that
                # the premature end time coincides with the
                # starting time of the VMRR)
                if self.prematureend == self.start:
                    self.prematureend += 1
            else:
                self.prematureend = None
        else:
            self.prematureend = None
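        # Worked example (hypothetical values): if the lease's remaining known
        # duration is 30 minutes but this reservation spans 60 minutes, then
        # prematureend is set to start + 30 minutes; if the remaining duration
        # covers the whole reservation, or is unknown, prematureend stays None.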

    def get_final_end(self):
        if len(self.post_rrs) == 0:
            return self.end
        else:
            return self.post_rrs[-1].end

    def is_suspending(self):
        return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], SuspensionResourceReservation)

    def is_shutting_down(self):
        return len(self.post_rrs) > 0 and isinstance(self.post_rrs[0], ShutdownResourceReservation)

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        for resmrr in self.pre_rrs:
            resmrr.print_contents(loglevel)
            logger.log(loglevel, "--")
        logger.log(loglevel, "Type           : VM")
        logger.log(loglevel, "Nodes          : %s" % pretty_nodemap(self.nodes))
        if self.prematureend != None:
            logger.log(loglevel, "Premature end  : %s" % self.prematureend)
        ResourceReservation.print_contents(self, loglevel)
        for susprr in self.post_rrs:
            logger.log(loglevel, "--")
            susprr.print_contents(loglevel)


class SuspensionResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, res, vnodes, vmrr):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.vmrr = vmrr
        self.vnodes = vnodes

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        logger.log(loglevel, "Type           : SUSPEND")
        logger.log(loglevel, "Vnodes         : %s" % self.vnodes)
        ResourceReservation.print_contents(self, loglevel)

    def is_first(self):
        return (self == self.vmrr.post_rrs[0])

    def is_last(self):
        return (self == self.vmrr.post_rrs[-1])


class ResumptionResourceReservation(ResourceReservation):
    def __init__(self, lease, start, end, res, vnodes, vmrr):
        ResourceReservation.__init__(self, lease, start, end, res)
        self.vmrr = vmrr
        self.vnodes = vnodes

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        logger.log(loglevel, "Type           : RESUME")
        logger.log(loglevel, "Vnodes         : %s" % self.vnodes)
        ResourceReservation.print_contents(self, loglevel)

    def is_first(self):
        resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
        return (self == resm_rrs[0])

    def is_last(self):
        resm_rrs = [r for r in self.vmrr.pre_rrs if isinstance(r, ResumptionResourceReservation)]
        return (self == resm_rrs[-1])
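        # Note: is_first()/is_last() filter vmrr.pre_rrs because pre_rrs may
        # also contain reservations of other types (presumably memory-image
        # migrations), whereas SuspensionResourceReservation can index
        # vmrr.post_rrs directly.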
1473
    
1474
    
1475
class ShutdownResourceReservation(ResourceReservation):
1476
    def __init__(self, lease, start, end, res, vnodes, vmrr):
1477
        ResourceReservation.__init__(self, lease, start, end, res)
1478
        self.vmrr = vmrr
1479
        self.vnodes = vnodes
1480

    
1481
    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
1482
        logger = logging.getLogger("LEASES")
1483
        logger.log(loglevel, "Type           : SHUTDOWN")
1484
        ResourceReservation.print_contents(self, loglevel)
1485

    
1486

    
1487
class MemImageMigrationResourceReservation(MigrationResourceReservation):
    def __init__(self, lease, start, end, res, vmrr, transfers):
        MigrationResourceReservation.__init__(self, lease, start, end, res, vmrr, transfers)

    def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
        logger = logging.getLogger("LEASES")
        logger.log(loglevel, "Type           : MEM IMAGE MIGRATION")
        logger.log(loglevel, "Transfers      : %s" % self.transfers)
        ResourceReservation.print_contents(self, loglevel)