Project

General

Profile

root / branches / 1.1 / src / haizea / core / scheduler / mapper.py @ 842

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2009, University of Chicago                                 #
3
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19
"""This module provides the base class for writing custom "mappers" and the
20
default greedy mapper used in Haizea. A mapper is a class with a single function
21
"map" that takes a set of requested resources (typically corresponding to
22
VMs) and maps them to physical nodes (if such a mapping exists).
23
"""
24

    
25
from haizea.common.utils import abstract, get_config, get_clock
26
from haizea.core.scheduler import NotSchedulableException, EarliestStartingTime
27
from haizea.core.scheduler.slottable import ResourceReservation
28
from haizea.core.leases import Lease
29
import haizea.common.constants as constants
30

    
31
import operator
32
import logging
33

    
34
# Shorthand aliases for the mapper classes defined in this module.
# These short names can be used in the configuration file in place of
# the fully qualified class name.
class_mappings = dict(
    greedy   = "haizea.core.scheduler.mapper.GreedyMapper",
    deadline = "haizea.core.scheduler.mapper.DeadlineMapper",
)
39

    
40
class Mapper(object):
    """Abstract parent class for all mappers.
    
    A mapper takes a set of requested resources and decides how to
    place them on the physical nodes tracked by the slot table.
    Concrete mappers must override the map() method.
    """
    
    def __init__(self, slottable, policy):
        """Constructor
        
        Arguments
        slottable -- A fully constructed SlotTable
        policy -- A fully constructed PolicyManager
        """
        self.logger = logging.getLogger("MAP")
        self.policy = policy
        self.slottable = slottable
    
    
    def map(self, requested_resources, start, end, strictend, onlynodes = None):
        """The mapping function (abstract; subclasses must implement it).
        
        Maps a set of requested resources to physical resources, based
        on the availability recorded in the slot table, over a given
        time interval. The returned mapping is allowed to cover only a
        prefix of that interval (see "strictend").
        
        Arguments:
        requested_resources -- A dictionary mapping lease nodes (integers) to
        ResourceTuples (the amount of resources each lease node needs)
        start -- Starting time of the interval during which the resources
        are required
        end -- Ending time of the interval
        strictend -- If True, only mappings spanning the whole interval
        are valid. If False, the mapper may return a mapping covering
        only part of the interval; that reduced interval always begins
        at "start", and its actual end is returned as "maxend".
        onlynodes -- List of physical nodes. Only look for a mapping in
        these nodes.
        
        Returns:
        mapping -- A dictionary mapping lease nodes to physical nodes
        maxend -- The end of the interval the mapping actually covers
        (may be earlier than "end" when strictend is False)
        preempting -- Leases that would have to be preempted for the
        mapping to be valid.
        
        If no mapping is found, all three return values are None.
        
        NOTE(review): the concrete mappers in this module implement map()
        with additional leading "lease" and "allow_preemption" arguments;
        this base signature does not match them exactly.
        """
        abstract()

    
94
class GreedyMapper(Mapper):
    """Haizea's default greedy mapper
    
    Haizea uses a greedy algorithm to determine how VMs are mapped to
    physical resources at a specific point in time (determining that point
    in time, when using best-effort scheduling, is done in the lease
    and VM scheduling classes).
    
    The algorithm works by, first, greedily ordering the physical nodes
    from "most desirable" to "least desirable". For example, a physical
    node with no leases scheduled on it in the future is preferable
    to one with leases (since this reduces the probability of having to
    preempt leases to obtain a mapping). This ordering, however, is done by the
    policy engine (see the GreedyPolicy class in the host_selection module) so,
    to be a truly greedy algorithm, this mapper must be used in conjunction with
    the "greedy" host selection policy.
    
    Then, the algorithm traverses the list of nodes and tries to map as many
    lease nodes into each physical node before moving on to the next. If
    the list of physical nodes is exhausted without finding a mapping for all
    the lease nodes, then the algorithm tries to find a mapping by preempting
    other leases.
    
    Before doing this, the mapper must first determine what leases could be
    preempted. This decision is delegated to the policy engine, which returns
    a list of leases ordered by preemptability. The mapper attempts a mapping
    assuming one candidate lease is going to be preempted, then two, etc.
    
    If no mapping is found with preemption, then there is no mapping at the
    requested time.
    """
    
    def __init__(self, slottable, policy):
        """Constructor
        
        Arguments
        slottable -- A fully constructed SlotTable
        policy -- A fully constructed PolicyManager
        """
        Mapper.__init__(self, slottable, policy)
        
    def map(self, lease, requested_resources, start, end, strictend, allow_preemption=False, onlynodes=None):
        """The mapping function
        
        See documentation in Mapper for the common arguments and return
        values.
        
        Additional arguments:
        lease -- The lease whose nodes are being mapped (passed to the
        policy engine so it can rank hosts and preemption candidates)
        allow_preemption -- If True, the mapper may return mappings that
        require preempting other leases (returned in "preempting"; this
        function does not itself preempt them)
        """
        
        # Generate an availability window at time "start"
        aw = self.slottable.get_availability_window(start)

        nodes = aw.get_nodes_at(start)
        if onlynodes is not None:
            # Coerce to a set: the documented interface says "list of
            # physical nodes", and intersecting a set with a list would
            # raise TypeError.
            nodes = list(set(nodes) & set(onlynodes))

        # Get an ordered list of physical nodes ("most desirable" first,
        # per the host selection policy)
        pnodes = self.policy.sort_hosts(nodes, start, lease)
        
        # Get an ordered list of lease nodes (highest capacity first)
        vnodes = self.__sort_vnodes(requested_resources)
        
        if allow_preemption:
            # Get the leases that intersect with the requested interval.
            leases = aw.get_leases_between(start, end)
            # Ask the policy engine to sort the leases based on their
            # preemptability. Candidates are consumed with pop() (from
            # the end of this list) below.
            preemptable_leases = self.policy.sort_leases(lease, leases, start)
        else:
            preemptable_leases = []

        preempting = []
        
        # Try to find a mapping. Each iteration of this loop goes through
        # all the lease nodes and tries to find a mapping. The first
        # iteration assumes no leases can be preempted, and each successive
        # iteration assumes one more lease can be preempted.
        mapping = {}
        done = False
        while not done:
            # Start at the first lease node
            vnodes_pos = 0
            cur_vnode = vnodes[vnodes_pos]
            cur_vnode_capacity = requested_resources[cur_vnode]
            maxend = end
            
            # Go through all the physical nodes.
            # In each iteration, we try to map as many lease nodes
            # as possible into the physical nodes.
            # "cur_vnode_capacity" holds the capacity of the vnode we are
            # currently trying to map. "need_to_map" is the amount of
            # resources we are trying to map into the current physical node
            # (which might be more than one lease node).
            for pnode in pnodes:
                # need_to_map is initialized to the capacity of whatever
                # lease node we are trying to map now.
                need_to_map = self.slottable.create_empty_resource_tuple()
                need_to_map.incr(cur_vnode_capacity)
                avail = aw.get_ongoing_availability(start, pnode, preempted_leases = preempting)
                
                # Try to fit as many lease nodes as we can into this physical node
                pnode_done = False
                while not pnode_done:
                    if avail.fits(need_to_map, until = maxend):
                        # In this case, we can fit "need_to_map" into the
                        # physical node.
                        mapping[cur_vnode] = pnode
                        vnodes_pos += 1
                        if vnodes_pos >= len(vnodes):
                            # No more lease nodes to map, we're done.
                            done = True
                            break
                        else:
                            # Advance to the next lease node, and add its
                            # capacity to need_to_map
                            cur_vnode = vnodes[vnodes_pos]
                            cur_vnode_capacity = requested_resources[cur_vnode]
                            need_to_map.incr(cur_vnode_capacity)
                    else:
                        # We couldn't fit the lease node. If we need to
                        # find a mapping that spans the entire requested
                        # interval, then we're done checking this physical node.
                        if strictend:
                            pnode_done = True
                        else:
                            # Otherwise, check what the longest interval
                            # we could fit in this physical node is
                            latest = avail.latest_fit(need_to_map)
                            if latest is None:
                                pnode_done = True
                            else:
                                maxend = latest
                    
                if done:
                    break

            # If there are no more leases that we could preempt,
            # we're done (possibly without a full mapping).
            if len(preemptable_leases) == 0:
                done = True
            elif not done:
                # Otherwise, add another lease to the list of
                # leases we are preempting
                preempting.append(preemptable_leases.pop())

        if len(mapping) != len(requested_resources):
            # No mapping found
            return None, None, None
        else:
            return mapping, maxend, preempting

    def __sort_vnodes(self, requested_resources):
        """Sorts the lease nodes
        
        Greedily sorts the lease nodes so the mapping algorithm
        will first try to map those that require the highest
        capacity.
        
        Arguments
        requested_resources -- dictionary mapping lease nodes to
        ResourceTuples
        
        Returns the lease nodes as a list, highest normalized capacity
        first.
        """
        
        # Find the maximum requested amount for each resource type
        max_res = self.slottable.create_empty_resource_tuple()
        for res in requested_resources.values():
            for i in range(len(res._single_instance)):
                if res._single_instance[i] > max_res._single_instance[i]:
                    max_res._single_instance[i] = res._single_instance[i]
                    
        # Normalize the capacities of the lease nodes (divide each
        # requested amount of a resource type by the maximum amount)
        norm_res = {}
        for k, v in requested_resources.items():
            norm_capacity = 0
            for i in range(len(max_res._single_instance)):
                if max_res._single_instance[i] > 0:
                    norm_capacity += v._single_instance[i] / float(max_res._single_instance[i])
            norm_res[k] = norm_capacity
             
        # sorted() (rather than list.sort() on dict.items()) works on
        # both Python 2 and Python 3, where items() is a view.
        vnodes = sorted(norm_res.items(), key=operator.itemgetter(1), reverse=True)
        return [k for k, v in vnodes]
    
278
class DeadlineMapper(Mapper):
    """Haizea's greedy mapper w/ deadline-sensitive preemptions
    
    Works like GreedyMapper, but a lease is only accepted as a
    preemption candidate if it can actually be rescheduled after the
    preemption (suspended and resumed, or cancelled and re-run). Slot
    table state is snapshotted with push_state/pop_state so tentative
    preemptions can be rolled back when no full mapping is found.
    """
    
    def __init__(self, slottable, policy):
        """Constructor
        
        Arguments
        slottable -- A fully constructed SlotTable
        policy -- A fully constructed PolicyManager
        """
        Mapper.__init__(self, slottable, policy)
        
    def set_vm_scheduler(self, vm_scheduler):
        # The VM scheduler is needed to suspend/cancel/reschedule
        # preempted leases; it must be set before calling map() with
        # allow_preemption=True.
        self.vm_scheduler = vm_scheduler
        
    def map(self, lease, requested_resources, start, end, strictend, allow_preemption=False, onlynodes=None):
        """The mapping function
        
        See documentation in Mapper for the common arguments and return
        values.
        
        Additional arguments:
        lease -- The lease whose nodes are being mapped (passed to the
        policy engine so it can rank hosts and preemption candidates)
        allow_preemption -- If True, the mapper may return mappings that
        require preempting other leases; each candidate is verified to
        be reschedulable before it is accepted (see
        __preempt_lease_deadline)
        """
        # Generate an availability window at time "start"
        aw = self.slottable.get_availability_window(start)

        nodes = aw.get_nodes_at(start)
        if onlynodes is not None:
            # Coerce to a set: the documented interface says "list of
            # physical nodes", and intersecting a set with a list would
            # raise TypeError.
            nodes = list(set(nodes) & set(onlynodes))

        # Get an ordered list of physical nodes ("most desirable" first,
        # per the host selection policy)
        pnodes = self.policy.sort_hosts(nodes, start, lease)
        
        # Get an ordered list of lease nodes (highest capacity first)
        vnodes = self.__sort_vnodes(requested_resources)
        
        if allow_preemption:
            # Get the leases that intersect with the requested interval.
            leases = aw.get_leases_between(start, end)
            # Ask the policy engine to sort the leases based on their
            # preemptability. Candidates are consumed with pop() (from
            # the end of this list) below.
            preemptable_leases = self.policy.sort_leases(lease, leases, start)
        else:
            preemptable_leases = []

        if allow_preemption:
            # Snapshot the slot table so any tentative preemptions can
            # be rolled back if no full mapping is found.
            self.slottable.push_state(preemptable_leases)

        preempting = []
        nexttime = get_clock().get_next_schedulable_time()
        
        # Try to find a mapping. Each iteration of this loop goes through
        # all the lease nodes and tries to find a mapping. The first
        # iteration assumes no leases can be preempted, and each successive
        # iteration assumes one more lease can be preempted.
        mapping = {}
        done = False
        while not done:
            # Start at the first lease node
            vnodes_pos = 0
            cur_vnode = vnodes[vnodes_pos]
            cur_vnode_capacity = requested_resources[cur_vnode]
            maxend = end
            
            # Go through all the physical nodes.
            # In each iteration, we try to map as many lease nodes
            # as possible into the physical nodes.
            # "cur_vnode_capacity" holds the capacity of the vnode we are
            # currently trying to map. "need_to_map" is the amount of
            # resources we are trying to map into the current physical node
            # (which might be more than one lease node).
            for pnode in pnodes:
                # need_to_map is initialized to the capacity of whatever
                # lease node we are trying to map now.
                need_to_map = self.slottable.create_empty_resource_tuple()
                need_to_map.incr(cur_vnode_capacity)
                avail = aw.get_ongoing_availability(start, pnode, preempted_leases = preempting)
                
                # Try to fit as many lease nodes as we can into this physical node
                pnode_done = False
                while not pnode_done:
                    if avail.fits(need_to_map, until = maxend):
                        # In this case, we can fit "need_to_map" into the
                        # physical node.
                        mapping[cur_vnode] = pnode
                        vnodes_pos += 1
                        if vnodes_pos >= len(vnodes):
                            # No more lease nodes to map, we're done.
                            done = True
                            break
                        else:
                            # Advance to the next lease node, and add its
                            # capacity to need_to_map
                            cur_vnode = vnodes[vnodes_pos]
                            cur_vnode_capacity = requested_resources[cur_vnode]
                            need_to_map.incr(cur_vnode_capacity)
                    else:
                        # We couldn't fit the lease node. If we need to
                        # find a mapping that spans the entire requested
                        # interval, then we're done checking this physical node.
                        if strictend:
                            pnode_done = True
                        else:
                            # Otherwise, check what the longest interval
                            # we could fit in this physical node is
                            latest = avail.latest_fit(need_to_map)
                            if latest is None:
                                pnode_done = True
                            else:
                                maxend = latest
                    
                if done:
                    break

            # If there are no more leases that we could preempt,
            # we're done (possibly without a full mapping).
            if len(preemptable_leases) == 0:
                done = True
            elif not done:
                # Otherwise, add another lease to the list of leases we
                # are preempting. Unlike GreedyMapper, a candidate is
                # only accepted if it can be rescheduled after the
                # preemption; otherwise we try the next candidate.
                added = False
                while not added:
                    preemptee = preemptable_leases.pop()
                    try:
                        self.__preempt_lease_deadline(preemptee, start, end, nexttime)
                        preempting.append(preemptee)
                        added = True
                    except NotSchedulableException:
                        if len(preemptable_leases) == 0:
                            done = True
                            break

        if len(mapping) != len(requested_resources):
            # No mapping found: roll back any tentative preemptions
            if allow_preemption:
                self.slottable.pop_state()
            return None, None, None
        else:
            # Mapping found: commit the preemptions
            if allow_preemption:
                self.slottable.pop_state(discard = True)
            return mapping, maxend, preempting

    def __sort_vnodes(self, requested_resources):
        """Sorts the lease nodes
        
        Greedily sorts the lease nodes so the mapping algorithm
        will first try to map those that require the highest
        capacity.
        
        (Duplicate of GreedyMapper's private sorting method; name
        mangling makes a private method of another class inaccessible
        from here.)
        
        Arguments
        requested_resources -- dictionary mapping lease nodes to
        ResourceTuples
        
        Returns the lease nodes as a list, highest normalized capacity
        first.
        """
        
        # Find the maximum requested amount for each resource type
        max_res = self.slottable.create_empty_resource_tuple()
        for res in requested_resources.values():
            for i in range(len(res._single_instance)):
                if res._single_instance[i] > max_res._single_instance[i]:
                    max_res._single_instance[i] = res._single_instance[i]
                    
        # Normalize the capacities of the lease nodes (divide each
        # requested amount of a resource type by the maximum amount)
        norm_res = {}
        for k, v in requested_resources.items():
            norm_capacity = 0
            for i in range(len(max_res._single_instance)):
                if max_res._single_instance[i] > 0:
                    norm_capacity += v._single_instance[i] / float(max_res._single_instance[i])
            norm_res[k] = norm_capacity
             
        # sorted() (rather than list.sort() on dict.items()) works on
        # both Python 2 and Python 3, where items() is a view.
        vnodes = sorted(norm_res.items(), key=operator.itemgetter(1), reverse=True)
        return [k for k, v in vnodes]

    def __preempt_lease_deadline(self, lease_to_preempt, preemption_start_time, preemption_end_time, nexttime):
        """Preempt a lease, verifying it can be rescheduled.
        
        Suspends lease_to_preempt at preemption_start_time when
        suspension is supported and feasible; otherwise cancels its VM
        reservations. Then tries to reschedule the lease's remaining
        duration to start no earlier than preemption_end_time. All slot
        table changes are made under push_state and rolled back (with
        NotSchedulableException raised) if rescheduling fails.
        
        Arguments
        lease_to_preempt -- the lease to preempt
        preemption_start_time -- time at which it must vacate resources
        preemption_end_time -- earliest time at which it may run again
        nexttime -- next schedulable time (from the clock)
        
        Raises NotSchedulableException if the lease cannot be preempted
        and rescheduled.
        """
        self.logger.debug("Attempting to preempt lease %i" % lease_to_preempt.id)
        # Snapshot so a failed preemption leaves the slot table intact
        self.slottable.push_state([lease_to_preempt])
         
        feasible = True
        cancelled = []      # ids of leases whose VMs were cancelled (vs. suspended)
        new_state = {}      # lease -> state to force after a successful preemption (None = leave as is)
        durs = {}           # lease -> remaining duration that must be rescheduled

        preempt_vmrr = lease_to_preempt.get_vmrr_at(preemption_start_time)
        
        susptype = get_config().get("suspension")
        
        cancel = False
        
        # Decide between suspending and cancelling the lease's VMs
        if susptype == constants.SUSPENSION_NONE:
            self.logger.debug("Lease %i will be cancelled because suspension is not supported." % lease_to_preempt.id)
            cancel = True
        else:
            if preempt_vmrr is None:
                # No VM reservation is running at the preemption time
                self.logger.debug("Lease %i was set to start in the middle of the preempting lease." % lease_to_preempt.id)
                cancel = True
            else:
                can_suspend = self.vm_scheduler.can_suspend_at(lease_to_preempt, preemption_start_time, nexttime)
                
                if not can_suspend:
                    self.logger.debug("Suspending lease %i does not meet scheduling threshold." % lease_to_preempt.id)
                    cancel = True
                else:
                    self.logger.debug("Lease %i will be suspended." % lease_to_preempt.id)
                    
        after_vmrrs = lease_to_preempt.get_vmrr_after(preemption_start_time)

        if not cancel:
            # Suspend: remaining duration plus the suspension overhead
            # must be rescheduled
            durs[lease_to_preempt] = lease_to_preempt.get_remaining_duration_at(preemption_start_time)
            self.vm_scheduler.preempt_vm(preempt_vmrr, min(preemption_start_time, preempt_vmrr.end))
            susp_time = preempt_vmrr.post_rrs[-1].end - preempt_vmrr.post_rrs[0].start
            durs[lease_to_preempt] += susp_time
                                    
        else:
            cancelled.append(lease_to_preempt.id)

            if preempt_vmrr is not None:
                durs[lease_to_preempt] = lease_to_preempt.get_remaining_duration_at(preempt_vmrr.start)
                
                lease_to_preempt.remove_vmrr(preempt_vmrr)
                self.vm_scheduler.cancel_vm(preempt_vmrr)

                # Cancel future VMs
                for after_vmrr in after_vmrrs:
                    lease_to_preempt.remove_vmrr(after_vmrr)
                    self.vm_scheduler.cancel_vm(after_vmrr)
                # Cleared so the shared "cancel future VMs" loop below
                # becomes a no-op for this branch
                after_vmrrs = []
                if preempt_vmrr.state == ResourceReservation.STATE_ACTIVE:
                    last_vmrr = lease_to_preempt.get_last_vmrr()
                    if last_vmrr is not None and last_vmrr.is_suspending():
                        new_state[lease_to_preempt] = Lease.STATE_SUSPENDED_SCHEDULED
                    else:
                        # The VMRR we're preempting is the active one
                        new_state[lease_to_preempt] = Lease.STATE_READY
            else:
                durs[lease_to_preempt] = lease_to_preempt.get_remaining_duration_at(preemption_start_time)
                lease_state = lease_to_preempt.get_state()
                if lease_state == Lease.STATE_ACTIVE:
                    # Don't do anything. The lease is active, but not in the VMs
                    # we're preempting.
                    new_state[lease_to_preempt] = None
                elif lease_state in (Lease.STATE_SUSPENDING, Lease.STATE_SUSPENDED_PENDING, Lease.STATE_SUSPENDED_SCHEDULED):
                    # Don't do anything. The lease is suspending or suspended.
                    # Must stay that way.
                    new_state[lease_to_preempt] = None
                elif lease_state != Lease.STATE_READY:
                    new_state[lease_to_preempt] = Lease.STATE_READY
                    
        # Cancel future VMs (no-op when already handled above)
        for after_vmrr in after_vmrrs:
            lease_to_preempt.remove_vmrr(after_vmrr)
            self.vm_scheduler.cancel_vm(after_vmrr)

        dur = durs[lease_to_preempt]
        node_ids = self.slottable.nodes.keys()
        earliest = {}
   
        try:
            # Reschedule the remaining duration, no earlier than
            # preemption_end_time on every node
            if lease_to_preempt.id in cancelled:
                last_vmrr = lease_to_preempt.get_last_vmrr()
                if last_vmrr is not None and last_vmrr.is_suspending():
                    override_state = Lease.STATE_SUSPENDED_PENDING
                else:
                    override_state = None
                for node in node_ids:
                    earliest[node] = EarliestStartingTime(preemption_end_time, EarliestStartingTime.EARLIEST_NOPREPARATION)
                (new_vmrr, preemptions) = self.vm_scheduler.reschedule_deadline(lease_to_preempt, dur, nexttime, earliest, override_state)
            else:
                # Suspended leases resume from a suspended-pending state
                for node in node_ids:
                    earliest[node] = EarliestStartingTime(preemption_end_time, EarliestStartingTime.EARLIEST_NOPREPARATION)
                (new_vmrr, preemptions) = self.vm_scheduler.reschedule_deadline(lease_to_preempt, dur, nexttime, earliest, override_state = Lease.STATE_SUSPENDED_PENDING)

            # Add VMRR to lease
            lease_to_preempt.append_vmrr(new_vmrr)
            
            # Add resource reservations to slottable
            
            # Pre-VM RRs (if any)
            for rr in new_vmrr.pre_rrs:
                self.slottable.add_reservation(rr)
                
            # VM
            self.slottable.add_reservation(new_vmrr)
            
            # Post-VM RRs (if any)
            for rr in new_vmrr.post_rrs:
                self.slottable.add_reservation(rr)
        except NotSchedulableException:
            feasible = False

        if not feasible:
            self.logger.debug("Unable to preempt lease %i" % lease_to_preempt.id)
            self.slottable.pop_state()
            # raise Exc("msg") is valid in both Python 2 and 3
            # (the original "raise Exc, msg" form is Python-2-only)
            raise NotSchedulableException("Unable to preempt leases to make room for lease.")
        else:
            self.logger.debug("Was able to preempt lease %i" % lease_to_preempt.id)
            self.slottable.pop_state(discard = True)

            # Apply the state transitions decided above
            for l in new_state:
                if new_state[l] is not None:
                    l.state_machine.state = new_state[l]

            self.logger.vdebug("Lease %i after preemption:" % lease_to_preempt.id)
            lease_to_preempt.print_contents()