Project

General

Profile

root / branches / 1.1 / src / haizea / core / scheduler / lease_scheduler.py @ 704

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2009, University of Chicago                                 #
3
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19

    
20
"""This module provides the main classes for Haizea's lease scheduler, particularly
21
the LeaseScheduler class. This module does *not* contain VM scheduling code (i.e.,
22
the code that decides what physical hosts a VM should be mapped to), which is
23
located in the vm_scheduler module. Lease preparation code (e.g., image transfer 
24
scheduling) is located in the preparation_schedulers package. In fact, the
25
main purpose of the lease schedule is to orchestrate these preparation and VM
26
schedulers.
27

28
This module also includes a Queue class and a LeaseTable class, which are used
29
by the lease scheduler.
30
"""
31

    
32
import haizea.common.constants as constants
33
from haizea.common.utils import round_datetime, get_config, get_clock, get_policy, get_persistence
34
from haizea.core.leases import Lease
35
from haizea.core.scheduler import RescheduleLeaseException, NormalEndLeaseException, InconsistentLeaseStateError, EnactmentError, UnrecoverableError, NotSchedulableException, EarliestStartingTime
36
from haizea.core.scheduler.slottable import ResourceReservation
37
from operator import attrgetter
38

    
39
import logging
40

    
41
class LeaseScheduler(object):
42
    """The Haizea Lease Scheduler
43
    
44
    This is the main scheduling class in Haizea. It handles lease scheduling which,
45
    in turn, involves VM scheduling, preparation scheduling (such as transferring
46
    a VM image), and numerous bookkeeping operations. All these operations are
47
    handled by other classes, so this class acts mostly as an orchestrator that
48
    coordinates all the different operations involved in scheduling a lease.    
49
    """
50
    
51
    def __init__(self, vm_scheduler, preparation_scheduler, slottable, accounting):
52
        """Constructor
53
        
54
        The constructor does little more than create the lease scheduler's
55
        attributes. However, it does expect (in the arguments) a fully-constructed 
56
        VMScheduler, PreparationScheduler, SlotTable, and PolicyManager (these are 
57
        constructed in the Manager's constructor). 
58
        
59
        Arguments:
60
        vm_scheduler -- VM scheduler
61
        preparation_scheduler -- Preparation scheduler
62
        slottable -- Slottable
63
        accounting -- AccountingDataCollection object
64
        """
65
        
66
        # Logger
67
        self.logger = logging.getLogger("LSCHED")
68
        
69
        # Assign schedulers and slottable
70
        self.vm_scheduler = vm_scheduler
71
        """
72
        VM Scheduler
73
        @type: VMScheduler
74
        """
75
        self.preparation_scheduler = preparation_scheduler
76
        self.slottable = slottable
77
        self.accounting = accounting
78

    
79
        # Create other data structures
80
        self.queue = Queue()
81
        self.leases = LeaseTable()
82
        self.completed_leases = LeaseTable()
83

    
84
        # Handlers are callback functions that get called whenever a type of
85
        # resource reservation starts or ends. Each scheduler publishes the
86
        # handlers it supports through its "handlers" attributes. For example,
87
        # the VMScheduler provides _handle_start_vm and _handle_end_vm that
88
        # must be called when a VMResourceReservation start or end is encountered
89
        # in the slot table.
90
        #
91
        # Handlers are called from the process_reservations method of this class
92
        self.handlers = {}
93
        for (type, handler) in self.vm_scheduler.handlers.items():
94
            self.handlers[type] = handler
95

    
96
        for (type, handler) in self.preparation_scheduler.handlers.items():
97
            self.handlers[type] = handler
98

    
99

    
100
    def request_lease(self, lease):
101
        """Requests a leases. This is the entry point of leases into the scheduler.
102
        
103
        Request a lease. The decision on whether to accept or reject a
104
        lease is deferred to the policy manager (through its admission
105
        control policy). 
106
        
107
        If the policy determines the lease can be
108
        accepted, it is marked as "Pending". This still doesn't
109
        guarantee that the lease will be scheduled (e.g., an AR lease
110
        could still be rejected if the scheduler determines there are no
111
        resources for it; but that is a *scheduling* decision, not a admission
112
        control policy decision). The ultimate fate of the lease is determined
113
        the next time the scheduling function is called.
114
        
115
        If the policy determines the lease cannot be accepted, it is marked
116
        as rejected.
117

118
        Arguments:
119
        lease -- Lease object. Its state must be STATE_NEW.
120
        """
121
        self.logger.info("Lease #%i has been requested." % lease.id)
122
        if lease.submit_time == None:
123
            lease.submit_time = round_datetime(get_clock().get_time())
124
        lease.print_contents()
125
        lease.set_state(Lease.STATE_PENDING)
126
        if get_policy().accept_lease(lease):
127
            self.logger.info("Lease #%i has been marked as pending." % lease.id)
128
            self.leases.add(lease)
129
        else:
130
            self.logger.info("Lease #%i has not been accepted" % lease.id)
131
            lease.set_state(Lease.STATE_REJECTED)
132
            self.completed_leases.add(lease)
133
        
134
        self.accounting.at_lease_request(lease)
135
        get_persistence().persist_lease(lease)
136
        
137
    def schedule(self, nexttime):
138
        """ The main scheduling function
139
        
140
        The scheduling function looks at all pending requests and schedules them.
141
        Note that most of the actual scheduling code is contained in the
142
        __schedule_lease method and in the VMScheduler and PreparationScheduler classes.
143
        
144
        Arguments:
145
        nexttime -- The next time at which the scheduler can allocate resources.
146
        """
147
        
148
        # Get pending leases
149
        pending_leases = self.leases.get_leases_by_state(Lease.STATE_PENDING)  
150
        
151
        # Process leases that have to be queued. Right now, only best-effort leases get queued.
152
        queue_leases = [req for req in pending_leases if req.get_type() == Lease.BEST_EFFORT]
153

    
154
        # Queue leases
155
        for lease in queue_leases:
156
            self.__enqueue(lease)
157
            lease.set_state(Lease.STATE_QUEUED)
158
            self.logger.info("Queued lease request #%i, %i nodes for %s." % (lease.id, lease.numnodes, lease.duration.requested))
159
            get_persistence().persist_lease(lease)
160

    
161

    
162
        # Process leases that have to be scheduled right away. Right now, this is any
163
        # lease that is not a best-effort lease (ARs, immediate, and deadlined leases)
164
        now_leases = [req for req in pending_leases if req.get_type() != Lease.BEST_EFFORT]
165
        
166
        # Schedule leases
167
        for lease in now_leases:
168
            lease_type = Lease.type_str[lease.get_type()]
169
            self.logger.info("Scheduling lease #%i (%i nodes) -- %s" % (lease.id, lease.numnodes, lease_type))
170
            if lease.get_type() == Lease.ADVANCE_RESERVATION:
171
                self.logger.info("From %s to %s" % (lease.start.requested, lease.start.requested + lease.duration.requested))
172
            elif lease.get_type() == Lease.DEADLINE:
173
                self.logger.info("Starting at %s. Deadline: %s" % (lease.start.requested, lease.deadline))
174
                
175
            lease.print_contents()
176
       
177
            try:
178
                self.__schedule_lease(lease, nexttime=nexttime)
179
                self.logger.info("Lease #%i has been scheduled." % lease.id)
180
                lease.print_contents()
181
            except NotSchedulableException, exc:
182
                self.logger.info("Lease request #%i cannot be scheduled: %s" % (lease.id, exc.reason))
183
                lease.set_state(Lease.STATE_REJECTED)
184
                self.completed_leases.add(lease)
185
                self.accounting.at_lease_done(lease)
186
                self.leases.remove(lease)            
187
            get_persistence().persist_lease(lease)
188

    
189
                        
190
        # Process queue (i.e., traverse queue in search of leases that can be scheduled)
191
        self.__process_queue(nexttime)
192
        get_persistence().persist_queue(self.queue)
193
    
194
    def process_starting_reservations(self, nowtime):
195
        """Processes starting reservations
196
        
197
        This method checks the slottable to see if there are any reservations that are
198
        starting at "nowtime". If so, the appropriate handler is called.
199

200
        Arguments:
201
        nowtime -- Time at which to check for starting reservations.
202
        """
203

    
204
        # Find starting/ending reservations
205
        starting = self.slottable.get_reservations_starting_at(nowtime)
206
        starting = [res for res in starting if res.state == ResourceReservation.STATE_SCHEDULED]
207
        
208
        # Process starting reservations
209
        for rr in starting:
210
            lease = rr.lease
211
            # Call the appropriate handler, and catch exceptions and errors.
212
            try:
213
                self.handlers[type(rr)].on_start(lease, rr)
214
                
215
            # An InconsistentLeaseStateError is raised when the lease is in an inconsistent
216
            # state. This is usually indicative of a programming error, but not necessarily
217
            # one that affects all leases, so we just fail this lease. Note that Haizea can also
218
            # be configured to stop immediately when a lease fails.
219
            except InconsistentLeaseStateError, exc:
220
                self.fail_lease(lease, exc)
221
            # An EnactmentError is raised when the handler had to perform an enactment action
222
            # (e.g., stopping a VM), and that enactment action failed. This is currently treated
223
            # as a non-recoverable error for the lease, and the lease is failed.
224
            except EnactmentError, exc:
225
                self.fail_lease(lease, exc)
226

    
227
            # Other exceptions are not expected, and generally indicate a programming error.
228
            # Thus, they are propagated upwards to the Manager where they will make
229
            # Haizea crash and burn.
230
            
231
            get_persistence().persist_lease(lease)
232

    
233
    def process_ending_reservations(self, nowtime):
234
        """Processes ending reservations
235
        
236
        This method checks the slottable to see if there are any reservations that are
237
        ending at "nowtime". If so, the appropriate handler is called.
238

239
        Arguments:
240
        nowtime -- Time at which to check for starting/ending reservations.
241
        """
242

    
243
        # Find starting/ending reservations
244
        ending = self.slottable.get_reservations_ending_at(nowtime)
245
        ending = [res for res in ending if res.state == ResourceReservation.STATE_ACTIVE]
246

    
247
        # Process ending reservations
248
        for rr in ending:
249
            lease = rr.lease
250
            self._handle_end_rr(rr)
251
            
252
            # Call the appropriate handler, and catch exceptions and errors.
253
            try:
254
                self.handlers[type(rr)].on_end(lease, rr)
255
                
256
            # A RescheduleLeaseException indicates that the lease has to be rescheduled
257
            except RescheduleLeaseException, exc:
258
                # Currently, the only leases that get rescheduled are best-effort leases,
259
                # once they've been suspended.
260
                if rr.lease.get_type() == Lease.BEST_EFFORT:
261
                    if lease.get_state() == Lease.STATE_SUSPENDED_PENDING:
262
                        # Put back in the queue, in the same order it arrived
263
                        self.__enqueue_in_order(lease)
264
                        lease.set_state(Lease.STATE_SUSPENDED_QUEUED)
265
                        get_persistence().persist_queue(self.queue)
266
                    else:
267
                        raise InconsistentLeaseStateError(lease, doing = "rescheduling best-effort lease")
268
                    
269
            # A NormalEndLeaseException indicates that the end of this reservations marks
270
            # the normal end of the lease.
271
            except NormalEndLeaseException, msg:
272
                self._handle_end_lease(lease)
273
                
274
            # An InconsistentLeaseStateError is raised when the lease is in an inconsistent
275
            # state. This is usually indicative of a programming error, but not necessarily
276
            # one that affects all leases, so we just fail this lease. Note that Haizea can also
277
            # be configured to stop immediately when a lease fails.
278
            except InconsistentLeaseStateError, exc:
279
                self.fail_lease(lease, exc)
280
                
281
            # An EnactmentError is raised when the handler had to perform an enactment action
282
            # (e.g., stopping a VM), and that enactment action failed. This is currently treated
283
            # as a non-recoverable error for the lease, and the lease is failed.
284
            except EnactmentError, exc:
285
                self.fail_lease(lease, exc)
286
                
287
            # Other exceptions are not expected, and generally indicate a programming error.
288
            # Thus, they are propagated upwards to the Manager where they will make
289
            # Haizea crash and burn.
290
            
291
            get_persistence().persist_lease(lease)
292

    
293
    def get_lease_by_id(self, lease_id):
294
        """Gets a lease with the given ID
295
        
296
        This method is useful for UIs (like the CLI) that operate on the lease ID.
297
        If no lease with a given ID is found, None is returned.
298

299
        Arguments:
300
        lease_id -- The ID of the lease
301
        """
302
        if not self.leases.has_lease(lease_id):
303
            return None
304
        else:
305
            return self.leases.get_lease(lease_id)
306

    
307
    def cancel_lease(self, lease):
308
        """Cancels a lease.
309
        
310
        Arguments:
311
        lease -- Lease to cancel
312
        """
313
        time = get_clock().get_time()
314
        
315
        self.logger.info("Cancelling lease %i..." % lease.id)
316
            
317
        lease_state = lease.get_state()
318
        
319
        if lease_state == Lease.STATE_PENDING:
320
            # If a lease is pending, we just need to change its state and
321
            # remove it from the lease table. Since this is done at the
322
            # end of this method, we do nothing here.
323
            pass
324

    
325
        elif lease_state == Lease.STATE_ACTIVE:
326
            # If a lease is active, that means we have to shut down its VMs to cancel it.
327
            self.logger.info("Lease %i is active. Stopping active reservation..." % lease.id)
328
            vmrr = lease.get_active_vmrrs(time)[0]
329
            self._handle_end_rr(vmrr)
330
            self.vm_scheduler._handle_unscheduled_end_vm(lease, vmrr)
331
            
332
            # Force machines to shut down
333
            try:
334
                self.vm_scheduler.resourcepool.stop_vms(lease, vmrr)
335
            except EnactmentError, exc:
336
                self.logger.error("Enactment error when shutting down VMs.")
337
                # Right now, this is a non-recoverable error, so we just
338
                # propagate it upwards.
339
                # In the future, it may be possible to react to these
340
                # kind of errors.
341
                raise            
342

    
343
        elif lease_state in [Lease.STATE_SCHEDULED, Lease.STATE_SUSPENDED_SCHEDULED, Lease.STATE_READY, Lease.STATE_RESUMED_READY]:
344
            # If a lease is scheduled or ready, we just need to cancel all future reservations
345
            # for that lease
346
            self.logger.info("Lease %i is scheduled. Cancelling reservations." % lease.id)
347
            rrs = lease.get_scheduled_reservations()
348
            for r in rrs:
349
                self.slottable.remove_reservation(r)
350
            
351
        elif lease_state in [Lease.STATE_QUEUED, Lease.STATE_SUSPENDED_QUEUED]:
352
            # If a lease is in the queue, waiting to be scheduled, cancelling
353
            # just requires removing it from the queue
354
            
355
            self.logger.info("Lease %i is in the queue. Removing..." % lease.id)
356
            self.queue.remove_lease(lease)
357
            get_persistence().persist_queue(self.queue)
358
        else:
359
            # Cancelling in any of the other states is currently unsupported
360
            raise InconsistentLeaseStateError(lease, doing = "cancelling the VM")
361
            
362
        # Change state, and remove from lease table
363
        lease.set_state(Lease.STATE_CANCELLED)
364
        self.completed_leases.add(lease)
365
        self.leases.remove(lease)
366
        get_persistence().persist_lease(lease)
367

    
368
    
369
    def fail_lease(self, lease, exc=None):
370
        """Transitions a lease to a failed state, and does any necessary cleaning up
371
        
372
        Arguments:
373
        lease -- Lease to fail
374
        exc -- The exception that made the lease fail
375
        """
376
        treatment = get_config().get("lease-failure-handling")
377
        
378
        if treatment == constants.ONFAILURE_CANCEL:
379
            # In this case, a lease failure is handled by cancelling the lease,
380
            # but allowing Haizea to continue to run normally.
381
            rrs = lease.get_scheduled_reservations()
382
            for r in rrs:
383
                self.slottable.remove_reservation(r)
384
            lease.set_state(Lease.STATE_FAIL)
385
            self.completed_leases.add(lease)
386
            self.leases.remove(lease)
387
            get_persistence().persist_lease(lease)
388
        elif treatment == constants.ONFAILURE_EXIT or treatment == constants.ONFAILURE_EXIT_RAISE:
389
            # In this case, a lease failure makes Haizea exit. This is useful when debugging,
390
            # so we can immediately know about any errors.
391
            raise UnrecoverableError(exc)
392
            
393
    
394
    def notify_event(self, lease, event):
395
        """Notifies an event that affects a lease.
396
        
397
        This is the entry point of asynchronous events into the scheduler. Currently,
398
        the only supported event is the premature end of a VM (i.e., before its
399
        scheduled end). Other events will emerge when we integrate Haizea with OpenNebula 1.4,
400
        since that version will support sending asynchronous events to Haizea.
401
        
402
        Arguments:
403
        lease -- Lease the event refers to
404
        event -- Event type
405
        """
406
        time = get_clock().get_time()
407
        if event == constants.EVENT_END_VM:
408
            vmrr = lease.get_last_vmrr()
409
            self._handle_end_rr(vmrr)
410
            # TODO: Exception handling
411
            self.vm_scheduler._handle_unscheduled_end_vm(lease, vmrr)
412
            self._handle_end_lease(lease)
413
            get_persistence().persist_lease(lease)
414
            
415
            # We need to reevaluate the schedule to see if there are any 
416
            # leases scheduled in the future that could be rescheduled
417
            # to start earlier
418
            nexttime = get_clock().get_next_schedulable_time()
419
            self.reevaluate_schedule(nexttime)
420

    
421

    
422
    def reevaluate_schedule(self, nexttime):
423
        """Reevaluates the schedule.
424
        
425
        This method can be called whenever resources are freed up
426
        unexpectedly (e.g., a lease than ends earlier than expected))
427
        to check if any leases scheduled in the future could be
428
        rescheduled to start earlier on the freed up resources.
429
        
430
        Currently, this method only checks if best-effort leases
431
        scheduled in the future (using a backfilling algorithm)
432
        can be rescheduled
433
        
434
        Arguments:
435
        nexttime -- The next time at which the scheduler can allocate resources.
436
        """        
437
        future = self.vm_scheduler.get_future_reschedulable_leases()
438
        for l in future:
439
            # We can only reschedule leases in the following four states
440
            if l.get_state() in (Lease.STATE_PREPARING, Lease.STATE_READY, Lease.STATE_SCHEDULED, Lease.STATE_SUSPENDED_SCHEDULED):
441
                # For each reschedulable lease already scheduled in the
442
                # future, we cancel the lease's preparantion and
443
                # the last scheduled VM.
444
                vmrr = l.get_last_vmrr()
445
                self.preparation_scheduler.cancel_preparation(l)
446
                self.vm_scheduler.cancel_vm(vmrr)
447
                l.remove_vmrr(vmrr)
448
                if l.get_state() in (Lease.STATE_READY, Lease.STATE_SCHEDULED, Lease.STATE_PREPARING):
449
                    l.set_state(Lease.STATE_PENDING)
450
                elif l.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
451
                    l.set_state(Lease.STATE_SUSPENDED_PENDING)
452

    
453
                # At this point, the lease just looks like a regular
454
                # pending lease that can be handed off directly to the
455
                # __schedule_lease method.
456
                # TODO: We should do exception handling here. However,
457
                # since we can only reschedule best-effort leases that were
458
                # originally schedule in the future, the scheduling function 
459
                # should always be able to schedule the lease (worst-case 
460
                # scenario is that it simply replicates the previous schedule)
461
                self.__schedule_lease(l, nexttime)
462

    
463

    
464
    def is_queue_empty(self):
465
        """Return True is the queue is empty, False otherwise"""
466
        return self.queue.is_empty()
467

    
468
    
469
    def exists_scheduled_leases(self):
470
        """Return True if there are any leases scheduled in the future"""
471
        return not self.slottable.is_empty()    
472

    
473
            
474
    def __process_queue(self, nexttime):
475
        """ Traverses the queue in search of leases that can be scheduled.
476
        
477
        This method processes the queue in order, but takes into account that
478
        it may be possible to schedule leases in the future (using a 
479
        backfilling algorithm)
480
        
481
        Arguments:
482
        nexttime -- The next time at which the scheduler can allocate resources.
483
        """        
484
        
485
        done = False
486
        newqueue = Queue()
487
        while not done and not self.is_queue_empty():
488
            if not self.vm_scheduler.can_schedule_in_future() and self.slottable.is_full(nexttime, restype = constants.RES_CPU):
489
                self.logger.debug("Used up all future reservations and slot table is full. Skipping rest of queue.")
490
                done = True
491
            else:
492
                lease = self.queue.dequeue()
493
                try:
494
                    self.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease.id)
495
                    lease.print_contents()
496
                    self.__schedule_lease(lease, nexttime)
497
                except NotSchedulableException, msg:
498
                    # Put back on queue
499
                    newqueue.enqueue(lease)
500
                    self.logger.info("Lease %i could not be scheduled at this time." % lease.id)
501
                    if get_config().get("backfilling") == constants.BACKFILLING_OFF:
502
                        done = True
503
        
504
        for lease in self.queue:
505
            newqueue.enqueue(lease)
506
        
507
        self.queue = newqueue 
508
    
509

    
510
    def __schedule_lease(self, lease, nexttime):            
511
        """ Schedules a lease.
512
        
513
        This method orchestrates the preparation and VM scheduler to
514
        schedule a lease.
515
        
516
        Arguments:
517
        lease -- Lease to schedule.
518
        nexttime -- The next time at which the scheduler can allocate resources.
519
        """       
520
                
521
        lease_state = lease.get_state()
522
        migration = get_config().get("migration")
523
        
524
        # Determine earliest start time in each node
525
        if lease_state == Lease.STATE_PENDING or lease_state == Lease.STATE_QUEUED:
526
            # This lease might require preparation. Ask the preparation
527
            # scheduler for the earliest starting time.
528
            earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
529
        elif lease_state == Lease.STATE_SUSPENDED_PENDING or lease_state == Lease.STATE_SUSPENDED_QUEUED:
530
            # This lease may have to be migrated.
531
            # We have to ask both the preparation scheduler and the VM
532
            # scheduler what would be the earliest possible starting time
533
            # on each node, assuming we have to transfer files between
534
            # nodes.
535

    
536
            node_ids = self.slottable.nodes.keys()
537
            earliest = {}
538
            if migration == constants.MIGRATE_NO:
539
                # If migration is disabled, the earliest starting time
540
                # is simply nexttime.
541
                for node in node_ids:
542
                    earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
543
            else:
544
                # Otherwise, we ask the preparation scheduler and the VM
545
                # scheduler how long it would take them to migrate the
546
                # lease state.
547
                prep_migr_time = self.preparation_scheduler.estimate_migration_time(lease)            
548
                vm_migr_time = self.vm_scheduler.estimate_migration_time(lease)
549
                for node in node_ids:
550
                    earliest[node] = EarliestStartingTime(nexttime + prep_migr_time + vm_migr_time, EarliestStartingTime.EARLIEST_MIGRATION)
551
        else:
552
            raise InconsistentLeaseStateError(lease, doing = "scheduling a best-effort lease")
553

    
554
        # Now, we give the lease to the VM scheduler, along with the
555
        # earliest possible starting times. If the VM scheduler can
556
        # schedule VMs for this lease, it will return a resource reservation
557
        # that we can add to the slot table, along with a list of
558
        # leases that have to be preempted.
559
        # If the VM scheduler can't schedule the VMs, it will throw an
560
        # exception (we don't catch it here, and it is just thrown up
561
        # to the calling method.
562
        (vmrr, preemptions) = self.vm_scheduler.schedule(lease, nexttime, earliest)
563
        
564
        ## BEGIN NOT-FIT-FOR-PRODUCTION CODE
565
        ## Pricing shouldn't live here. Instead, it should happen before a lease is accepted
566
        ## It is being done here in the interest of developing a first prototype
567
        ## that incorporates pricing in simulations (but not interactively yet)
568
        
569
        # Call pricing policy
570
        lease_price = get_policy().price_lease(lease, preemptions)
571
        
572
        # Determine whether to accept price or not (this in particular
573
        # should happen in the lease admission step)
574
        if lease.extras.has_key("simul_pricemarkup"):
575
            markup = float(lease.extras["simul_pricemarkup"])
576
            if get_config().get("policy.pricing") != "free":
577
                fair_price = get_policy().pricing.get_fair_price(lease)
578
                if lease_price > fair_price * markup:
579
                    lease.price = -1
580
                    raise NotSchedulableException, "Lease priced at %.2f. User is only willing to pay %.2f" % (lease_price, fair_price * markup)
581
                else:
582
                    lease.price = lease_price
583
                    lease.extras["fair_price"] = fair_price
584
        
585
        ## END NOT-FIT-FOR-PRODUCTION CODE
586
                                
587
        # If scheduling the lease involves preempting other leases,
588
        # go ahead and preempt them.
589
        if len(preemptions) > 0:
590
            self.logger.info("Must preempt leases %s to make room for lease #%i" % ([l.id for l in preemptions], lease.id))
591
            for l in preemptions:
592
                self.__preempt_lease(l, preemption_time=vmrr.start)
593
                
594
        # Schedule lease preparation
595
        is_ready = False
596
        preparation_rrs = []
597
        if lease_state in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED) and migration != constants.MIGRATE_NO:
598
            # The lease might require migration
599
            migr_rrs = self.preparation_scheduler.schedule_migration(lease, vmrr, nexttime)
600
            if len(migr_rrs) > 0:
601
                end_migr = migr_rrs[-1].end
602
            else:
603
                end_migr = nexttime
604
            migr_rrs += self.vm_scheduler.schedule_migration(lease, vmrr, end_migr)
605
            migr_rrs.reverse()
606
            for migr_rr in migr_rrs:
607
                vmrr.pre_rrs.insert(0, migr_rr)
608
            if len(migr_rrs) == 0:
609
                is_ready = True
610
        elif lease_state in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED) and migration == constants.MIGRATE_NO:
611
            # No migration means the lease is ready
612
            is_ready = True
613
        elif lease_state in (Lease.STATE_PENDING, Lease.STATE_QUEUED):
614
            # The lease might require initial preparation
615
            preparation_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, earliest)
616

    
617
        # At this point, the lease is feasible.
618
        # Commit changes by adding RRs to lease and to slot table
619
        
620
        # Add preparation RRs (if any) to lease
621
        for rr in preparation_rrs:
622
            lease.append_preparationrr(rr)
623
        
624
        # Add VMRR to lease
625
        lease.append_vmrr(vmrr)
626
        
627

    
628
        # Add resource reservations to slottable
629
        
630
        # Preparation RRs (if any)
631
        for rr in preparation_rrs:
632
            self.slottable.add_reservation(rr)
633
        
634
        # Pre-VM RRs (if any)
635
        for rr in vmrr.pre_rrs:
636
            self.slottable.add_reservation(rr)
637
            
638
        # VM
639
        self.slottable.add_reservation(vmrr)
640
        
641
        # Post-VM RRs (if any)
642
        for rr in vmrr.post_rrs:
643
            self.slottable.add_reservation(rr)
644
          
645
        # Change lease state
646
        if lease_state == Lease.STATE_PENDING or lease_state == Lease.STATE_QUEUED:
647
            lease.set_state(Lease.STATE_SCHEDULED)
648
            if is_ready:
649
                lease.set_state(Lease.STATE_READY)
650
        elif lease_state == Lease.STATE_SUSPENDED_PENDING or lease_state == Lease.STATE_SUSPENDED_QUEUED:
651
            lease.set_state(Lease.STATE_SUSPENDED_SCHEDULED)
652

    
653
        get_persistence().persist_lease(lease)
654

    
655
        lease.print_contents()
656

    
657
        
658
    def __preempt_lease(self, lease, preemption_time):
659
        """ Preempts a lease.
660
        
661
        This method preempts a lease such that any resources allocated
662
        to that lease after a given time are freed up. This may require
663
        scheduling the lease to suspend before that time, or cancelling
664
        the lease altogether.
665
        
666
        Arguments:
667
        lease -- Lease to schedule.
668
        preemption_time -- Time at which lease must be preempted
669
        """       
670
        
671
        self.logger.info("Preempting lease #%i..." % (lease.id))
672
        self.logger.vdebug("Lease before preemption:")
673
        lease.print_contents()
674
        vmrr = lease.get_last_vmrr()
675
        
676
        if vmrr.state == ResourceReservation.STATE_SCHEDULED and vmrr.start >= preemption_time:
677
            self.logger.debug("Lease was set to start in the middle of the preempting lease.")
678
            must_cancel_and_requeue = True
679
        else:
680
            susptype = get_config().get("suspension")
681
            if susptype == constants.SUSPENSION_NONE:
682
                must_cancel_and_requeue = True
683
            else:
684
                can_suspend = self.vm_scheduler.can_suspend_at(lease, preemption_time)
685
                if not can_suspend:
686
                    self.logger.debug("Suspending the lease does not meet scheduling threshold.")
687
                    must_cancel_and_requeue = True
688
                else:
689
                    if lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL:
690
                        self.logger.debug("Can't suspend lease because only suspension of single-node leases is allowed.")
691
                        must_cancel_and_requeue = True
692
                    else:
693
                        self.logger.debug("Lease can be suspended")
694
                        must_cancel_and_requeue = False
695
                    
696
        if must_cancel_and_requeue:
697
            self.logger.info("... lease #%i has been cancelled and requeued." % lease.id)
698
            self.preparation_scheduler.cancel_preparation(lease)
699
            self.vm_scheduler.cancel_vm(vmrr)
700
            lease.remove_vmrr(vmrr)
701
            # TODO: Take into account other states
702
            if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
703
                lease.set_state(Lease.STATE_SUSPENDED_QUEUED)
704
            else:
705
                lease.set_state(Lease.STATE_QUEUED)
706
            self.__enqueue_in_order(lease)
707
        else:
708
            self.logger.info("... lease #%i will be suspended at %s." % (lease.id, preemption_time))
709
            self.vm_scheduler.preempt_vm(vmrr, preemption_time)            
710
            
711
        get_persistence().persist_lease(lease)
712

    
713
        self.logger.vdebug("Lease after preemption:")
714
        lease.print_contents()
715
                
716
  
717
    def __enqueue(self, lease):
718
        """Queues a best-effort lease request
719
        
720
        Arguments:
721
        lease -- Lease to be queued
722
        """
723
        self.queue.enqueue(lease)
724

    
725

    
726
    def __enqueue_in_order(self, lease):
727
        """Queues a lease in order (currently, time of submission)
728
        
729
        Arguments:
730
        lease -- Lease to be queued
731
        """
732
        self.queue.enqueue_in_order(lease)
733

    
734

    
735
    def _handle_end_rr(self, rr):
736
        """Performs actions that have to be done each time a reservation ends.
737
        
738
        Arguments:
739
        rr -- Reservation that ended
740
        """
741
        self.slottable.remove_reservation(rr)
742
        
743

    
744
    def _handle_end_lease(self, l):
745
        """Performs actions that have to be done each time a lease ends.
746
        
747
        Arguments:
748
        lease -- Lease that has ended
749
        """
750
        l.set_state(Lease.STATE_DONE)
751
        l.duration.actual = l.duration.accumulated
752
        l.end = round_datetime(get_clock().get_time())
753
        self.preparation_scheduler.cleanup(l)
754
        self.completed_leases.add(l)
755
        self.leases.remove(l)
756
        self.accounting.at_lease_done(l)
757

    
758
        
759

    
760
class Queue(object):
761
    """A simple queue for leases
762
    
763
    This class is a simple queue container for leases, with some
764
    extra syntactic sugar added for convenience.    
765
    """    
766

    
767
    def __init__(self):
768
        self.__q = []
769
        
770
    def is_empty(self):
771
        return len(self.__q)==0
772
    
773
    def enqueue(self, r):
774
        self.__q.append(r)
775
    
776
    def dequeue(self):
777
        return self.__q.pop(0)
778
    
779
    def enqueue_in_order(self, r):
780
        self.__q.append(r)
781
        self.__q.sort(key=attrgetter("submit_time"))
782

    
783
    def length(self):
784
        return len(self.__q)
785
    
786
    def has_lease(self, lease_id):
787
        return (1 == len([l for l in self.__q if l.id == lease_id]))
788
    
789
    def get_lease(self, lease_id):
790
        return [l for l in self.__q if l.id == lease_id][0]
791
    
792
    def remove_lease(self, lease):
793
        self.__q.remove(lease)
794
    
795
    def __iter__(self):
796
        return iter(self.__q)
797
        
798
class LeaseTable(object):
799
    """A simple container for leases
800
    
801
    This class is a simple dictionary-like container for leases, with some
802
    extra syntactic sugar added for convenience.    
803
    """    
804
    
805
    def __init__(self):
806
        self.entries = {}
807
        
808
    def has_lease(self, lease_id):
809
        return self.entries.has_key(lease_id)
810
        
811
    def get_lease(self, lease_id):
812
        return self.entries[lease_id]
813
    
814
    def is_empty(self):
815
        return len(self.entries)==0
816
    
817
    def remove(self, lease):
818
        del self.entries[lease.id]
819
        
820
    def add(self, lease):
821
        self.entries[lease.id] = lease
822
        
823
    def get_leases(self, type=None):
824
        if type==None:
825
            return self.entries.values()
826
        else:
827
            return [e for e in self.entries.values() if e.get_type() == type]
828

    
829
    def get_leases_by_state(self, state):
830
        return [e for e in self.entries.values() if e.get_state() == state]
831
 
832