Project

General

Profile

root / trunk / src / haizea / core / scheduler / lease_scheduler.py @ 686

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2009, University of Chicago                                 #
3
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19

    
20
"""This module provides the main classes for Haizea's lease scheduler, particularly
21
the LeaseScheduler class. This module does *not* contain VM scheduling code (i.e.,
22
the code that decides what physical hosts a VM should be mapped to), which is
23
located in the vm_scheduler module. Lease preparation code (e.g., image transfer 
24
scheduling) is located in the preparation_schedulers package. In fact, the
25
main purpose of the lease schedule is to orchestrate these preparation and VM
26
schedulers.
27

28
This module also includes a Queue class and a LeaseTable class, which are used
29
by the lease scheduler.
30
"""
31

    
32
import haizea.common.constants as constants
33
from haizea.common.utils import round_datetime, get_config, get_clock, get_policy, get_persistence
34
from haizea.core.leases import Lease
35
from haizea.core.scheduler import RescheduleLeaseException, NormalEndLeaseException, InconsistentLeaseStateError, EnactmentError, UnrecoverableError, NotSchedulableException, EarliestStartingTime
36
from haizea.core.scheduler.slottable import ResourceReservation
37
from operator import attrgetter
38

    
39
import logging
40

    
41
class LeaseScheduler(object):
42
    """The Haizea Lease Scheduler
43
    
44
    This is the main scheduling class in Haizea. It handles lease scheduling which,
45
    in turn, involves VM scheduling, preparation scheduling (such as transferring
46
    a VM image), and numerous bookkeeping operations. All these operations are
47
    handled by other classes, so this class acts mostly as an orchestrator that
48
    coordinates all the different operations involved in scheduling a lease.    
49
    """
50
    
51
    def __init__(self, vm_scheduler, preparation_scheduler, slottable, accounting):
52
        """Constructor
53
        
54
        The constructor does little more than create the lease scheduler's
55
        attributes. However, it does expect (in the arguments) a fully-constructed 
56
        VMScheduler, PreparationScheduler, SlotTable, and PolicyManager (these are 
57
        constructed in the Manager's constructor). 
58
        
59
        Arguments:
60
        vm_scheduler -- VM scheduler
61
        preparation_scheduler -- Preparation scheduler
62
        slottable -- Slottable
63
        accounting -- AccountingDataCollection object
64
        """
65
        
66
        # Logger
67
        self.logger = logging.getLogger("LSCHED")
68
        
69
        # Assign schedulers and slottable
70
        self.vm_scheduler = vm_scheduler
71
        """
72
        VM Scheduler
73
        @type: VMScheduler
74
        """
75
        self.preparation_scheduler = preparation_scheduler
76
        self.slottable = slottable
77
        self.accounting = accounting
78

    
79
        # Create other data structures
80
        self.queue = Queue()
81
        self.leases = LeaseTable()
82
        self.completed_leases = LeaseTable()
83

    
84
        # Handlers are callback functions that get called whenever a type of
85
        # resource reservation starts or ends. Each scheduler publishes the
86
        # handlers it supports through its "handlers" attributes. For example,
87
        # the VMScheduler provides _handle_start_vm and _handle_end_vm that
88
        # must be called when a VMResourceReservation start or end is encountered
89
        # in the slot table.
90
        #
91
        # Handlers are called from the process_reservations method of this class
92
        self.handlers = {}
93
        for (type, handler) in self.vm_scheduler.handlers.items():
94
            self.handlers[type] = handler
95

    
96
        for (type, handler) in self.preparation_scheduler.handlers.items():
97
            self.handlers[type] = handler
98

    
99

    
100
    def request_lease(self, lease):
101
        """Requests a leases. This is the entry point of leases into the scheduler.
102
        
103
        Request a lease. The decision on whether to accept or reject a
104
        lease is deferred to the policy manager (through its admission
105
        control policy). 
106
        
107
        If the policy determines the lease can be
108
        accepted, it is marked as "Pending". This still doesn't
109
        guarantee that the lease will be scheduled (e.g., an AR lease
110
        could still be rejected if the scheduler determines there are no
111
        resources for it; but that is a *scheduling* decision, not a admission
112
        control policy decision). The ultimate fate of the lease is determined
113
        the next time the scheduling function is called.
114
        
115
        If the policy determines the lease cannot be accepted, it is marked
116
        as rejected.
117

118
        Arguments:
119
        lease -- Lease object. Its state must be STATE_NEW.
120
        """
121
        self.logger.info("Lease #%i has been requested." % lease.id)
122
        if lease.submit_time == None:
123
            lease.submit_time = round_datetime(get_clock().get_time())
124
        lease.print_contents()
125
        lease.set_state(Lease.STATE_PENDING)
126
        if get_policy().accept_lease(lease):
127
            self.logger.info("Lease #%i has been marked as pending." % lease.id)
128
            self.leases.add(lease)
129
        else:
130
            self.logger.info("Lease #%i has not been accepted" % lease.id)
131
            lease.set_state(Lease.STATE_REJECTED)
132
            self.completed_leases.add(lease)
133
        
134
        self.accounting.at_lease_request(lease)
135
        get_persistence().persist_lease(lease)
136
        
137
    def schedule(self, nexttime):
138
        """ The main scheduling function
139
        
140
        The scheduling function looks at all pending requests and schedules them.
141
        Note that most of the actual scheduling code is contained in the
142
        __schedule_lease method and in the VMScheduler and PreparationScheduler classes.
143
        
144
        Arguments:
145
        nexttime -- The next time at which the scheduler can allocate resources.
146
        """
147
        
148
        # Get pending leases
149
        pending_leases = self.leases.get_leases_by_state(Lease.STATE_PENDING)  
150
        ar_leases = [req for req in pending_leases if req.get_type() == Lease.ADVANCE_RESERVATION]
151
        im_leases = [req for req in pending_leases if req.get_type() == Lease.IMMEDIATE]
152
        be_leases = [req for req in pending_leases if req.get_type() == Lease.BEST_EFFORT]
153
        
154
        # Queue best-effort leases
155
        for lease in be_leases:
156
            self.__enqueue(lease)
157
            lease.set_state(Lease.STATE_QUEUED)
158
            self.logger.info("Queued best-effort lease request #%i, %i nodes for %s." % (lease.id, lease.numnodes, lease.duration.requested))
159
            get_persistence().persist_lease(lease)
160

    
161
        # Schedule immediate leases
162
        for lease in im_leases:
163
            self.logger.info("Scheduling immediate lease #%i (%i nodes)" % (lease.id, lease.numnodes))
164
            lease.print_contents()
165
       
166
            try:
167
                self.__schedule_lease(lease, nexttime=nexttime)
168
                self.logger.info("Immediate lease #%i has been scheduled." % lease.id)
169
                lease.print_contents()
170
            except NotSchedulableException, exc:
171
                self.logger.info("Immediate lease request #%i cannot be scheduled: %s" % (lease.id, exc.reason))
172
                lease.set_state(Lease.STATE_REJECTED)
173
                self.completed_leases.add(lease)
174
                self.accounting.at_lease_done(lease)
175
                self.leases.remove(lease)            
176
            get_persistence().persist_lease(lease)
177

    
178
        # Schedule AR requests
179
        for lease in ar_leases:
180
            self.logger.info("Scheduling AR lease #%i, %i nodes from %s to %s." % (lease.id, lease.numnodes, lease.start.requested, lease.start.requested + lease.duration.requested))
181
            lease.print_contents()
182
            
183
            try:
184
                self.__schedule_lease(lease, nexttime)
185
                self.logger.info("AR lease #%i has been scheduled." % lease.id)
186
                lease.print_contents()
187
            except NotSchedulableException, exc:
188
                self.logger.info("AR lease request #%i cannot be scheduled: %s" % (lease.id, exc.reason))
189
                lease.set_state(Lease.STATE_REJECTED)
190
                self.completed_leases.add(lease)
191
                self.accounting.at_lease_done(lease)
192
                self.leases.remove(lease)            
193
            get_persistence().persist_lease(lease)
194
            
195
        # Process queue (i.e., traverse queue in search of leases that can be scheduled)
196
        self.__process_queue(nexttime)
197
        get_persistence().persist_queue(self.queue)
198
    
199
    def process_starting_reservations(self, nowtime):
200
        """Processes starting reservations
201
        
202
        This method checks the slottable to see if there are any reservations that are
203
        starting at "nowtime". If so, the appropriate handler is called.
204

205
        Arguments:
206
        nowtime -- Time at which to check for starting reservations.
207
        """
208

    
209
        # Find starting/ending reservations
210
        starting = self.slottable.get_reservations_starting_at(nowtime)
211
        starting = [res for res in starting if res.state == ResourceReservation.STATE_SCHEDULED]
212
        
213
        # Process starting reservations
214
        for rr in starting:
215
            lease = rr.lease
216
            # Call the appropriate handler, and catch exceptions and errors.
217
            try:
218
                self.handlers[type(rr)].on_start(lease, rr)
219
                
220
            # An InconsistentLeaseStateError is raised when the lease is in an inconsistent
221
            # state. This is usually indicative of a programming error, but not necessarily
222
            # one that affects all leases, so we just fail this lease. Note that Haizea can also
223
            # be configured to stop immediately when a lease fails.
224
            except InconsistentLeaseStateError, exc:
225
                self.fail_lease(lease, exc)
226
            # An EnactmentError is raised when the handler had to perform an enactment action
227
            # (e.g., stopping a VM), and that enactment action failed. This is currently treated
228
            # as a non-recoverable error for the lease, and the lease is failed.
229
            except EnactmentError, exc:
230
                self.fail_lease(lease, exc)
231

    
232
            # Other exceptions are not expected, and generally indicate a programming error.
233
            # Thus, they are propagated upwards to the Manager where they will make
234
            # Haizea crash and burn.
235
            
236
            get_persistence().persist_lease(lease)
237

    
238
    def process_ending_reservations(self, nowtime):
239
        """Processes ending reservations
240
        
241
        This method checks the slottable to see if there are any reservations that are
242
        ending at "nowtime". If so, the appropriate handler is called.
243

244
        Arguments:
245
        nowtime -- Time at which to check for starting/ending reservations.
246
        """
247

    
248
        # Find starting/ending reservations
249
        ending = self.slottable.get_reservations_ending_at(nowtime)
250
        ending = [res for res in ending if res.state == ResourceReservation.STATE_ACTIVE]
251

    
252
        # Process ending reservations
253
        for rr in ending:
254
            lease = rr.lease
255
            self._handle_end_rr(rr)
256
            
257
            # Call the appropriate handler, and catch exceptions and errors.
258
            try:
259
                self.handlers[type(rr)].on_end(lease, rr)
260
                
261
            # A RescheduleLeaseException indicates that the lease has to be rescheduled
262
            except RescheduleLeaseException, exc:
263
                # Currently, the only leases that get rescheduled are best-effort leases,
264
                # once they've been suspended.
265
                if rr.lease.get_type() == Lease.BEST_EFFORT:
266
                    if lease.get_state() == Lease.STATE_SUSPENDED_PENDING:
267
                        # Put back in the queue, in the same order it arrived
268
                        self.__enqueue_in_order(lease)
269
                        lease.set_state(Lease.STATE_SUSPENDED_QUEUED)
270
                        get_persistence().persist_queue(self.queue)
271
                    else:
272
                        raise InconsistentLeaseStateError(lease, doing = "rescheduling best-effort lease")
273
                    
274
            # A NormalEndLeaseException indicates that the end of this reservations marks
275
            # the normal end of the lease.
276
            except NormalEndLeaseException, msg:
277
                self._handle_end_lease(lease)
278
                
279
            # An InconsistentLeaseStateError is raised when the lease is in an inconsistent
280
            # state. This is usually indicative of a programming error, but not necessarily
281
            # one that affects all leases, so we just fail this lease. Note that Haizea can also
282
            # be configured to stop immediately when a lease fails.
283
            except InconsistentLeaseStateError, exc:
284
                self.fail_lease(lease, exc)
285
                
286
            # An EnactmentError is raised when the handler had to perform an enactment action
287
            # (e.g., stopping a VM), and that enactment action failed. This is currently treated
288
            # as a non-recoverable error for the lease, and the lease is failed.
289
            except EnactmentError, exc:
290
                self.fail_lease(lease, exc)
291
                
292
            # Other exceptions are not expected, and generally indicate a programming error.
293
            # Thus, they are propagated upwards to the Manager where they will make
294
            # Haizea crash and burn.
295
            
296
            get_persistence().persist_lease(lease)
297

    
298
    def get_lease_by_id(self, lease_id):
299
        """Gets a lease with the given ID
300
        
301
        This method is useful for UIs (like the CLI) that operate on the lease ID.
302
        If no lease with a given ID is found, None is returned.
303

304
        Arguments:
305
        lease_id -- The ID of the lease
306
        """
307
        if not self.leases.has_lease(lease_id):
308
            return None
309
        else:
310
            return self.leases.get_lease(lease_id)
311

    
312
    def cancel_lease(self, lease):
313
        """Cancels a lease.
314
        
315
        Arguments:
316
        lease -- Lease to cancel
317
        """
318
        time = get_clock().get_time()
319
        
320
        self.logger.info("Cancelling lease %i..." % lease.id)
321
            
322
        lease_state = lease.get_state()
323
        
324
        if lease_state == Lease.STATE_PENDING:
325
            # If a lease is pending, we just need to change its state and
326
            # remove it from the lease table. Since this is done at the
327
            # end of this method, we do nothing here.
328
            pass
329

    
330
        elif lease_state == Lease.STATE_ACTIVE:
331
            # If a lease is active, that means we have to shut down its VMs to cancel it.
332
            self.logger.info("Lease %i is active. Stopping active reservation..." % lease.id)
333
            vmrr = lease.get_active_vmrrs(time)[0]
334
            self._handle_end_rr(vmrr)
335
            self.vm_scheduler._handle_unscheduled_end_vm(lease, vmrr)
336
            
337
            # Force machines to shut down
338
            try:
339
                self.vm_scheduler.resourcepool.stop_vms(lease, vmrr)
340
            except EnactmentError, exc:
341
                self.logger.error("Enactment error when shutting down VMs.")
342
                # Right now, this is a non-recoverable error, so we just
343
                # propagate it upwards.
344
                # In the future, it may be possible to react to these
345
                # kind of errors.
346
                raise            
347

    
348
        elif lease_state in [Lease.STATE_SCHEDULED, Lease.STATE_SUSPENDED_SCHEDULED, Lease.STATE_READY, Lease.STATE_RESUMED_READY]:
349
            # If a lease is scheduled or ready, we just need to cancel all future reservations
350
            # for that lease
351
            self.logger.info("Lease %i is scheduled. Cancelling reservations." % lease.id)
352
            rrs = lease.get_scheduled_reservations()
353
            for r in rrs:
354
                self.slottable.remove_reservation(r)
355
            
356
        elif lease_state in [Lease.STATE_QUEUED, Lease.STATE_SUSPENDED_QUEUED]:
357
            # If a lease is in the queue, waiting to be scheduled, cancelling
358
            # just requires removing it from the queue
359
            
360
            self.logger.info("Lease %i is in the queue. Removing..." % lease.id)
361
            self.queue.remove_lease(lease)
362
            get_persistence().persist_queue(self.queue)
363
        else:
364
            # Cancelling in any of the other states is currently unsupported
365
            raise InconsistentLeaseStateError(lease, doing = "cancelling the VM")
366
            
367
        # Change state, and remove from lease table
368
        lease.set_state(Lease.STATE_CANCELLED)
369
        self.completed_leases.add(lease)
370
        self.leases.remove(lease)
371
        get_persistence().persist_lease(lease)
372

    
373
    
374
    def fail_lease(self, lease, exc=None):
375
        """Transitions a lease to a failed state, and does any necessary cleaning up
376
        
377
        Arguments:
378
        lease -- Lease to fail
379
        exc -- The exception that made the lease fail
380
        """
381
        treatment = get_config().get("lease-failure-handling")
382
        
383
        if treatment == constants.ONFAILURE_CANCEL:
384
            # In this case, a lease failure is handled by cancelling the lease,
385
            # but allowing Haizea to continue to run normally.
386
            rrs = lease.get_scheduled_reservations()
387
            for r in rrs:
388
                self.slottable.remove_reservation(r)
389
            lease.set_state(Lease.STATE_FAIL)
390
            self.completed_leases.add(lease)
391
            self.leases.remove(lease)
392
            get_persistence().persist_lease(lease)
393
        elif treatment == constants.ONFAILURE_EXIT or treatment == constants.ONFAILURE_EXIT_RAISE:
394
            # In this case, a lease failure makes Haizea exit. This is useful when debugging,
395
            # so we can immediately know about any errors.
396
            raise UnrecoverableError(exc)
397
            
398
    
399
    def notify_event(self, lease, event):
400
        """Notifies an event that affects a lease.
401
        
402
        This is the entry point of asynchronous events into the scheduler. Currently,
403
        the only supported event is the premature end of a VM (i.e., before its
404
        scheduled end). Other events will emerge when we integrate Haizea with OpenNebula 1.4,
405
        since that version will support sending asynchronous events to Haizea.
406
        
407
        Arguments:
408
        lease -- Lease the event refers to
409
        event -- Event type
410
        """
411
        time = get_clock().get_time()
412
        if event == constants.EVENT_END_VM:
413
            vmrr = lease.get_last_vmrr()
414
            self._handle_end_rr(vmrr)
415
            # TODO: Exception handling
416
            self.vm_scheduler._handle_unscheduled_end_vm(lease, vmrr)
417
            self._handle_end_lease(lease)
418
            get_persistence().persist_lease(lease)
419
            
420
            # We need to reevaluate the schedule to see if there are any 
421
            # leases scheduled in the future that could be rescheduled
422
            # to start earlier
423
            nexttime = get_clock().get_next_schedulable_time()
424
            self.reevaluate_schedule(nexttime)
425

    
426

    
427
    def reevaluate_schedule(self, nexttime):
428
        """Reevaluates the schedule.
429
        
430
        This method can be called whenever resources are freed up
431
        unexpectedly (e.g., a lease than ends earlier than expected))
432
        to check if any leases scheduled in the future could be
433
        rescheduled to start earlier on the freed up resources.
434
        
435
        Currently, this method only checks if best-effort leases
436
        scheduled in the future (using a backfilling algorithm)
437
        can be rescheduled
438
        
439
        Arguments:
440
        nexttime -- The next time at which the scheduler can allocate resources.
441
        """        
442
        future = self.vm_scheduler.get_future_reschedulable_leases()
443
        for l in future:
444
            # We can only reschedule leases in the following four states
445
            if l.get_state() in (Lease.STATE_PREPARING, Lease.STATE_READY, Lease.STATE_SCHEDULED, Lease.STATE_SUSPENDED_SCHEDULED):
446
                # For each reschedulable lease already scheduled in the
447
                # future, we cancel the lease's preparantion and
448
                # the last scheduled VM.
449
                vmrr = l.get_last_vmrr()
450
                self.preparation_scheduler.cancel_preparation(l)
451
                self.vm_scheduler.cancel_vm(vmrr)
452
                l.remove_vmrr(vmrr)
453
                if l.get_state() in (Lease.STATE_READY, Lease.STATE_SCHEDULED, Lease.STATE_PREPARING):
454
                    l.set_state(Lease.STATE_PENDING)
455
                elif l.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
456
                    l.set_state(Lease.STATE_SUSPENDED_PENDING)
457

    
458
                # At this point, the lease just looks like a regular
459
                # pending lease that can be handed off directly to the
460
                # __schedule_lease method.
461
                # TODO: We should do exception handling here. However,
462
                # since we can only reschedule best-effort leases that were
463
                # originally schedule in the future, the scheduling function 
464
                # should always be able to schedule the lease (worst-case 
465
                # scenario is that it simply replicates the previous schedule)
466
                self.__schedule_lease(l, nexttime)
467

    
468

    
469
    def is_queue_empty(self):
470
        """Return True is the queue is empty, False otherwise"""
471
        return self.queue.is_empty()
472

    
473
    
474
    def exists_scheduled_leases(self):
475
        """Return True if there are any leases scheduled in the future"""
476
        return not self.slottable.is_empty()    
477

    
478
            
479
    def __process_queue(self, nexttime):
480
        """ Traverses the queue in search of leases that can be scheduled.
481
        
482
        This method processes the queue in order, but takes into account that
483
        it may be possible to schedule leases in the future (using a 
484
        backfilling algorithm)
485
        
486
        Arguments:
487
        nexttime -- The next time at which the scheduler can allocate resources.
488
        """        
489
        
490
        done = False
491
        newqueue = Queue()
492
        while not done and not self.is_queue_empty():
493
            if not self.vm_scheduler.can_schedule_in_future() and self.slottable.is_full(nexttime, restype = constants.RES_CPU):
494
                self.logger.debug("Used up all future reservations and slot table is full. Skipping rest of queue.")
495
                done = True
496
            else:
497
                lease = self.queue.dequeue()
498
                try:
499
                    self.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease.id)
500
                    lease.print_contents()
501
                    self.__schedule_lease(lease, nexttime)
502
                except NotSchedulableException, msg:
503
                    # Put back on queue
504
                    newqueue.enqueue(lease)
505
                    self.logger.info("Lease %i could not be scheduled at this time." % lease.id)
506
                    if get_config().get("backfilling") == constants.BACKFILLING_OFF:
507
                        done = True
508
        
509
        for lease in self.queue:
510
            newqueue.enqueue(lease)
511
        
512
        self.queue = newqueue 
513
    
514

    
515
    def __schedule_lease(self, lease, nexttime):            
516
        """ Schedules a lease.
517
        
518
        This method orchestrates the preparation and VM scheduler to
519
        schedule a lease.
520
        
521
        Arguments:
522
        lease -- Lease to schedule.
523
        nexttime -- The next time at which the scheduler can allocate resources.
524
        """       
525
                
526
        lease_state = lease.get_state()
527
        migration = get_config().get("migration")
528
        
529
        # Determine earliest start time in each node
530
        if lease_state == Lease.STATE_PENDING or lease_state == Lease.STATE_QUEUED:
531
            # This lease might require preparation. Ask the preparation
532
            # scheduler for the earliest starting time.
533
            earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
534
        elif lease_state == Lease.STATE_SUSPENDED_PENDING or lease_state == Lease.STATE_SUSPENDED_QUEUED:
535
            # This lease may have to be migrated.
536
            # We have to ask both the preparation scheduler and the VM
537
            # scheduler what would be the earliest possible starting time
538
            # on each node, assuming we have to transfer files between
539
            # nodes.
540

    
541
            node_ids = self.slottable.nodes.keys()
542
            earliest = {}
543
            if migration == constants.MIGRATE_NO:
544
                # If migration is disabled, the earliest starting time
545
                # is simply nexttime.
546
                for node in node_ids:
547
                    earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
548
            else:
549
                # Otherwise, we ask the preparation scheduler and the VM
550
                # scheduler how long it would take them to migrate the
551
                # lease state.
552
                prep_migr_time = self.preparation_scheduler.estimate_migration_time(lease)            
553
                vm_migr_time = self.vm_scheduler.estimate_migration_time(lease)
554
                for node in node_ids:
555
                    earliest[node] = EarliestStartingTime(nexttime + prep_migr_time + vm_migr_time, EarliestStartingTime.EARLIEST_MIGRATION)
556
        else:
557
            raise InconsistentLeaseStateError(lease, doing = "scheduling a best-effort lease")
558

    
559
        # Now, we give the lease to the VM scheduler, along with the
560
        # earliest possible starting times. If the VM scheduler can
561
        # schedule VMs for this lease, it will return a resource reservation
562
        # that we can add to the slot table, along with a list of
563
        # leases that have to be preempted.
564
        # If the VM scheduler can't schedule the VMs, it will throw an
565
        # exception (we don't catch it here, and it is just thrown up
566
        # to the calling method.
567
        (vmrr, preemptions) = self.vm_scheduler.schedule(lease, nexttime, earliest)
568
                                
569
        # If scheduling the lease involves preempting other leases,
570
        # go ahead and preempt them.
571
        if len(preemptions) > 0:
572
            self.logger.info("Must preempt leases %s to make room for lease #%i" % ([l.id for l in preemptions], lease.id))
573
            for l in preemptions:
574
                self.__preempt_lease(l, preemption_time=vmrr.start)
575
                
576
        # Schedule lease preparation
577
        is_ready = False
578
        preparation_rrs = []
579
        if lease_state in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED) and migration != constants.MIGRATE_NO:
580
            # The lease might require migration
581
            migr_rrs = self.preparation_scheduler.schedule_migration(lease, vmrr, nexttime)
582
            if len(migr_rrs) > 0:
583
                end_migr = migr_rrs[-1].end
584
            else:
585
                end_migr = nexttime
586
            migr_rrs += self.vm_scheduler.schedule_migration(lease, vmrr, end_migr)
587
            migr_rrs.reverse()
588
            for migr_rr in migr_rrs:
589
                vmrr.pre_rrs.insert(0, migr_rr)
590
            if len(migr_rrs) == 0:
591
                is_ready = True
592
        elif lease_state in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED) and migration == constants.MIGRATE_NO:
593
            # No migration means the lease is ready
594
            is_ready = True
595
        elif lease_state in (Lease.STATE_PENDING, Lease.STATE_QUEUED):
596
            # The lease might require initial preparation
597
            preparation_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, earliest)
598

    
599
        # At this point, the lease is feasible.
600
        # Commit changes by adding RRs to lease and to slot table
601
        
602
        # Add preparation RRs (if any) to lease
603
        for rr in preparation_rrs:
604
            lease.append_preparationrr(rr)
605
        
606
        # Add VMRR to lease
607
        lease.append_vmrr(vmrr)
608
        
609

    
610
        # Add resource reservations to slottable
611
        
612
        # Preparation RRs (if any)
613
        for rr in preparation_rrs:
614
            self.slottable.add_reservation(rr)
615
        
616
        # Pre-VM RRs (if any)
617
        for rr in vmrr.pre_rrs:
618
            self.slottable.add_reservation(rr)
619
            
620
        # VM
621
        self.slottable.add_reservation(vmrr)
622
        
623
        # Post-VM RRs (if any)
624
        for rr in vmrr.post_rrs:
625
            self.slottable.add_reservation(rr)
626
          
627
        # Change lease state
628
        if lease_state == Lease.STATE_PENDING or lease_state == Lease.STATE_QUEUED:
629
            lease.set_state(Lease.STATE_SCHEDULED)
630
            if is_ready:
631
                lease.set_state(Lease.STATE_READY)
632
        elif lease_state == Lease.STATE_SUSPENDED_PENDING or lease_state == Lease.STATE_SUSPENDED_QUEUED:
633
            lease.set_state(Lease.STATE_SUSPENDED_SCHEDULED)
634

    
635
        get_persistence().persist_lease(lease)
636

    
637
        lease.print_contents()
638

    
639
        
640
    def __preempt_lease(self, lease, preemption_time):
641
        """ Preempts a lease.
642
        
643
        This method preempts a lease such that any resources allocated
644
        to that lease after a given time are freed up. This may require
645
        scheduling the lease to suspend before that time, or cancelling
646
        the lease altogether.
647
        
648
        Arguments:
649
        lease -- Lease to schedule.
650
        preemption_time -- Time at which lease must be preempted
651
        """       
652
        
653
        self.logger.info("Preempting lease #%i..." % (lease.id))
654
        self.logger.vdebug("Lease before preemption:")
655
        lease.print_contents()
656
        vmrr = lease.get_last_vmrr()
657
        
658
        if vmrr.state == ResourceReservation.STATE_SCHEDULED and vmrr.start >= preemption_time:
659
            self.logger.debug("Lease was set to start in the middle of the preempting lease.")
660
            must_cancel_and_requeue = True
661
        else:
662
            susptype = get_config().get("suspension")
663
            if susptype == constants.SUSPENSION_NONE:
664
                must_cancel_and_requeue = True
665
            else:
666
                can_suspend = self.vm_scheduler.can_suspend_at(lease, preemption_time)
667
                if not can_suspend:
668
                    self.logger.debug("Suspending the lease does not meet scheduling threshold.")
669
                    must_cancel_and_requeue = True
670
                else:
671
                    if lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL:
672
                        self.logger.debug("Can't suspend lease because only suspension of single-node leases is allowed.")
673
                        must_cancel_and_requeue = True
674
                    else:
675
                        self.logger.debug("Lease can be suspended")
676
                        must_cancel_and_requeue = False
677
                    
678
        if must_cancel_and_requeue:
679
            self.logger.info("... lease #%i has been cancelled and requeued." % lease.id)
680
            self.preparation_scheduler.cancel_preparation(lease)
681
            self.vm_scheduler.cancel_vm(vmrr)
682
            lease.remove_vmrr(vmrr)
683
            # TODO: Take into account other states
684
            if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
685
                lease.set_state(Lease.STATE_SUSPENDED_QUEUED)
686
            else:
687
                lease.set_state(Lease.STATE_QUEUED)
688
            self.__enqueue_in_order(lease)
689
        else:
690
            self.logger.info("... lease #%i will be suspended at %s." % (lease.id, preemption_time))
691
            self.vm_scheduler.preempt_vm(vmrr, preemption_time)            
692
            
693
        get_persistence().persist_lease(lease)
694

    
695
        self.logger.vdebug("Lease after preemption:")
696
        lease.print_contents()
697
                
698
  
699
    def __enqueue(self, lease):
700
        """Queues a best-effort lease request
701
        
702
        Arguments:
703
        lease -- Lease to be queued
704
        """
705
        self.queue.enqueue(lease)
706

    
707

    
708
    def __enqueue_in_order(self, lease):
709
        """Queues a lease in order (currently, time of submission)
710
        
711
        Arguments:
712
        lease -- Lease to be queued
713
        """
714
        self.queue.enqueue_in_order(lease)
715

    
716

    
717
    def _handle_end_rr(self, rr):
718
        """Performs actions that have to be done each time a reservation ends.
719
        
720
        Arguments:
721
        rr -- Reservation that ended
722
        """
723
        self.slottable.remove_reservation(rr)
724
        
725

    
726
    def _handle_end_lease(self, l):
727
        """Performs actions that have to be done each time a lease ends.
728
        
729
        Arguments:
730
        lease -- Lease that has ended
731
        """
732
        l.set_state(Lease.STATE_DONE)
733
        l.duration.actual = l.duration.accumulated
734
        l.end = round_datetime(get_clock().get_time())
735
        self.preparation_scheduler.cleanup(l)
736
        self.completed_leases.add(l)
737
        self.leases.remove(l)
738
        self.accounting.at_lease_done(l)
739

    
740
        
741

    
742
class Queue(object):
743
    """A simple queue for leases
744
    
745
    This class is a simple queue container for leases, with some
746
    extra syntactic sugar added for convenience.    
747
    """    
748

    
749
    def __init__(self):
750
        self.__q = []
751
        
752
    def is_empty(self):
753
        return len(self.__q)==0
754
    
755
    def enqueue(self, r):
756
        self.__q.append(r)
757
    
758
    def dequeue(self):
759
        return self.__q.pop(0)
760
    
761
    def enqueue_in_order(self, r):
762
        self.__q.append(r)
763
        self.__q.sort(key=attrgetter("submit_time"))
764

    
765
    def length(self):
766
        return len(self.__q)
767
    
768
    def has_lease(self, lease_id):
769
        return (1 == len([l for l in self.__q if l.id == lease_id]))
770
    
771
    def get_lease(self, lease_id):
772
        return [l for l in self.__q if l.id == lease_id][0]
773
    
774
    def remove_lease(self, lease):
775
        self.__q.remove(lease)
776
    
777
    def __iter__(self):
778
        return iter(self.__q)
779
        
780
class LeaseTable(object):
781
    """A simple container for leases
782
    
783
    This class is a simple dictionary-like container for leases, with some
784
    extra syntactic sugar added for convenience.    
785
    """    
786
    
787
    def __init__(self):
788
        self.entries = {}
789
        
790
    def has_lease(self, lease_id):
791
        return self.entries.has_key(lease_id)
792
        
793
    def get_lease(self, lease_id):
794
        return self.entries[lease_id]
795
    
796
    def is_empty(self):
797
        return len(self.entries)==0
798
    
799
    def remove(self, lease):
800
        del self.entries[lease.id]
801
        
802
    def add(self, lease):
803
        self.entries[lease.id] = lease
804
        
805
    def get_leases(self, type=None):
806
        if type==None:
807
            return self.entries.values()
808
        else:
809
            return [e for e in self.entries.values() if e.get_type() == type]
810

    
811
    def get_leases_by_state(self, state):
812
        return [e for e in self.entries.values() if e.get_state() == state]
813
 
814