Project

General

Profile

root / branches / 1.1 / src / haizea / core / scheduler / lease_scheduler.py @ 842

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2009, University of Chicago                                 #
3
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19

    
20
"""This module provides the main classes for Haizea's lease scheduler, particularly
21
the LeaseScheduler class. This module does *not* contain VM scheduling code (i.e.,
22
the code that decides what physical hosts a VM should be mapped to), which is
23
located in the vm_scheduler module. Lease preparation code (e.g., image transfer 
24
scheduling) is located in the preparation_schedulers package. In fact, the
25
main purpose of the lease schedule is to orchestrate these preparation and VM
26
schedulers.
27

28
This module also includes a Queue class and a LeaseTable class, which are used
29
by the lease scheduler.
30
"""
31

    
32
import haizea.common.constants as constants
33
from haizea.common.utils import round_datetime, get_config, get_clock, get_policy, get_persistence
34
from haizea.core.leases import Lease
35
from haizea.core.scheduler import RescheduleLeaseException, NormalEndLeaseException, InconsistentLeaseStateError, EnactmentError, UnrecoverableError, NotSchedulableException, EarliestStartingTime
36
from haizea.core.scheduler.vm_scheduler import VMResourceReservation
37
from haizea.core.scheduler.slottable import ResourceReservation
38
from operator import attrgetter
39

    
40
import logging
41
from mx.DateTime import DateTimeDelta
42

    
43
class LeaseScheduler(object):
44
    """The Haizea Lease Scheduler
45
    
46
    This is the main scheduling class in Haizea. It handles lease scheduling which,
47
    in turn, involves VM scheduling, preparation scheduling (such as transferring
48
    a VM image), and numerous bookkeeping operations. All these operations are
49
    handled by other classes, so this class acts mostly as an orchestrator that
50
    coordinates all the different operations involved in scheduling a lease.    
51
    """
52
    
53
    def __init__(self, vm_scheduler, preparation_scheduler, slottable, accounting):
54
        """Constructor
55
        
56
        The constructor does little more than create the lease scheduler's
57
        attributes. However, it does expect (in the arguments) a fully-constructed 
58
        VMScheduler, PreparationScheduler, SlotTable, and PolicyManager (these are 
59
        constructed in the Manager's constructor). 
60
        
61
        Arguments:
62
        vm_scheduler -- VM scheduler
63
        preparation_scheduler -- Preparation scheduler
64
        slottable -- Slottable
65
        accounting -- AccountingDataCollection object
66
        """
67
        
68
        # Logger
69
        self.logger = logging.getLogger("LSCHED")
70
        
71
        # Assign schedulers and slottable
72
        self.vm_scheduler = vm_scheduler
73
        """
74
        VM Scheduler
75
        @type: VMScheduler
76
        """
77
        self.preparation_scheduler = preparation_scheduler
78
        self.slottable = slottable
79
        self.accounting = accounting
80

    
81
        # Create other data structures
82
        self.queue = Queue()
83
        self.leases = LeaseTable()
84
        self.completed_leases = LeaseTable()
85

    
86
        # Handlers are callback functions that get called whenever a type of
87
        # resource reservation starts or ends. Each scheduler publishes the
88
        # handlers it supports through its "handlers" attributes. For example,
89
        # the VMScheduler provides _handle_start_vm and _handle_end_vm that
90
        # must be called when a VMResourceReservation start or end is encountered
91
        # in the slot table.
92
        #
93
        # Handlers are called from the process_reservations method of this class
94
        self.handlers = {}
95
        for (type, handler) in self.vm_scheduler.handlers.items():
96
            self.handlers[type] = handler
97

    
98
        for (type, handler) in self.preparation_scheduler.handlers.items():
99
            self.handlers[type] = handler
100

    
101

    
102
    def request_lease(self, lease):
103
        """Requests a leases. This is the entry point of leases into the scheduler.
104
        
105
        Request a lease. The decision on whether to accept or reject a
106
        lease is deferred to the policy manager (through its admission
107
        control policy). 
108
        
109
        If the policy determines the lease can be
110
        accepted, it is marked as "Pending". This still doesn't
111
        guarantee that the lease will be scheduled (e.g., an AR lease
112
        could still be rejected if the scheduler determines there are no
113
        resources for it; but that is a *scheduling* decision, not a admission
114
        control policy decision). The ultimate fate of the lease is determined
115
        the next time the scheduling function is called.
116
        
117
        If the policy determines the lease cannot be accepted, it is marked
118
        as rejected.
119

120
        Arguments:
121
        lease -- Lease object. Its state must be STATE_NEW.
122
        """
123
        self.logger.info("Lease #%i has been requested." % lease.id)
124
        if lease.submit_time == None:
125
            lease.submit_time = round_datetime(get_clock().get_time())
126
        lease.print_contents()
127
        lease.set_state(Lease.STATE_PENDING)
128
        if get_policy().accept_lease(lease):
129
            self.logger.info("Lease #%i has been marked as pending." % lease.id)
130
            self.leases.add(lease)
131
        else:
132
            self.logger.info("Lease #%i has not been accepted" % lease.id)
133
            lease.set_state(Lease.STATE_REJECTED)
134
            self.completed_leases.add(lease)
135
        
136
        self.accounting.at_lease_request(lease)
137
        get_persistence().persist_lease(lease)
138
        
139
    def schedule(self, nexttime):
140
        """ The main scheduling function
141
        
142
        The scheduling function looks at all pending requests and schedules them.
143
        Note that most of the actual scheduling code is contained in the
144
        __schedule_lease method and in the VMScheduler and PreparationScheduler classes.
145
        
146
        Arguments:
147
        nexttime -- The next time at which the scheduler can allocate resources.
148
        """
149
        
150
        # Get pending leases
151
        pending_leases = self.leases.get_leases_by_state(Lease.STATE_PENDING)  
152
        
153
        # Process leases that have to be queued. Right now, only best-effort leases get queued.
154
        queue_leases = [req for req in pending_leases if req.get_type() == Lease.BEST_EFFORT]
155

    
156
        # Queue leases
157
        for lease in queue_leases:
158
            self.__enqueue(lease)
159
            lease.set_state(Lease.STATE_QUEUED)
160
            self.logger.info("Queued lease request #%i, %i nodes for %s." % (lease.id, lease.numnodes, lease.duration.requested))
161
            get_persistence().persist_lease(lease)
162

    
163

    
164
        # Process leases that have to be scheduled right away. Right now, this is any
165
        # lease that is not a best-effort lease (ARs, immediate, and deadlined leases)
166
        now_leases = [req for req in pending_leases if req.get_type() != Lease.BEST_EFFORT]
167
        
168
        # Schedule leases
169
        for lease in now_leases:
170
            lease_type = Lease.type_str[lease.get_type()]
171
            self.logger.info("Scheduling lease #%i (%i nodes) -- %s" % (lease.id, lease.numnodes, lease_type))
172
            if lease.get_type() == Lease.ADVANCE_RESERVATION:
173
                self.logger.info("From %s to %s" % (lease.start.requested, lease.start.requested + lease.duration.requested))
174
            elif lease.get_type() == Lease.DEADLINE:
175
                self.logger.info("Starting at %s. Deadline: %s" % (lease.start.requested, lease.deadline))
176
                
177
            lease.print_contents()
178
       
179
            try:
180
                self.__schedule_lease(lease, nexttime=nexttime)
181
                self.logger.info("Lease #%i has been scheduled." % lease.id)
182
                ## BEGIN NOT-FIT-FOR-PRODUCTION CODE
183
                ## This should happen when the lease is requested.
184
                get_policy().pricing.feedback(lease)
185
                ## END NOT-FIT-FOR-PRODUCTION CODE
186
                lease.print_contents()
187
            except NotSchedulableException, exc:
188
                self.logger.info("Lease request #%i cannot be scheduled: %s" % (lease.id, exc.reason))
189
                ## BEGIN NOT-FIT-FOR-PRODUCTION CODE
190
                ## This should happen when the lease is requested.
191
                if lease.price == -1:
192
                    lease.set_state(Lease.STATE_REJECTED_BY_USER)
193
                else:
194
                    lease.set_state(Lease.STATE_REJECTED)                    
195
                self.completed_leases.add(lease)
196
                self.accounting.at_lease_done(lease)
197
                ## BEGIN NOT-FIT-FOR-PRODUCTION CODE
198
                ## This should happen when the lease is requested.
199
                get_policy().pricing.feedback(lease)
200
                ## END NOT-FIT-FOR-PRODUCTION CODE
201
                self.leases.remove(lease)            
202
            get_persistence().persist_lease(lease)
203

    
204
                        
205
        # Process queue (i.e., traverse queue in search of leases that can be scheduled)
206
        self.__process_queue(nexttime)
207
        get_persistence().persist_queue(self.queue)
208
    
209
    def process_starting_reservations(self, nowtime):
210
        """Processes starting reservations
211
        
212
        This method checks the slottable to see if there are any reservations that are
213
        starting at "nowtime". If so, the appropriate handler is called.
214

215
        Arguments:
216
        nowtime -- Time at which to check for starting reservations.
217
        """
218

    
219
        # Find starting/ending reservations
220
        starting = self.slottable.get_reservations_starting_at(nowtime)
221
        starting = [res for res in starting if res.state == ResourceReservation.STATE_SCHEDULED]
222
        
223
        # Process starting reservations
224
        for rr in starting:
225
            lease = rr.lease
226
            # Call the appropriate handler, and catch exceptions and errors.
227
            try:
228
                self.handlers[type(rr)].on_start(lease, rr)
229
                
230
            # An InconsistentLeaseStateError is raised when the lease is in an inconsistent
231
            # state. This is usually indicative of a programming error, but not necessarily
232
            # one that affects all leases, so we just fail this lease. Note that Haizea can also
233
            # be configured to stop immediately when a lease fails.
234
            except InconsistentLeaseStateError, exc:
235
                self.fail_lease(lease, exc)
236
            # An EnactmentError is raised when the handler had to perform an enactment action
237
            # (e.g., stopping a VM), and that enactment action failed. This is currently treated
238
            # as a non-recoverable error for the lease, and the lease is failed.
239
            except EnactmentError, exc:
240
                self.fail_lease(lease, exc)
241

    
242
            # Other exceptions are not expected, and generally indicate a programming error.
243
            # Thus, they are propagated upwards to the Manager where they will make
244
            # Haizea crash and burn.
245
            
246
            get_persistence().persist_lease(lease)
247

    
248
    def process_ending_reservations(self, nowtime):
249
        """Processes ending reservations
250
        
251
        This method checks the slottable to see if there are any reservations that are
252
        ending at "nowtime". If so, the appropriate handler is called.
253

254
        Arguments:
255
        nowtime -- Time at which to check for starting/ending reservations.
256
        """
257

    
258
        # Find starting/ending reservations
259
        ending = self.slottable.get_reservations_ending_at(nowtime)
260
        ending = [res for res in ending if res.state == ResourceReservation.STATE_ACTIVE]
261

    
262
        # Process ending reservations
263
        for rr in ending:
264
            lease = rr.lease
265
            self._handle_end_rr(rr)
266
            
267
            # Call the appropriate handler, and catch exceptions and errors.
268
            try:
269
                self.handlers[type(rr)].on_end(lease, rr)
270
                
271
            # A RescheduleLeaseException indicates that the lease has to be rescheduled
272
            except RescheduleLeaseException, exc:
273
                # Currently, the only leases that get rescheduled are best-effort leases,
274
                # once they've been suspended.
275
                if rr.lease.get_type() == Lease.BEST_EFFORT:
276
                    if lease.get_state() == Lease.STATE_SUSPENDED_PENDING:
277
                        # Put back in the queue, in the same order it arrived
278
                        self.__enqueue_in_order(lease)
279
                        lease.set_state(Lease.STATE_SUSPENDED_QUEUED)
280
                        get_persistence().persist_queue(self.queue)
281
                    else:
282
                        raise InconsistentLeaseStateError(lease, doing = "rescheduling best-effort lease")
283
                    
284
            # A NormalEndLeaseException indicates that the end of this reservations marks
285
            # the normal end of the lease.
286
            except NormalEndLeaseException, msg:
287
                self._handle_end_lease(lease)
288
                
289
            # An InconsistentLeaseStateError is raised when the lease is in an inconsistent
290
            # state. This is usually indicative of a programming error, but not necessarily
291
            # one that affects all leases, so we just fail this lease. Note that Haizea can also
292
            # be configured to stop immediately when a lease fails.
293
            except InconsistentLeaseStateError, exc:
294
                self.fail_lease(lease, exc)
295
                
296
            # An EnactmentError is raised when the handler had to perform an enactment action
297
            # (e.g., stopping a VM), and that enactment action failed. This is currently treated
298
            # as a non-recoverable error for the lease, and the lease is failed.
299
            except EnactmentError, exc:
300
                self.fail_lease(lease, exc)
301
                
302
            # Other exceptions are not expected, and generally indicate a programming error.
303
            # Thus, they are propagated upwards to the Manager where they will make
304
            # Haizea crash and burn.
305
            
306
            get_persistence().persist_lease(lease)
307

    
308
    def get_lease_by_id(self, lease_id):
309
        """Gets a lease with the given ID
310
        
311
        This method is useful for UIs (like the CLI) that operate on the lease ID.
312
        If no lease with a given ID is found, None is returned.
313

314
        Arguments:
315
        lease_id -- The ID of the lease
316
        """
317
        if not self.leases.has_lease(lease_id):
318
            return None
319
        else:
320
            return self.leases.get_lease(lease_id)
321

    
322
    def cancel_lease(self, lease):
323
        """Cancels a lease.
324
        
325
        Arguments:
326
        lease -- Lease to cancel
327
        """
328
        time = get_clock().get_time()
329
        
330
        self.logger.info("Cancelling lease %i..." % lease.id)
331
            
332
        lease_state = lease.get_state()
333
        
334
        if lease_state == Lease.STATE_PENDING:
335
            # If a lease is pending, we just need to change its state and
336
            # remove it from the lease table. Since this is done at the
337
            # end of this method, we do nothing here.
338
            pass
339

    
340
        elif lease_state == Lease.STATE_ACTIVE:
341
            # If a lease is active, that means we have to shut down its VMs to cancel it.
342
            self.logger.info("Lease %i is active. Stopping active reservation..." % lease.id)
343
            vmrr = lease.get_active_vmrrs(time)[0]
344
            self._handle_end_rr(vmrr)
345
            self.vm_scheduler._handle_unscheduled_end_vm(lease, vmrr)
346
            
347
            # Force machines to shut down
348
            try:
349
                self.vm_scheduler.resourcepool.stop_vms(lease, vmrr)
350
            except EnactmentError, exc:
351
                self.logger.error("Enactment error when shutting down VMs.")
352
                # Right now, this is a non-recoverable error, so we just
353
                # propagate it upwards.
354
                # In the future, it may be possible to react to these
355
                # kind of errors.
356
                raise            
357

    
358
        elif lease_state in [Lease.STATE_SCHEDULED, Lease.STATE_SUSPENDED_SCHEDULED, Lease.STATE_READY, Lease.STATE_RESUMED_READY]:
359
            # If a lease is scheduled or ready, we just need to cancel all future reservations
360
            # for that lease
361
            self.logger.info("Lease %i is scheduled. Cancelling reservations." % lease.id)
362
            rrs = lease.get_scheduled_reservations()
363
            for r in rrs:
364
                self.slottable.remove_reservation(r)
365
            
366
        elif lease_state in [Lease.STATE_QUEUED, Lease.STATE_SUSPENDED_QUEUED]:
367
            # If a lease is in the queue, waiting to be scheduled, cancelling
368
            # just requires removing it from the queue
369
            
370
            self.logger.info("Lease %i is in the queue. Removing..." % lease.id)
371
            self.queue.remove_lease(lease)
372
            get_persistence().persist_queue(self.queue)
373
        else:
374
            # Cancelling in any of the other states is currently unsupported
375
            raise InconsistentLeaseStateError(lease, doing = "cancelling the VM")
376
            
377
        # Change state, and remove from lease table
378
        lease.set_state(Lease.STATE_CANCELLED)
379
        self.completed_leases.add(lease)
380
        self.leases.remove(lease)
381
        get_persistence().persist_lease(lease)
382

    
383
    
384
    def fail_lease(self, lease, exc=None):
385
        """Transitions a lease to a failed state, and does any necessary cleaning up
386
        
387
        Arguments:
388
        lease -- Lease to fail
389
        exc -- The exception that made the lease fail
390
        """
391
        treatment = get_config().get("lease-failure-handling")
392
        
393
        if treatment == constants.ONFAILURE_CANCEL:
394
            # In this case, a lease failure is handled by cancelling the lease,
395
            # but allowing Haizea to continue to run normally.
396
            rrs = lease.get_scheduled_reservations()
397
            for r in rrs:
398
                self.slottable.remove_reservation(r)
399
            lease.set_state(Lease.STATE_FAIL)
400
            self.completed_leases.add(lease)
401
            self.leases.remove(lease)
402
            get_persistence().persist_lease(lease)
403
        elif treatment == constants.ONFAILURE_EXIT or treatment == constants.ONFAILURE_EXIT_RAISE:
404
            # In this case, a lease failure makes Haizea exit. This is useful when debugging,
405
            # so we can immediately know about any errors.
406
            raise UnrecoverableError(exc)
407
            
408
    
409
    def notify_event(self, lease, event):
410
        """Notifies an event that affects a lease.
411
        
412
        This is the entry point of asynchronous events into the scheduler. Currently,
413
        the only supported event is the premature end of a VM (i.e., before its
414
        scheduled end). Other events will emerge when we integrate Haizea with OpenNebula 1.4,
415
        since that version will support sending asynchronous events to Haizea.
416
        
417
        Arguments:
418
        lease -- Lease the event refers to
419
        event -- Event type
420
        """
421
        time = get_clock().get_time()
422
        if event == constants.EVENT_END_VM:
423
            vmrr = lease.get_vmrr_at(time)
424
            vmrrs_after = lease.get_vmrr_after(time)
425
            
426
            for vmrr_after in vmrrs_after:
427
                for rr in vmrr_after.pre_rrs:
428
                    self.slottable.remove_reservation(rr)
429
                for rr in vmrr_after.post_rrs:
430
                    self.slottable.remove_reservation(rr)
431
                self.slottable.remove_reservation(vmrr_after)
432
                lease.remove_vmrr(vmrr_after)
433
            
434
            self._handle_end_rr(vmrr)
435
            # TODO: Exception handling
436
            self.vm_scheduler._handle_unscheduled_end_vm(lease, vmrr)
437
            
438
            # We need to reevaluate the schedule to see if there are any 
439
            # leases scheduled in the future that could be rescheduled
440
            # to start earlier
441
            nexttime = get_clock().get_next_schedulable_time()
442
            self.reevaluate_schedule(nexttime, lease)
443

    
444
            self._handle_end_lease(lease)
445
            get_persistence().persist_lease(lease)
446

    
447

    
448
    def reevaluate_schedule(self, nexttime, ending_lease):
449
        """Reevaluates the schedule.
450
        
451
        This method can be called whenever resources are freed up
452
        unexpectedly (e.g., a lease than ends earlier than expected))
453
        to check if any leases scheduled in the future could be
454
        rescheduled to start earlier on the freed up resources.
455
        
456
        Currently, this method only checks if best-effort leases
457
        scheduled in the future (using a backfilling algorithm)
458
        can be rescheduled
459
        
460
        Arguments:
461
        nexttime -- The next time at which the scheduler can allocate resources.
462
        """        
463
        future = self.vm_scheduler.get_future_reschedulable_leases()
464
        ## BEGIN NOT-FIT-FOR-PRODUCTION CODE
465
        ## This is only necessary because we're currently using the best-effort
466
        ## scheduling algorithm for many of the deadline leases
467
        future_best_effort = [l for l in future if l.get_type() == Lease.BEST_EFFORT]
468
        ## END NOT-FIT-FOR-PRODUCTION CODE
469
        for l in future_best_effort:
470
            # We can only reschedule leases in the following four states
471
            # TODO: Leases in PREPARING state should be rescheduleable.
472
            #if l.get_state() in (Lease.STATE_PREPARING, Lease.STATE_READY, Lease.STATE_SCHEDULED, Lease.STATE_SUSPENDED_SCHEDULED):
473
            if l.get_state() in (Lease.STATE_READY, Lease.STATE_SCHEDULED, Lease.STATE_SUSPENDED_SCHEDULED):
474
                self.logger.debug("Rescheduling lease %i" % l.id)
475
                # For each reschedulable lease already scheduled in the
476
                # future, we cancel the lease's preparation and
477
                # the last scheduled VM.
478
                if l.get_state() in (Lease.STATE_READY, Lease.STATE_SCHEDULED, Lease.STATE_PREPARING):
479
                    self.preparation_scheduler.cancel_preparation(l)
480
                    l.set_state(Lease.STATE_PENDING)
481
                elif l.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
482
                    self.preparation_scheduler.cancel_preparation(l, remove_files = False)
483
                    l.set_state(Lease.STATE_SUSPENDED_PENDING)
484

    
485
                vmrr = l.get_last_vmrr()
486
                self.vm_scheduler.cancel_vm(vmrr)
487
                l.remove_vmrr(vmrr)
488

    
489
                # At this point, the lease just looks like a regular
490
                # pending lease that can be handed off directly to the
491
                # __schedule_lease method.
492
                # TODO: We should do exception handling here. However,
493
                # since we can only reschedule best-effort leases that were
494
                # originally schedule in the future, the scheduling function 
495
                # should always be able to schedule the lease (worst-case 
496
                # scenario is that it simply replicates the previous schedule)
497
                self.__schedule_lease(l, nexttime)
498

    
499
        numnodes = ending_lease.numnodes
500
        freetime = ending_lease.duration.requested - ending_lease.duration.accumulated
501
        until = nexttime + freetime
502
        freecapacity = numnodes * freetime
503

    
504
        future_vmrrs = self.slottable.get_reservations_starting_after(nexttime)
505
        future_vmrrs.sort(key=attrgetter("start"))        
506
        future_vmrrs = [rr for rr in future_vmrrs 
507
                        if isinstance(rr, VMResourceReservation) 
508
                        and rr.lease.get_type() == Lease.DEADLINE
509
                        and rr.lease.get_state() in (Lease.STATE_SCHEDULED, Lease.STATE_READY)
510
                        and not rr.is_suspending() and not rr.is_resuming()
511
                        and rr.start != rr.lease.start.requested]
512

    
513
        leases = list(set([future_vmrr.lease for future_vmrr in future_vmrrs]))
514
        leases = [l for l in leases if l.numnodes <= numnodes 
515
                  and l.start.requested <= until 
516
                  and l.duration.requested <= min(freetime, until - l.start.requested)]
517
        leases.sort(key= lambda l: (l.deadline - nexttime) / l.duration.requested)
518
        self.logger.debug("Rescheduling future deadline leases")
519

    
520
        filled = DateTimeDelta(0)        
521
        for l in leases:
522
            dur = min(until - l.start.requested, l.duration.requested)
523
            capacity = l.numnodes * dur
524
            
525
            if filled + capacity <= freecapacity:
526
                # This lease might fit
527
                self.logger.debug("Trying to reschedule lease %i" % l.id)
528
                self.slottable.push_state([l])
529
                node_ids = self.slottable.nodes.keys()
530
                earliest = {}
531
                for node in node_ids:
532
                    earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
533
    
534
                for vmrr in [vmrr2 for vmrr2 in future_vmrrs if vmrr2.lease == l]:
535
                    vmrr.lease.remove_vmrr(vmrr)
536
                    self.vm_scheduler.cancel_vm(vmrr)            
537

    
538
                try:
539
                    origd = l.deadline
540
                    l.deadline = until                    
541
                    (new_vmrr, preemptions) = self.vm_scheduler.reschedule_deadline(l, dur, nexttime, earliest)
542
                    l.deadline = origd
543
                    
544
                    # Add VMRR to lease
545
                    l.append_vmrr(new_vmrr)
546
                    
547
                    # Add resource reservations to slottable
548
                    
549
                    # Pre-VM RRs (if any)
550
                    for rr in new_vmrr.pre_rrs:
551
                        self.slottable.add_reservation(rr)
552
                        
553
                    # VM
554
                    self.slottable.add_reservation(new_vmrr)
555
                    
556
                    # Post-VM RRs (if any)
557
                    for rr in new_vmrr.post_rrs:
558
                        self.slottable.add_reservation(rr)             
559
    
560
                    self.logger.debug("Rescheduled lease %i" % l.id)
561
                    self.logger.vdebug("Lease after rescheduling:")
562
                    l.print_contents()
563
                                               
564
                    filled += capacity
565
                                               
566
                    self.slottable.pop_state(discard=True)
567
                except NotSchedulableException:
568
                    l.deadline = origd                    
569
                    self.logger.debug("Lease %i could not be rescheduled" % l.id)
570
                    self.slottable.pop_state()
571

    
572
    def is_queue_empty(self):
573
        """Return True is the queue is empty, False otherwise"""
574
        return self.queue.is_empty()
575

    
576
    
577
    def exists_scheduled_leases(self):
578
        """Return True if there are any leases scheduled in the future"""
579
        return not self.slottable.is_empty()    
580

    
581
            
582
    def __process_queue(self, nexttime):
583
        """ Traverses the queue in search of leases that can be scheduled.
584
        
585
        This method processes the queue in order, but takes into account that
586
        it may be possible to schedule leases in the future (using a 
587
        backfilling algorithm)
588
        
589
        Arguments:
590
        nexttime -- The next time at which the scheduler can allocate resources.
591
        """        
592
        
593
        done = False
594
        newqueue = Queue()
595
        while not done and not self.is_queue_empty():
596
            if not self.vm_scheduler.can_schedule_in_future() and self.slottable.is_full(nexttime, restype = constants.RES_CPU):
597
                self.logger.debug("Used up all future reservations and slot table is full. Skipping rest of queue.")
598
                done = True
599
            else:
600
                lease = self.queue.dequeue()
601
                try:
602
                    self.logger.info("Next request in the queue is lease %i. Attempting to schedule..." % lease.id)
603
                    lease.print_contents()
604
                    self.__schedule_lease(lease, nexttime)
605
                except NotSchedulableException, msg:
606
                    # Put back on queue
607
                    newqueue.enqueue(lease)
608
                    self.logger.info("Lease %i could not be scheduled at this time." % lease.id)
609
                    if get_config().get("backfilling") == constants.BACKFILLING_OFF:
610
                        done = True
611
        
612
        for lease in self.queue:
613
            newqueue.enqueue(lease)
614
        
615
        self.queue = newqueue 
616
    
617

    
618
    def __schedule_lease(self, lease, nexttime):            
619
        """ Schedules a lease.
620
        
621
        This method orchestrates the preparation and VM scheduler to
622
        schedule a lease.
623
        
624
        Arguments:
625
        lease -- Lease to schedule.
626
        nexttime -- The next time at which the scheduler can allocate resources.
627
        """       
628
                
629
        lease_state = lease.get_state()
630
        migration = get_config().get("migration")
631
        
632
        # Determine earliest start time in each node
633
        if lease_state == Lease.STATE_PENDING or lease_state == Lease.STATE_QUEUED:
634
            # This lease might require preparation. Ask the preparation
635
            # scheduler for the earliest starting time.
636
            earliest = self.preparation_scheduler.find_earliest_starting_times(lease, nexttime)
637
        elif lease_state == Lease.STATE_SUSPENDED_PENDING or lease_state == Lease.STATE_SUSPENDED_QUEUED:
638
            # This lease may have to be migrated.
639
            # We have to ask both the preparation scheduler and the VM
640
            # scheduler what would be the earliest possible starting time
641
            # on each node, assuming we have to transfer files between
642
            # nodes.
643

    
644
            node_ids = self.slottable.nodes.keys()
645
            earliest = {}
646
            if migration == constants.MIGRATE_NO:
647
                # If migration is disabled, the earliest starting time
648
                # is simply nexttime.
649
                for node in node_ids:
650
                    earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
651
            else:
652
                # Otherwise, we ask the preparation scheduler and the VM
653
                # scheduler how long it would take them to migrate the
654
                # lease state.
655
                prep_migr_time = self.preparation_scheduler.estimate_migration_time(lease)            
656
                vm_migr_time = self.vm_scheduler.estimate_migration_time(lease)
657
                for node in node_ids:
658
                    earliest[node] = EarliestStartingTime(nexttime + prep_migr_time + vm_migr_time, EarliestStartingTime.EARLIEST_MIGRATION)
659
        else:
660
            raise InconsistentLeaseStateError(lease, doing = "scheduling a best-effort lease")
661

    
662
        # Now, we give the lease to the VM scheduler, along with the
663
        # earliest possible starting times. If the VM scheduler can
664
        # schedule VMs for this lease, it will return a resource reservation
665
        # that we can add to the slot table, along with a list of
666
        # leases that have to be preempted.
667
        # If the VM scheduler can't schedule the VMs, it will throw an
668
        # exception (we don't catch it here, and it is just thrown up
669
        # to the calling method.
670
        (vmrr, preemptions) = self.vm_scheduler.schedule(lease, lease.duration.get_remaining_duration(), nexttime, earliest)
671
        
672
        ## BEGIN NOT-FIT-FOR-PRODUCTION CODE
673
        ## Pricing shouldn't live here. Instead, it should happen before a lease is accepted
674
        ## It is being done here in the interest of developing a first prototype
675
        ## that incorporates pricing in simulations (but not interactively yet)
676
        
677
        # Call pricing policy
678
        lease_price = get_policy().price_lease(lease, preemptions)
679
        
680
        # Determine whether to accept price or not (this in particular
681
        # should happen in the lease admission step)
682
        if lease.extras.has_key("simul_userrate"):
683
            user_rate = float(lease.extras["simul_userrate"])
684
            if get_config().get("policy.pricing") != "free":
685
                user_price = get_policy().pricing.get_base_price(lease, user_rate)
686
                # We want to record the rate at which the lease was priced
687
                lease.extras["rate"] = get_policy().pricing.rate
688
                if lease_price > user_price:
689
                    lease.price = -1
690
                    lease.extras["rejected_price"] = lease_price
691
                    raise NotSchedulableException, "Lease priced at %.2f. User is only willing to pay %.2f" % (lease_price, user_price)
692
        
693
        lease.price = lease_price
694
        ## END NOT-FIT-FOR-PRODUCTION CODE
695
                                
696
        # Schedule lease preparation
697
        is_ready = False
698
        preparation_rrs = []
699
        if lease_state in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED) and migration != constants.MIGRATE_NO:
700
            # The lease might require migration
701
            migr_rrs = self.preparation_scheduler.schedule_migration(lease, vmrr, nexttime)
702
            if len(migr_rrs) > 0:
703
                end_migr = migr_rrs[-1].end
704
            else:
705
                end_migr = nexttime
706
            migr_rrs += self.vm_scheduler.schedule_migration(lease, vmrr, end_migr)
707
            migr_rrs.reverse()
708
            for migr_rr in migr_rrs:
709
                vmrr.pre_rrs.insert(0, migr_rr)
710
            if len(migr_rrs) == 0:
711
                is_ready = True
712
        elif lease_state in (Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_QUEUED) and migration == constants.MIGRATE_NO:
713
            # No migration means the lease is ready
714
            is_ready = True
715
        elif lease_state in (Lease.STATE_PENDING, Lease.STATE_QUEUED):
716
            # The lease might require initial preparation
717
            preparation_rrs, is_ready = self.preparation_scheduler.schedule(lease, vmrr, earliest, nexttime)
718

    
719
        # If scheduling the lease involves preempting other leases,
720
        # go ahead and preempt them.
721
        if len(preemptions) > 0:
722
            self.logger.info("Must preempt leases %s to make room for lease #%i" % ([l.id for l in preemptions], lease.id))
723
            for l in preemptions:
724
                self.__preempt_lease(l, preemption_time=vmrr.start)
725

    
726
        # At this point, the lease is feasible.
727
        # Commit changes by adding RRs to lease and to slot table
728
        
729
        # Add preparation RRs (if any) to lease
730
        for rr in preparation_rrs:
731
            lease.append_preparationrr(rr)
732
        
733
        # Add VMRR to lease
734
        lease.append_vmrr(vmrr)
735
        
736

    
737
        # Add resource reservations to slottable
738
        
739
        # Preparation RRs (if any)
740
        for rr in preparation_rrs:
741
            self.slottable.add_reservation(rr)
742
        
743
        # Pre-VM RRs (if any)
744
        for rr in vmrr.pre_rrs:
745
            self.slottable.add_reservation(rr)
746
            
747
        # VM
748
        self.slottable.add_reservation(vmrr)
749
        
750
        # Post-VM RRs (if any)
751
        for rr in vmrr.post_rrs:
752
            self.slottable.add_reservation(rr)
753
          
754
        # Change lease state
755
        if lease_state == Lease.STATE_PENDING or lease_state == Lease.STATE_QUEUED:
756
            lease.set_state(Lease.STATE_SCHEDULED)
757
            if is_ready:
758
                lease.set_state(Lease.STATE_READY)
759
        elif lease_state == Lease.STATE_SUSPENDED_PENDING or lease_state == Lease.STATE_SUSPENDED_QUEUED:
760
            lease.set_state(Lease.STATE_SUSPENDED_SCHEDULED)
761

    
762
        get_persistence().persist_lease(lease)
763

    
764
        lease.print_contents()
765

    
766
        
767
    def __preempt_lease(self, lease, preemption_time):
768
        """ Preempts a lease.
769
        
770
        This method preempts a lease such that any resources allocated
771
        to that lease after a given time are freed up. This may require
772
        scheduling the lease to suspend before that time, or cancelling
773
        the lease altogether.
774
        
775
        Arguments:
776
        lease -- Lease to schedule.
777
        preemption_time -- Time at which lease must be preempted
778
        """       
779
        
780
        self.logger.info("Preempting lease #%i..." % (lease.id))
781
        self.logger.vdebug("Lease before preemption:")
782
        lease.print_contents()
783
        vmrr = lease.get_last_vmrr()
784
        
785
        if vmrr.state == ResourceReservation.STATE_SCHEDULED and vmrr.start >= preemption_time:
786
            self.logger.debug("Lease was set to start in the middle of the preempting lease.")
787
            must_cancel_and_requeue = True
788
        else:
789
            susptype = get_config().get("suspension")
790
            if susptype == constants.SUSPENSION_NONE:
791
                must_cancel_and_requeue = True
792
            else:
793
                can_suspend = self.vm_scheduler.can_suspend_at(lease, preemption_time)
794
                if not can_suspend:
795
                    self.logger.debug("Suspending the lease does not meet scheduling threshold.")
796
                    must_cancel_and_requeue = True
797
                else:
798
                    if lease.numnodes > 1 and susptype == constants.SUSPENSION_SERIAL:
799
                        self.logger.debug("Can't suspend lease because only suspension of single-node leases is allowed.")
800
                        must_cancel_and_requeue = True
801
                    else:
802
                        self.logger.debug("Lease can be suspended")
803
                        must_cancel_and_requeue = False
804
                    
805
        if must_cancel_and_requeue:
806
            self.logger.info("... lease #%i has been cancelled and requeued." % lease.id)
807
            if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
808
                self.preparation_scheduler.cancel_preparation(lease, remove_files = False)
809
            else:
810
                self.preparation_scheduler.cancel_preparation(lease)
811
            self.vm_scheduler.cancel_vm(vmrr)
812
            lease.remove_vmrr(vmrr)
813
            # TODO: Take into account other states
814
            if lease.get_state() == Lease.STATE_SUSPENDED_SCHEDULED:
815
                lease.set_state(Lease.STATE_SUSPENDED_QUEUED)
816
            else:
817
                lease.set_state(Lease.STATE_QUEUED)
818
            self.__enqueue_in_order(lease)
819
        else:
820
            self.logger.info("... lease #%i will be suspended at %s." % (lease.id, preemption_time))
821
            self.vm_scheduler.preempt_vm(vmrr, preemption_time)            
822
            
823
        get_persistence().persist_lease(lease)
824

    
825
        self.logger.vdebug("Lease after preemption:")
826
        lease.print_contents()
827
  
828
    def __enqueue(self, lease):
829
        """Queues a best-effort lease request
830
        
831
        Arguments:
832
        lease -- Lease to be queued
833
        """
834
        self.queue.enqueue(lease)
835

    
836

    
837
    def __enqueue_in_order(self, lease):
838
        """Queues a lease in order (currently, time of submission)
839
        
840
        Arguments:
841
        lease -- Lease to be queued
842
        """
843
        self.queue.enqueue_in_order(lease)
844

    
845

    
846
    def _handle_end_rr(self, rr):
847
        """Performs actions that have to be done each time a reservation ends.
848
        
849
        Arguments:
850
        rr -- Reservation that ended
851
        """
852
        self.slottable.remove_reservation(rr)
853
        
854

    
855
    def _handle_end_lease(self, l):
856
        """Performs actions that have to be done each time a lease ends.
857
        
858
        Arguments:
859
        lease -- Lease that has ended
860
        """
861
        l.set_state(Lease.STATE_DONE)
862
        l.duration.actual = l.duration.accumulated
863
        l.end = round_datetime(get_clock().get_time())
864
        if get_config().get("sanity-check"):
865
            if l.duration.known != None and l.duration.known < l.duration.requested:
866
                duration = l.duration.known
867
            else:
868
                duration = l.duration.requested
869
                
870
            assert duration == l.duration.actual
871

    
872
            if l.start.is_requested_exact():
873
                assert l.vm_rrs[0].start >= l.start.requested
874
            if l.deadline != None:
875
                assert l.end <= l.deadline
876

    
877
        self.preparation_scheduler.cleanup(l)
878
        self.completed_leases.add(l)
879
        self.leases.remove(l)
880
        self.accounting.at_lease_done(l)
881
        
882
        
883

    
884
        
885

    
886
class Queue(object):
887
    """A simple queue for leases
888
    
889
    This class is a simple queue container for leases, with some
890
    extra syntactic sugar added for convenience.    
891
    """    
892

    
893
    def __init__(self):
894
        self.__q = []
895
        
896
    def is_empty(self):
897
        return len(self.__q)==0
898
    
899
    def enqueue(self, r):
900
        self.__q.append(r)
901
    
902
    def dequeue(self):
903
        return self.__q.pop(0)
904
    
905
    def enqueue_in_order(self, r):
906
        self.__q.append(r)
907
        self.__q.sort(key=attrgetter("submit_time"))
908

    
909
    def length(self):
910
        return len(self.__q)
911
    
912
    def has_lease(self, lease_id):
913
        return (1 == len([l for l in self.__q if l.id == lease_id]))
914
    
915
    def get_lease(self, lease_id):
916
        return [l for l in self.__q if l.id == lease_id][0]
917
    
918
    def remove_lease(self, lease):
919
        self.__q.remove(lease)
920
    
921
    def __iter__(self):
922
        return iter(self.__q)
923
        
924
class LeaseTable(object):
925
    """A simple container for leases
926
    
927
    This class is a simple dictionary-like container for leases, with some
928
    extra syntactic sugar added for convenience.    
929
    """    
930
    
931
    def __init__(self):
932
        self.entries = {}
933
        
934
    def has_lease(self, lease_id):
935
        return self.entries.has_key(lease_id)
936
        
937
    def get_lease(self, lease_id):
938
        return self.entries[lease_id]
939
    
940
    def is_empty(self):
941
        return len(self.entries)==0
942
    
943
    def remove(self, lease):
944
        del self.entries[lease.id]
945
        
946
    def add(self, lease):
947
        self.entries[lease.id] = lease
948
        
949
    def get_leases(self, type=None):
950
        if type==None:
951
            return self.entries.values()
952
        else:
953
            return [e for e in self.entries.values() if e.get_type() == type]
954

    
955
    def get_leases_by_state(self, state):
956
        return [e for e in self.entries.values() if e.get_state() == state]
957
 
958