Project

General

Profile

root / trunk / src / haizea / core / scheduler / lease_scheduler.py @ 641

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2009, University of Chicago                                 #
3
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19

    
20
"""This module provides the main classes for Haizea's lease scheduler, particularly
21
the LeaseScheduler class. This module does *not* contain VM scheduling code (i.e.,
22
the code that decides what physical hosts a VM should be mapped to), which is
23
located in the vm_scheduler module. Lease preparation code (e.g., image transfer 
24
scheduling) is located in the preparation_schedulers package. In fact, the
25
main purpose of the lease schedule is to orchestrate these preparation and VM
26
schedulers.
27

28
This module also includes a Queue class and a LeaseTable class, which are used
29
by the lease scheduler.
30
"""
31

    
32
import haizea.common.constants as constants
33
from haizea.common.utils import round_datetime, get_config, get_accounting, get_clock, get_policy
34
from haizea.core.leases import Lease
35
from haizea.core.scheduler import RescheduleLeaseException, NormalEndLeaseException, InconsistentLeaseStateError, EnactmentError, UnrecoverableError, NotSchedulableException, EarliestStartingTime
36
from haizea.core.scheduler.slottable import ResourceReservation
37
from haizea.core.scheduler.vm_scheduler import VMResourceReservation
38
from operator import attrgetter
39

    
40
import logging
41

    
42
class LeaseScheduler(object):
43
    """The Haizea Lease Scheduler
44
    
45
    This is the main scheduling class in Haizea. It handles lease scheduling which,
46
    in turn, involves VM scheduling, preparation scheduling (such as transferring
47
    a VM image), and numerous bookkeeping operations. All these operations are
48
    handled by other classes, so this class acts mostly as an orchestrator that
49
    coordinates all the different operations involved in scheduling a lease.    
50
    """
51
    
52
    def __init__(self, vm_scheduler, preparation_scheduler, slottable):
53
        """Constructor
54
        
55
        The constructor does little more than create the lease scheduler's
56
        attributes. However, it does expect (in the arguments) a fully-constructed 
57
        VMScheduler, PreparationScheduler, SlotTable, and PolicyManager (these are 
58
        constructed in the Manager's constructor). 
59
        
60
        Arguments:
61
        vm_scheduler -- VM scheduler
62
        preparation_scheduler -- Preparation scheduler
63
        slottable -- Slottable
64
        """
65
        
66
        # Logger
67
        self.logger = logging.getLogger("LSCHED")
68
        
69
        # Assign schedulers and slottable
70
        self.vm_scheduler = vm_scheduler
71
        self.preparation_scheduler = preparation_scheduler
72
        self.slottable = slottable
73

    
74
        # Create other data structures
75
        self.queue = Queue()
76
        self.leases = LeaseTable()
77
        self.completed_leases = LeaseTable()
78

    
79
        # Handlers are callback functions that get called whenever a type of
80
        # resource reservation starts or ends. Each scheduler publishes the
81
        # handlers it supports through its "handlers" attributes. For example,
82
        # the VMScheduler provides _handle_start_vm and _handle_end_vm that
83
        # must be called when a VMResourceReservation start or end is encountered
84
        # in the slot table.
85
        #
86
        # Handlers are called from the process_reservations method of this class
87
        self.handlers = {}
88
        for (type, handler) in self.vm_scheduler.handlers.items():
89
            self.handlers[type] = handler
90

    
91
        for (type, handler) in self.preparation_scheduler.handlers.items():
92
            self.handlers[type] = handler
93

    
94

    
95
    def request_lease(self, lease):
96
        """Requests a leases. This is the entry point of leases into the scheduler.
97
        
98
        Request a lease. The decision on whether to accept or reject a
99
        lease is deferred to the policy manager (through its admission
100
        control policy). 
101
        
102
        If the policy determines the lease can be
103
        accepted, it is marked as "Pending". This still doesn't
104
        guarantee that the lease will be scheduled (e.g., an AR lease
105
        could still be rejected if the scheduler determines there are no
106
        resources for it; but that is a *scheduling* decision, not a admission
107
        control policy decision). The ultimate fate of the lease is determined
108
        the next time the scheduling function is called.
109
        
110
        If the policy determines the lease cannot be accepted, it is marked
111
        as rejected.
112

113
        Arguments:
114
        lease -- Lease object. Its state must be STATE_NEW.
115
        """
116
        self.logger.info("Lease #%i has been requested." % lease.id)
117
        if lease.submit_time == None:
118
            lease.submit_time = round_datetime(get_clock().get_time())
119
        lease.print_contents()
120
        lease.set_state(Lease.STATE_PENDING)
121
        if get_policy().accept_lease(lease):
122
            self.logger.info("Lease #%i has been marked as pending." % lease.id)
123
            self.leases.add(lease)
124
        else:
125
            self.logger.info("Lease #%i has not been accepted" % lease.id)
126
            lease.set_state(Lease.STATE_REJECTED)
127
            self.completed_leases.add(lease)
128

    
129
        
130
    def schedule(self, nexttime):
131
        """ The main scheduling function
132
        
133
        The scheduling function looks at all pending requests and schedules them.
134
        Note that most of the actual scheduling code is contained in the
135
        __schedule_lease method and in the VMScheduler and PreparationScheduler classes.
136
        
137
        Arguments:
138
        nexttime -- The next time at which the scheduler can allocate resources.
139
        """
140
        
141
        # Get pending leases
142
        pending_leases = self.leases.get_leases_by_state(Lease.STATE_PENDING)  
143
        ar_leases = [req for req in pending_leases if req.get_type() == Lease.ADVANCE_RESERVATION]
144
        im_leases = [req for req in pending_leases if req.get_type() == Lease.IMMEDIATE]
145
        be_leases = [req for req in pending_leases if req.get_type() == Lease.BEST_EFFORT]
146
        
147
        # Queue best-effort leases
148
        for lease in be_leases:
149
            self.__enqueue(lease)
150
            lease.set_state(Lease.STATE_QUEUED)
151
            self.logger.info("Queued best-effort lease request #%i, %i nodes for %s." % (lease.id, lease.numnodes, lease.duration.requested))
152

    
153
        # Schedule immediate leases
154
        for lease in im_leases:
155
            self.logger.info("Scheduling immediate lease #%i (%i nodes)" % (lease.id, lease.numnodes))
156
            lease.print_contents()
157
       
158
            try:
159
                self.__schedule_lease(lease, nexttime=nexttime)
160
                self.logger.info("Immediate lease #%i has been scheduled." % lease.id)
161
                get_accounting().incr_counter(constants.COUNTER_IMACCEPTED, lease.id)
162
                lease.print_contents()
163
            except NotSchedulableException, exc:
164
                get_accounting().incr_counter(constants.COUNTER_IMREJECTED, lease.id)
165
                self.logger.info("Immediate lease request #%i cannot be scheduled: %s" % (lease.id, exc.reason))
166
                lease.set_state(Lease.STATE_REJECTED)
167
                self.completed_leases.add(lease)
168
                self.leases.remove(lease)            
169

    
170
        # Schedule AR requests
171
        for lease in ar_leases:
172
            self.logger.info("Scheduling AR lease #%i, %i nodes from %s to %s." % (lease.id, lease.numnodes, lease.start.requested, lease.start.requested + lease.duration.requested))
173
            lease.print_contents()
174
            
175
            try:
176
                self.__schedule_lease(lease, nexttime)
177
                self.logger.info("AR lease #%i has been scheduled." % lease.id)
178
                get_accounting().incr_counter(constants.COUNTER_ARACCEPTED, lease.id)
179
                lease.print_contents()
180
            except NotSchedulableException, exc:
181
                get_accounting().incr_counter(constants.COUNTER_ARREJECTED, lease.id)
182
                self.logger.info("AR lease request #%i cannot be scheduled: %s" % (lease.id, exc.reason))
183
                lease.set_state(Lease.STATE_REJECTED)
184
                self.completed_leases.add(lease)
185
                self.leases.remove(lease)            
186
            
187
        # Process queue (i.e., traverse queue in search of leases that can be scheduled)
188
        self.__process_queue(nexttime)
189
        
190
    
191
    def process_starting_reservations(self, nowtime):
192
        """Processes starting reservations
193
        
194
        This method checks the slottable to see if there are any reservations that are
195
        starting at "nowtime". If so, the appropriate handler is called.
196

197
        Arguments:
198
        nowtime -- Time at which to check for starting reservations.
199
        """
200

    
201
        # Find starting/ending reservations
202
        starting = self.slottable.get_reservations_starting_at(nowtime)
203
        starting = [res for res in starting if res.state == ResourceReservation.STATE_SCHEDULED]
204
        
205
        # Process starting reservations
206
        for rr in starting:
207
            lease = rr.lease
208
            # Call the appropriate handler, and catch exceptions and errors.
209
            try:
210
                self.handlers[type(rr)].on_start(lease, rr)
211
                
212
            # An InconsistentLeaseStateError is raised when the lease is in an inconsistent
213
            # state. This is usually indicative of a programming error, but not necessarily
214
            # one that affects all leases, so we just fail this lease. Note that Haizea can also
215
            # be configured to stop immediately when a lease fails.
216
            except InconsistentLeaseStateError, exc:
217
                self.fail_lease(lease, exc)
218
            # An EnactmentError is raised when the handler had to perform an enactment action
219
            # (e.g., stopping a VM), and that enactment action failed. This is currently treated
220
            # as a non-recoverable error for the lease, and the lease is failed.
221
            except EnactmentError, exc:
222
                self.fail_lease(lease, exc)
223

    
224
            # Other exceptions are not expected, and generally indicate a programming error.
225
            # Thus, they are propagated upwards to the Manager where they will make
226
            # Haizea crash and burn.
227

    
228
    def process_ending_reservations(self, nowtime):
229
        """Processes ending reservations
230
        
231
        This method checks the slottable to see if there are any reservations that are
232
        ending at "nowtime". If so, the appropriate handler is called.
233

234
        Arguments:
235
        nowtime -- Time at which to check for starting/ending reservations.
236
        """
237

    
238
        # Find starting/ending reservations
239
        ending = self.slottable.get_reservations_ending_at(nowtime)
240
        ending = [res for res in ending if res.state == ResourceReservation.STATE_ACTIVE]
241

    
242
        # Process ending reservations
243
        for rr in ending:
244
            lease = rr.lease
245
            self._handle_end_rr(rr)
246
            
247
            # Call the appropriate handler, and catch exceptions and errors.
248
            try:
249
                self.handlers[type(rr)].on_end(lease, rr)
250
                
251
            # A RescheduleLeaseException indicates that the lease has to be rescheduled
252
            except RescheduleLeaseException, exc:
253
                # Currently, the only leases that get rescheduled are best-effort leases,
254
                # once they've been suspended.
255
                if rr.lease.get_type() == Lease.BEST_EFFORT:
256
                    if lease.get_state() == Lease.STATE_SUSPENDED_PENDING:
257
                        # Put back in the queue, in the same order it arrived
258
                        self.__enqueue_in_order(lease)
259
                        lease.set_state(Lease.STATE_SUSPENDED_QUEUED)
260
                    else:
261
                        raise InconsistentLeaseStateError(lease, doing = "rescheduling best-effort lease")
262
                    
263
            # A NormalEndLeaseException indicates that the end of this reservations marks
264
            # the normal end of the lease.
265
            except NormalEndLeaseException, msg:
266
                self._handle_end_lease(lease)
267
                
268
            # An InconsistentLeaseStateError is raised when the lease is in an inconsistent
269
            # state. This is usually indicative of a programming error, but not necessarily
270
            # one that affects all leases, so we just fail this lease. Note that Haizea can also
271
            # be configured to stop immediately when a lease fails.
272
            except InconsistentLeaseStateError, exc:
273
                self.fail_lease(lease, exc)
274
                
275
            # An EnactmentError is raised when the handler had to perform an enactment action
276
            # (e.g., stopping a VM), and that enactment action failed. This is currently treated
277
            # as a non-recoverable error for the lease, and the lease is failed.
278
            except EnactmentError, exc:
279
                self.fail_lease(lease, exc)
280
                
281
            # Other exceptions are not expected, and generally indicate a programming error.
282
            # Thus, they are propagated upwards to the Manager where they will make
283
            # Haizea crash and burn.
284

    
285
    def get_lease_by_id(self, lease_id):
286
        """Gets a lease with the given ID
287
        
288
        This method is useful for UIs (like the CLI) that operate on the lease ID.
289
        If no lease with a given ID is found, None is returned.
290

291
        Arguments:
292
        lease_id -- The ID of the lease
293
        """
294
        if not self.leases.has_lease(lease_id):
295
            return None
296
        else:
297
            return self.leases.get_lease(lease_id)
298

    
299
    def cancel_lease(self, lease):
300
        """Cancels a lease.
301
        
302
        Arguments:
303
        lease -- Lease to cancel
304
        """
305
        time = get_clock().get_time()
306
        
307
        self.logger.info("Cancelling lease %i..." % lease.id)
308
            
309
        lease_state = lease.get_state()
310
        
311
        if lease_state == Lease.STATE_PENDING:
312
            # If a lease is pending, we just need to change its state and
313
            # remove it from the lease table. Since this is done at the
314
            # end of this method, we do nothing here.
315
            pass
316

    
317
        elif lease_state == Lease.STATE_ACTIVE:
318
            # If a lease is active, that means we have to shut down its VMs to cancel it.
319
            self.logger.info("Lease %i is active. Stopping active reservation..." % lease.id)
320
            vmrr = lease.get_active_vmrrs(time)[0]
321
            self.vm_scheduler._handle_unscheduled_end_vm(lease, vmrr)
322

    
323
        elif lease_state in [Lease.STATE_SCHEDULED, Lease.STATE_SUSPENDED_SCHEDULED, Lease.STATE_READY, Lease.STATE_RESUMED_READY]:
324
            # If a lease is scheduled or ready, we just need to cancel all future reservations
325
            # for that lease
326
            self.logger.info("Lease %i is scheduled. Cancelling reservations." % lease.id)
327
            rrs = lease.get_scheduled_reservations()
328
            for r in rrs:
329
                self.slottable.remove_reservation(r)
330
            
331
        elif lease_state in [Lease.STATE_QUEUED, Lease.STATE_SUSPENDED_QUEUED]:
332
            # If a lease is in the queue, waiting to be scheduled, cancelling
333
            # just requires removing it from the queue
334
            
335
            self.logger.info("Lease %i is in the queue. Removing..." % lease.id)
336
            self.queue.remove_lease(lease)
337
        else:
338
            # Cancelling in any of the other states is currently unsupported
339
            raise InconsistentLeaseStateError(lease, doing = "cancelling the VM")
340
            
341
        # Change state, and remove from lease table
342
        lease.set_state(Lease.STATE_CANCELLED)
343
        self.completed_leases.add(lease)
344
        self.leases.remove(lease)
345

    
346
    
347
    def fail_lease(self, lease, exc=None):
348
        """Transitions a lease to a failed state, and does any necessary cleaning up
349
        
350
        Arguments:
351
        lease -- Lease to fail
352
        exc -- The exception that made the lease fail
353
        """
354
        treatment = get_config().get("lease-failure-handling")
355
        
356
        if treatment == constants.ONFAILURE_CANCEL:
357
            # In this case, a lease failure is handled by cancelling the lease,
358
            # but allowing Haizea to continue to run normally.
359
            rrs = lease.get_scheduled_reservations()
360
            for r in rrs:
361
                self.slottable.remove_reservation(r)
362
            lease.set_state(Lease.STATE_FAIL)
363
            self.completed_leases.add(lease)
364
            self.leases.remove(lease)
365
        elif treatment == constants.ONFAILURE_EXIT or treatment == constants.ONFAILURE_EXIT_RAISE:
366
            # In this case, a lease failure makes Haizea exit. This is useful when debugging,
367
            # so we can immediately know about any errors.
368
            raise UnrecoverableError(exc)
369
            
370
    
371
    def notify_event(self, lease, event):
372
        """Notifies an event that affects a lease.
373
        
374
        This is the entry point of asynchronous events into the scheduler. Currently,
375
        the only supported event is the premature end of a VM (i.e., before its
376
        scheduled end). Other events will emerge when we integrate Haizea with OpenNebula 1.4,
377
        since that version will support sending asynchronous events to Haizea.
378
        
379
        Arguments:
380
        lease -- Lease the event refers to
381
        event -- Event type
382
        """
383
        time = get_clock().get_time()
384
        if event == constants.EVENT_END_VM:
385
            vmrr = lease.get_last_vmrr()
386
            self._handle_end_rr(vmrr)
387
            # TODO: Exception handling
388
            self.vm_scheduler._handle_unscheduled_end_vm(lease, vmrr)
389
            self._handle_end_lease(lease)
390
            nexttime = get_clock().get_next_schedulable_time()
391
            # We need to reevaluate the schedule to see if there are any 
392
            # leases scheduled in the future that could be rescheduled
393
            # to start earlier
394
            self.reevaluate_schedule(nexttime)
395

    
396

    
397
    def reevaluate_schedule(self, nexttime):
398
        """Reevaluates the schedule.
399
        
400
        This method can be called whenever resources are freed up
401
        unexpectedly (e.g., a lease than ends earlier than expected))
402
        to check if any leases scheduled in the future could be
403
        rescheduled to start earlier on the freed up resources.
404
        
405
        Currently, this method only checks if best-effort leases
406
        scheduled in the future (using a backfilling algorithm)
407
        can be rescheduled
408
        
409
        Arguments:
410
        nexttime -- The next time at which the scheduler can allocate resources.
411
        """        
412
        future = self.vm_scheduler.get_future_reschedulable_leases()
413
        for l in future:
414
            # We can only reschedule leases in the following four states
415
            if l.get_state() in (Lease.STATE_PREPARING, Lease.STATE_READY, Lease.STATE_SCHEDULED, Lease.STATE_SUSPENDED_SCHEDULED):
416
                # For each reschedulable lease already scheduled in the
417
                # future, we cancel the lease's preparantion and
418
                # the last scheduled VM.
419
                vmrr = l.get_last_vmrr()
420
                self.preparation_scheduler.cancel_preparation(l)
421
                self.vm_scheduler.cancel_vm(vmrr)
422
                l.remove_vmrr(vmrr)
423
                if l.get_state() in (Lease.STATE_READY, Lease.STATE_SCHEDULED, Lease.STATE_PREPARING):
424
                    l.set_state(Lease.STATE_PENDING)
425
                elif l.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
426
                    l.set_state(Lease.STATE_SUSPENDED_PENDING)
427

    
428
                # At this point, the lease just looks like a regular
429
                # pending lease that can be handed off directly to the
430
                # __schedule_lease method.
431
                # TODO: We should do exception handling here. However,
432
                # since we can only reschedule best-effort leases that were
433
                # originally schedule in the future, the scheduling function 
434
                # should always be able to schedule the lease (worst-case 
435
                # scenario is that it simply replicates the previous schedule)
436
                self.__schedule_lease(l, nexttime)
437

    
438

    
439
    def is_queue_empty(self):
440
        """Return True is the queue is empty, False otherwise"""
441
        return self.queue.is_empty()
442

    
443
    
444
    def exists_scheduled_leases(self):
445
        """Return True if there are any leases scheduled in the future"""
446
        return not self.slottable.is_empty()    
447

    
448
            
449
    def __process_queue(self, nexttime):
450
        """ Traverses the queue in search of leases that can be scheduled.
451
        
452
        This method processes the queue in order, but takes into account that
453
        it may be possible to schedule leases in the future (using a 
454
        backfilling algorithm)
455
        
456
        Arguments:
457
        nexttime -- The next time at which the scheduler can allocate resources.
458
        """        
459
        
460
        done = False
461
        newqueue = Queue()
462
        while not done and not self.is_queue_empty():
463
            if not self.vm_scheduler.can_schedule_in_future() and self.slottable.is_full(nexttime, restype = constants.RES_CPU):
464
                self.logger.debug("Used up all future reservations and slot table is full. Skipping rest of queue.")
465
                done = True
466
            else:
467
                lease = self.queue.dequeue()
468
                try:
469
                    self.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease.id)
470
                    lease.print_contents()
471
                    self.__schedule_lease(lease, nexttime)
472
                    get_accounting().decr_counter(constants.COUNTER_QUEUESIZE, lease.id)
473
                except NotSchedulableException, msg:
474
                    # Put back on queue
475
                    newqueue.enqueue(lease)
476
                    self.logger.info("Lease %i could not be scheduled at this time." % lease.id)
477
                    if get_config().get("backfilling") == constants.BACKFILLING_OFF:
478
                        done = True
479
        
480
        for lease in self.queue:
481
            newqueue.enqueue(lease)
482
        
483
        self.queue = newqueue 
484
    
485

    
486
    def __schedule_lease(self, lease, nexttime):            
487
        """ Schedules a lease.
488
        
489
        This method orchestrates the preparation and VM scheduler to
490
        schedule a lease.
491
        
492
        Arguments:
493
        lease -- Lease to schedule.
494
        nexttime -- The next time at which the scheduler can allocate resources.
495
        """       
496
                
497
        lease_state = lease.get_state()
498
        migration = get_config().get("migration")
499
        
500
        # Determine earliest start time in each node
501
        if lease_state == Lease.STATE_PENDING or lease_state == Lease.STATE_QUEUED:
502
            # This lease might require preparation. Ask the preparation
503
            # scheduler for the earliest starting time.
504
            earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
505
        elif lease_state == Lease.STATE_SUSPENDED_PENDING or lease_state == Lease.STATE_SUSPENDED_QUEUED:
506
            # This lease may have to be migrated.
507
            # We have to ask both the preparation scheduler and the VM
508
            # scheduler what would be the earliest possible starting time
509
            # on each node, assuming we have to transfer files between
510
            # nodes.
511

    
512
            node_ids = self.slottable.nodes.keys()
513
            earliest = {}
514
            if migration == constants.MIGRATE_NO:
515
                # If migration is disabled, the earliest starting time
516
                # is simply nexttime.
517
                for node in node_ids:
518
                    earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
519
            else:
520
                # Otherwise, we ask the preparation scheduler and the VM
521
                # scheduler how long it would take them to migrate the
522
                # lease state.
523
                prep_migr_time = self.preparation_scheduler.estimate_migration_time(lease)            
524
                vm_migr_time = self.vm_scheduler.estimate_migration_time(lease)
525
                for node in node_ids:
526
                    earliest[node] = EarliestStartingTime(nexttime + prep_migr_time + vm_migr_time, EarliestStartingTime.EARLIEST_MIGRATION)
527
        else:
528
            raise InconsistentLeaseStateError(lease, doing = "scheduling a best-effort lease")
529

    
530
        # Now, we give the lease to the VM scheduler, along with the
531
        # earliest possible starting times. If the VM scheduler can
532
        # schedule VMs for this lease, it will return a resource reservation
533
        # that we can add to the slot table, along with a list of
534
        # leases that have to be preempted.
535
        # If the VM scheduler can't schedule the VMs, it will throw an
536
        # exception (we don't catch it here, and it is just thrown up
537
        # to the calling method.
538
        (vmrr, preemptions) = self.vm_scheduler.schedule(lease, nexttime, earliest)
539
                                
540
        # If scheduling the lease involves preempting other leases,
541
        # go ahead and preempt them.
542
        if len(preemptions) > 0:
543
            self.logger.info("Must preempt leases %s to make room for lease #%i" % ([l.id for l in preemptions], lease.id))
544
            for l in preemptions:
545
                self.__preempt_lease(l, preemption_time=vmrr.start)
546
                
547
        # Schedule lease preparation
548
        is_ready = False
549
        preparation_rrs = []
550
        if lease_state in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED) and migration != constants.MIGRATE_NO:
551
            # The lease might require migration
552
            migr_rrs = self.preparation_scheduler.schedule_migration(lease, vmrr, nexttime)
553
            if len(migr_rrs) > 0:
554
                end_migr = migr_rrs[-1].end
555
            else:
556
                end_migr = nexttime
557
            migr_rrs += self.vm_scheduler.schedule_migration(lease, vmrr, end_migr)
558
            migr_rrs.reverse()
559
            for migr_rr in migr_rrs:
560
                vmrr.pre_rrs.insert(0, migr_rr)
561
            if len(migr_rrs) == 0:
562
                is_ready = True
563
        elif lease_state in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED) and migration == constants.MIGRATE_NO:
564
            # No migration means the lease is ready
565
            is_ready = True
566
        elif lease_state in (Lease.STATE_PENDING, Lease.STATE_QUEUED):
567
            # The lease might require initial preparation
568
            preparation_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, earliest)
569

    
570
        # At this point, the lease is feasible.
571
        # Commit changes by adding RRs to lease and to slot table
572
        
573
        # Add preparation RRs (if any) to lease
574
        for rr in preparation_rrs:
575
            lease.append_preparationrr(rr)
576
        
577
        # Add VMRR to lease
578
        lease.append_vmrr(vmrr)
579
        
580

    
581
        # Add resource reservations to slottable
582
        
583
        # Preparation RRs (if any)
584
        for rr in preparation_rrs:
585
            self.slottable.add_reservation(rr)
586
        
587
        # Pre-VM RRs (if any)
588
        for rr in vmrr.pre_rrs:
589
            self.slottable.add_reservation(rr)
590
            
591
        # VM
592
        self.slottable.add_reservation(vmrr)
593
        
594
        # Post-VM RRs (if any)
595
        for rr in vmrr.post_rrs:
596
            self.slottable.add_reservation(rr)
597
          
598
        # Change lease state
599
        if lease_state == Lease.STATE_PENDING or lease_state == Lease.STATE_QUEUED:
600
            lease.set_state(Lease.STATE_SCHEDULED)
601
            if is_ready:
602
                lease.set_state(Lease.STATE_READY)
603
        elif lease_state == Lease.STATE_SUSPENDED_PENDING or lease_state == Lease.STATE_SUSPENDED_QUEUED:
604
            lease.set_state(Lease.STATE_SUSPENDED_SCHEDULED)
605

    
606
        lease.print_contents()
607

    
608
        
609
    def __preempt_lease(self, lease, preemption_time):
610
        """ Preempts a lease.
611
        
612
        This method preempts a lease such that any resources allocated
613
        to that lease after a given time are freed up. This may require
614
        scheduling the lease to suspend before that time, or cancelling
615
        the lease altogether.
616
        
617
        Arguments:
618
        lease -- Lease to schedule.
619
        preemption_time -- Time at which lease must be preempted
620
        """       
621
        
622
        self.logger.info("Preempting lease #%i..." % (lease.id))
623
        self.logger.vdebug("Lease before preemption:")
624
        lease.print_contents()
625
        vmrr = lease.get_last_vmrr()
626
        
627
        if vmrr.state == ResourceReservation.STATE_SCHEDULED and vmrr.start >= preemption_time:
628
            self.logger.debug("Lease was set to start in the middle of the preempting lease.")
629
            must_cancel_and_requeue = True
630
        else:
631
            susptype = get_config().get("suspension")
632
            if susptype == constants.SUSPENSION_NONE:
633
                must_cancel_and_requeue = True
634
            else:
635
                can_suspend = self.vm_scheduler.can_suspend_at(lease, preemption_time)
636
                if not can_suspend:
637
                    self.logger.debug("Suspending the lease does not meet scheduling threshold.")
638
                    must_cancel_and_requeue = True
639
                else:
640
                    if lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL:
641
                        self.logger.debug("Can't suspend lease because only suspension of single-node leases is allowed.")
642
                        must_cancel_and_requeue = True
643
                    else:
644
                        self.logger.debug("Lease can be suspended")
645
                        must_cancel_and_requeue = False
646
                    
647
        if must_cancel_and_requeue:
648
            self.logger.info("... lease #%i has been cancelled and requeued." % lease.id)
649
            self.preparation_scheduler.cancel_preparation(lease)
650
            self.vm_scheduler.cancel_vm(vmrr)
651
            lease.remove_vmrr(vmrr)
652
            # TODO: Take into account other states
653
            if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
654
                lease.set_state(Lease.STATE_SUSPENDED_QUEUED)
655
            else:
656
                lease.set_state(Lease.STATE_QUEUED)
657
            self.__enqueue_in_order(lease)
658
        else:
659
            self.logger.info("... lease #%i will be suspended at %s." % (lease.id, preemption_time))
660
            self.vm_scheduler.preempt_vm(vmrr, preemption_time)            
661
            
662
        self.logger.vdebug("Lease after preemption:")
663
        lease.print_contents()
664
                
665
  
666
    def __enqueue(self, lease):
667
        """Queues a best-effort lease request
668
        
669
        Arguments:
670
        lease -- Lease to be queued
671
        """
672
        get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
673
        self.queue.enqueue(lease)
674

    
675

    
676
    def __enqueue_in_order(self, lease):
677
        """Queues a lease in order (currently, time of submission)
678
        
679
        Arguments:
680
        lease -- Lease to be queued
681
        """
682
        get_accounting().incr_counter(constants.COUNTER_QUEUESIZE, lease.id)
683
        self.queue.enqueue_in_order(lease)
684

    
685

    
686
    def _handle_end_rr(self, rr):
687
        """Performs actions that have to be done each time a reservation ends.
688
        
689
        Arguments:
690
        rr -- Reservation that ended
691
        """
692
        self.slottable.remove_reservation(rr)
693
        
694

    
695
    def _handle_end_lease(self, l):
696
        """Performs actions that have to be done each time a lease ends.
697
        
698
        Arguments:
699
        lease -- Lease that has ended
700
        """
701
        l.set_state(Lease.STATE_DONE)
702
        l.duration.actual = l.duration.accumulated
703
        l.end = round_datetime(get_clock().get_time())
704
        self.preparation_scheduler.cleanup(l)
705
        self.completed_leases.add(l)
706
        self.leases.remove(l)
707
        if l.get_type() == Lease.BEST_EFFORT:
708
            get_accounting().incr_counter(constants.COUNTER_BESTEFFORTCOMPLETED, l.id)
709
        
710

    
711
class Queue(object):
712
    """A simple queue for leases
713
    
714
    This class is a simple queue container for leases, with some
715
    extra syntactic sugar added for convenience.    
716
    """    
717

    
718
    def __init__(self):
719
        self.__q = []
720
        
721
    def is_empty(self):
722
        return len(self.__q)==0
723
    
724
    def enqueue(self, r):
725
        self.__q.append(r)
726
    
727
    def dequeue(self):
728
        return self.__q.pop(0)
729
    
730
    def enqueue_in_order(self, r):
731
        self.__q.append(r)
732
        self.__q.sort(key=attrgetter("submit_time"))
733

    
734
    def length(self):
735
        return len(self.__q)
736
    
737
    def has_lease(self, lease_id):
738
        return (1 == len([l for l in self.__q if l.id == lease_id]))
739
    
740
    def get_lease(self, lease_id):
741
        return [l for l in self.__q if l.id == lease_id][0]
742
    
743
    def remove_lease(self, lease):
744
        self.__q.remove(lease)
745
    
746
    def __iter__(self):
747
        return iter(self.__q)
748
        
749
class LeaseTable(object):
750
    """A simple container for leases
751
    
752
    This class is a simple dictionary-like container for leases, with some
753
    extra syntactic sugar added for convenience.    
754
    """    
755
    
756
    def __init__(self):
757
        self.entries = {}
758
        
759
    def has_lease(self, lease_id):
760
        return self.entries.has_key(lease_id)
761
        
762
    def get_lease(self, lease_id):
763
        return self.entries[lease_id]
764
    
765
    def is_empty(self):
766
        return len(self.entries)==0
767
    
768
    def remove(self, lease):
769
        del self.entries[lease.id]
770
        
771
    def add(self, lease):
772
        self.entries[lease.id] = lease
773
        
774
    def get_leases(self, type=None):
775
        if type==None:
776
            return self.entries.values()
777
        else:
778
            return [e for e in self.entries.values() if e.get_type() == type]
779

    
780
    def get_leases_by_state(self, state):
781
        return [e for e in self.entries.values() if e.get_state() == state]
782
 
783