# -------------------------------------------------------------------------- #
# Copyright 2006-2009, University of Chicago                                 #
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
# Complutense de Madrid (dsa-research.org)                                   #
#                                                                            #
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
# not use this file except in compliance with the License. You may obtain    #
# a copy of the License at                                                   #
#                                                                            #
# http://www.apache.org/licenses/LICENSE-2.0                                 #
#                                                                            #
# Unless required by applicable law or agreed to in writing, software        #
# distributed under the License is distributed on an "AS IS" BASIS,          #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
# See the License for the specific language governing permissions and        #
# limitations under the License.                                             #
# -------------------------------------------------------------------------- #

"""The manager (resource manager) module is the root of Haizea. If you want to
see where the ball starts rolling, look at the following two functions:

* manager.Manager.__init__()
* manager.Manager.start()

This module provides the following classes:

* Manager: Haizea itself. Pretty much everything else
  is contained in this class.
* Clock: A base class for Haizea's clock.
* SimulatedClock: A clock for simulations.
* RealClock: A clock that advances in realtime.
"""

import haizea.common.constants as constants
from haizea.core.scheduler.preparation_schedulers.unmanaged import UnmanagedPreparationScheduler
from haizea.core.scheduler.preparation_schedulers.imagetransfer import ImageTransferPreparationScheduler
from haizea.core.enact.opennebula import OpenNebulaResourcePoolInfo, OpenNebulaVMEnactment, OpenNebulaDummyDeploymentEnactment
from haizea.core.enact.simulated import SimulatedResourcePoolInfo, SimulatedVMEnactment, SimulatedDeploymentEnactment
from haizea.core.frontends.tracefile import TracefileFrontend
from haizea.core.frontends.opennebula import OpenNebulaFrontend
from haizea.core.frontends.rpc import RPCFrontend
from haizea.core.accounting import AccountingDataCollection
from haizea.core.scheduler import UnrecoverableError
from haizea.core.scheduler.lease_scheduler import LeaseScheduler
from haizea.core.scheduler.vm_scheduler import VMScheduler
from haizea.core.scheduler.mapper import class_mappings as mapper_mappings
from haizea.core.scheduler.slottable import SlotTable, ResourceReservation
from haizea.core.scheduler.policy import PolicyManager
from haizea.core.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
from haizea.core.leases import Lease, Site
from haizea.core.log import HaizeaLogger
from haizea.core.rpcserver import RPCServer
from haizea.common.utils import abstract, round_datetime, Singleton, import_class, OpenNebulaXMLRPCClientSingleton
from haizea.common.opennebula_xmlrpc import OpenNebulaXMLRPCClient
from haizea.pluggable.policies import admission_class_mappings, preemption_class_mappings, host_class_mappings, pricing_mappings
from haizea.pluggable.accounting import probe_class_mappings

import operator
import logging
import signal
import sys, os
import traceback
import shelve
import socket
from time import sleep
from math import ceil
from mx.DateTime import now, TimeDelta

DAEMON_STDOUT = DAEMON_STDIN = "/dev/null"
DAEMON_STDERR = "/var/tmp/haizea.err"
DEFAULT_LOGFILE = "/var/tmp/haizea.log"

class Manager(object):
    """The root of Haizea
    
    This class is the root of Haizea. Pretty much everything else (scheduler,
    enactment modules, etc.) is contained in this class. The Manager
    class is meant to be a singleton.
    
    """
    
    __metaclass__ = Singleton
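    # Because of the Singleton metaclass, later instantiations return the
    # existing instance. Illustrative sketch (assuming the standard behavior
    # of haizea.common.utils.Singleton):
    #
    #     m1 = Manager(config)
    #     m2 = Manager(config)   # m2 is m1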
    
    def __init__(self, config, daemon=False, pidfile=None):
        """Initializes the manager.
        
        Arguments:
        config -- a populated instance of haizea.common.config.RMConfig
        daemon -- True if Haizea must run as a daemon, False if it must
                  run in the foreground
        pidfile -- When running as a daemon, file to save pid to
        """
        self.config = config
        self.daemon = daemon
        self.pidfile = pidfile

    # Has to be in a separate function since some pluggable modules need to
    # access the Manager singleton
    def __initialize(self):
        # Create the RM components
        
        mode = self.config.get("mode")
        
        if mode == "simulated":
            # Simulated-time simulations always run in the foreground
            clock = self.config.get("clock")
            if clock == constants.CLOCK_SIMULATED:
                self.daemon = False
        elif mode == "opennebula":
            clock = constants.CLOCK_REAL        
        
        self.init_logging()
                
        if clock == constants.CLOCK_SIMULATED:
            starttime = self.config.get("starttime")
            self.clock = SimulatedClock(self, starttime)
            self.rpc_server = None
        elif clock == constants.CLOCK_REAL:
            wakeup_interval = self.config.get("wakeup-interval")
            non_sched = self.config.get("non-schedulable-interval")
            if mode == "opennebula":
                fastforward = self.config.get("dry-run")
            else:
                fastforward = False
            self.clock = RealClock(self, wakeup_interval, non_sched, fastforward)
            if fastforward:
                # No need for an RPC server when doing a dry run
                self.rpc_server = None
            else:
                self.rpc_server = RPCServer(self)
                    
        # Create the RPC singleton client for OpenNebula mode
        if mode == "opennebula":
            host = self.config.get("one.host")
            port = self.config.get("one.port")
            try:
                rv = OpenNebulaXMLRPCClient.get_userpass_from_env()
            except:
                print "ONE_AUTH environment variable is not set or authorization file is malformed"
                exit(1)
                
            user, passw = rv[0], rv[1]
            try:
                OpenNebulaXMLRPCClientSingleton(host, port, user, passw)
            except socket.error, e:
                print "Unable to connect to OpenNebula"
                print "Reason: %s" % e
                exit(1)
                    
        # Enactment modules
152
        if mode == "simulated":
153
            resources = self.config.get("simul.resources")
154
            if resources == "in-tracefile":
155
                tracefile = self.config.get("tracefile")
156
                site = Site.from_lwf_file(tracefile)
157
            elif resources.startswith("file:"):
158
                sitefile = resources.split(":")
159
                site = Site.from_xml_file(sitefile)
160
            else:
161
                site = Site.from_resources_string(resources)
162
    
163
            deploy_bandwidth = self.config.get("imagetransfer-bandwidth")
164
            info_enact = SimulatedResourcePoolInfo(site)
165
            vm_enact = SimulatedVMEnactment()
166
            deploy_enact = SimulatedDeploymentEnactment(deploy_bandwidth)
167
        elif mode == "opennebula":
168
            # Enactment modules
169
            info_enact = OpenNebulaResourcePoolInfo()
170
            vm_enact = OpenNebulaVMEnactment()
171
            # No deployment in OpenNebula. Using dummy one for now.
172
            deploy_enact = OpenNebulaDummyDeploymentEnactment()            
173

    
174
        if mode == "simulated":
175
            preparation_type = self.config.get("lease-preparation")
176
        elif mode == "opennebula":
177
            # No deployment in OpenNebula.
178
            preparation_type = constants.PREPARATION_UNMANAGED
179

    
180
        # Resource pool
181
        if preparation_type == constants.PREPARATION_TRANSFER:
182
            if self.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
183
                resourcepool = ResourcePoolWithReusableImages(info_enact, vm_enact, deploy_enact)
184
            else:
185
                resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
186
        else:
187
            resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
188
    
189
        # Slot table
190
        slottable = SlotTable(info_enact.get_resource_types())
191
        for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
192
            rt = slottable.create_resource_tuple_from_capacity(n.capacity)
193
            slottable.add_node(n.id, rt)
194

    
195
        # Policy manager
196
        admission = self.config.get("policy.admission")
197
        admission = admission_class_mappings.get(admission, admission)
198
        admission = import_class(admission)
199
        admission = admission(slottable)
200
        
201
        preemption = self.config.get("policy.preemption")
202
        preemption = preemption_class_mappings.get(preemption, preemption)
203
        preemption = import_class(preemption)
204
        preemption = preemption(slottable)
205

    
206
        host_selection = self.config.get("policy.host-selection")
207
        host_selection = host_class_mappings.get(host_selection, host_selection)
208
        host_selection = import_class(host_selection)
209
        host_selection = host_selection(slottable)
210

    
211
        pricing = self.config.get("policy.pricing")
212
        pricing = pricing_mappings.get(pricing, pricing)
213
        pricing = import_class(pricing)
214
        pricing = pricing(slottable)
215

    
216
        self.policy = PolicyManager(admission, preemption, host_selection, pricing)
217

    
218
        # Preparation scheduler
219
        if preparation_type == constants.PREPARATION_UNMANAGED:
220
            preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
221
        elif preparation_type == constants.PREPARATION_TRANSFER:
222
            preparation_scheduler = ImageTransferPreparationScheduler(slottable, resourcepool, deploy_enact)    
223
    
224
        # VM mapper and scheduler
225
        mapper = self.config.get("mapper")
226
        mapper = mapper_mappings.get(mapper, mapper)
227
        mapper = import_class(mapper)
228
        mapper = mapper(slottable, self.policy)
229
        
230
        # When using backfilling, set the number of leases that can be
231
        # scheduled in the future.
232
        backfilling = self.config.get("backfilling")
233
        if backfilling == constants.BACKFILLING_OFF:
234
            max_in_future = 0
235
        elif backfilling == constants.BACKFILLING_AGGRESSIVE:
236
            max_in_future = 1
237
        elif backfilling == constants.BACKFILLING_CONSERVATIVE:
238
            max_in_future = -1 # Unlimited
239
        elif backfilling == constants.BACKFILLING_INTERMEDIATE:
240
            max_in_future = self.config.get("backfilling-reservations")
241
        
242
        vm_scheduler = VMScheduler(slottable, resourcepool, mapper, max_in_future)
243
    
244
        # Statistics collection 
245
        attrs = dict([(attr, self.config.get_attr(attr)) for attr in self.config.get_attrs()])    
246
        
247
        self.accounting = AccountingDataCollection(self.config.get("datafile"), attrs)
248
        # Load probes
249
        probes = self.config.get("accounting-probes")
250
        probes = probes.split()
251
        for probe in probes:
252
            probe_class = probe_class_mappings.get(probe, probe)
253
            probe_class = import_class(probe_class)
254
            probe_obj = probe_class(self.accounting)
255
            self.accounting.add_probe(probe_obj)    
256
    
257
        # Lease Scheduler
258
        self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable, self.accounting)
259
                                
260
        # Lease request frontends
261
        if mode == "simulated":
262
            if clock == constants.CLOCK_SIMULATED:
263
                # In pure simulation, we can only use the tracefile frontend
264
                self.frontends = [TracefileFrontend(self.clock.get_start_time())]
265
            elif clock == constants.CLOCK_REAL:
266
                # In simulation with a real clock, only the RPC frontend can be used
267
                self.frontends = [RPCFrontend()]             
268
        elif mode == "opennebula":
269
            self.frontends = [OpenNebulaFrontend()]               
270

    
271
        persistence_file = self.config.get("persistence-file")
272
        if persistence_file == "none":
273
            persistence_file = None
274
        self.persistence = PersistenceManager(persistence_file)
275
        
276
        self.logger = logging.getLogger("RM")
277

    
278

    
279
    def init_logging(self):
280
        """Initializes logging
281
        
282
        """
283

    
284
        logger = logging.getLogger("")
285
        if self.daemon:
286
            handler = logging.FileHandler(self.config.get("logfile"))
287
        else:
288
            handler = logging.StreamHandler()
289
        if sys.version_info[1] <= 4:
290
            formatter = logging.Formatter('%(name)-7s %(message)s')
291
        else:
292
            formatter = logging.Formatter('[%(haizeatime)s] %(name)-7s %(message)s')
293
        handler.setFormatter(formatter)
294
        logger.addHandler(handler)
295
        level = logging.getLevelName(self.config.get("loglevel"))
296
        logger.setLevel(level)
297
        logging.setLoggerClass(HaizeaLogger)
298

    
299
        
300
    def daemonize(self):
301
        """Daemonizes the Haizea process.
302
        
303
        Based on code in:  http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
304
        
305
        """
306
        # First fork
307
        try:
308
            pid = os.fork()
309
            if pid > 0: 
310
                # Exit first parent
311
                sys.exit(0) 
312
        except OSError, e:
313
            sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
314
            sys.exit(1)
315
    
316
        # Decouple from parent environment.
317
        os.chdir(".")
318
        os.umask(0)
319
        os.setsid()
320
    
321
        # Second fork
322
        try:
323
            pid = os.fork()
324
            if pid > 0: 
325
                # Exit second parent.
326
                sys.exit(0) 
327
        except OSError, e:
328
            sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
329
            sys.exit(2)
330
            
331
        # Open file descriptors and print start message
332
        si = file(DAEMON_STDIN, 'r')
333
        so = file(DAEMON_STDOUT, 'a+')
334
        se = file(DAEMON_STDERR, 'a+', 0)
335
        pid = os.getpid()
336
        sys.stderr.write("\nStarted Haizea daemon with pid %i\n\n" % pid)
337
        sys.stderr.flush()
338
        file(self.pidfile,'w+').write("%i\n" % pid)
339
        
340
        # Redirect standard file descriptors.
341
        os.close(sys.stdin.fileno())
342
        os.close(sys.stdout.fileno())
343
        os.close(sys.stderr.fileno())
344
        os.dup2(si.fileno(), sys.stdin.fileno())
345
        os.dup2(so.fileno(), sys.stdout.fileno())
346
        os.dup2(se.fileno(), sys.stderr.fileno())
347

    
    def start(self):
        """Starts the resource manager"""
        
        self.__initialize()

        self.logger.info("Starting resource manager")
        
        for frontend in self.frontends:
            frontend.load(self)
        
        if self.daemon:
            self.daemonize()
        if self.rpc_server:
            self.rpc_server.start()
            
        self.__recover()            
            
        # Start the clock
        try:
            self.clock.run()
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)

    def stop(self):
        """Stops the resource manager by stopping the clock"""
        self.clock.stop()
        
    def graceful_stop(self):
        """Stops the resource manager gracefully and exits"""
        
        self.logger.status("Stopping resource manager gracefully...")
        
        # Stop collecting data (this finalizes counters)
        self.accounting.stop()
        
        self.persistence.close()
        
        # TODO: When gracefully stopping mid-scheduling, we need to figure out what to
        #       do with leases that are still running.

        self.print_status()
        
        # In debug mode, dump the lease descriptors.
        for lease in self.scheduler.completed_leases.entries.values():
            lease.print_contents()
            
        # Write all collected data to disk
        leases = self.scheduler.completed_leases.entries
        self.accounting.save_to_disk(leases)
        
        # Stop RPC server
        if self.rpc_server != None:
            self.rpc_server.stop()
                    
    def process_requests(self, nexttime):
        """Process any new requests in the request frontend
        
        Checks the request frontend to see if there are any new requests that
        have to be processed. AR leases are sent directly to the scheduler.
        Best-effort leases are queued.
        
        Arguments:
        nexttime -- The next time at which the scheduler can allocate resources.
                    This is meant to be provided by the clock simply as a sanity
                    measure when running in real time (to avoid scheduling something
                    "now" only to have "now" be in the past once the scheduling
                    function returns). I.e., nexttime has nothing to do with whether 
                    there are resources available at that time or not.
        
        """        
        
        # Get requests from frontend
        requests = []
        for frontend in self.frontends:
            requests += frontend.get_accumulated_requests()
        requests.sort(key=operator.attrgetter("submit_time"))
        
        # Request leases and run the scheduling function.
        try:
            self.logger.vdebug("Requesting leases")
            for req in requests:
                self.scheduler.request_lease(req)

            self.logger.vdebug("Running scheduling function")
            self.scheduler.schedule(nexttime)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)

    def process_starting_reservations(self, time):
        """Process reservations starting at the specified time"""
        
        # The lease scheduler takes care of this.
        try:
            self.scheduler.process_starting_reservations(time)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)

    def process_ending_reservations(self, time):
        """Process reservations ending at the specified time"""
        
        # The lease scheduler takes care of this.
        try:
            self.scheduler.process_ending_reservations(time)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)             
             
    def notify_event(self, lease_id, event):
        """Notifies Haizea of an asynchronous event.
        
        Arguments:
        lease_id -- ID of lease that is affected by event
        event -- Event (currently, only the constants.EVENT_END_VM event is supported)
        """
        try:
            lease = self.scheduler.get_lease_by_id(lease_id)
            self.scheduler.notify_event(lease, event)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)
        
    def cancel_lease(self, lease_id):
        """Cancels a lease.
        
        Arguments:
        lease_id -- ID of lease to cancel
        """    
        try:
            lease = self.scheduler.get_lease_by_id(lease_id)
            self.scheduler.cancel_lease(lease)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)
            
    def get_next_changepoint(self):
        """Return next changepoint in the slot table"""
        return self.scheduler.slottable.get_next_changepoint(self.clock.get_time())
   
    def exists_more_leases(self):
        """Return True if there are any leases still "in the system" """
        return self.scheduler.exists_scheduled_leases() or not self.scheduler.is_queue_empty()

    def print_status(self):
        """Prints status summary."""
        
        leases = self.scheduler.leases.get_leases()
        completed_leases = self.scheduler.completed_leases.get_leases()
        self.logger.status("--- Haizea status summary ---")
        self.logger.status("Number of leases (not including completed): %i" % len(leases))
        self.logger.status("Completed leases: %i" % len(completed_leases))
        self.logger.status("---- End summary ----")        

    def __recover(self):
        """Loads persisted leases and scheduling information
        
        This method does three things:
        1. Recover persisted leases. Note that not all persisted leases
           may be recoverable. For example, if a lease was scheduled
           to start at a certain time, but that time passed while
           Haizea was not running, the lease will simply be transitioned
           to a failed state.
        2. Recover the queue.
        3. Recover the list of "future leases" as determined by
           the backfilling algorithm.
        """
        
        # Load leases
        leases = self.persistence.get_leases()
        for lease in leases:
            # Create a list of RRs
            rrs = lease.preparation_rrs + lease.vm_rrs
            for vmrr in lease.vm_rrs:
                rrs += vmrr.pre_rrs + vmrr.post_rrs

            # Bind resource tuples in RRs to slot table
            for rr in rrs:
                for restuple in rr.resources_in_pnode.values():
                    restuple.slottable = self.scheduler.slottable

            self.logger.debug("Attempting to recover lease %i" % lease.id)
            lease.print_contents()
            
            # Check the lease's state and determine how to proceed.
            load_rrs = False
            lease_state = lease.get_state()
            if lease_state in (Lease.STATE_DONE, Lease.STATE_CANCELLED, Lease.STATE_REJECTED, Lease.STATE_FAIL):
                self.logger.info("Recovered lease %i (already done)" % lease.id)
                self.scheduler.completed_leases.add(lease)
            elif lease_state in (Lease.STATE_NEW, Lease.STATE_PENDING):
                self.scheduler.leases.add(lease)
            elif lease_state == Lease.STATE_QUEUED:
                load_rrs = True
                self.scheduler.leases.add(lease)
                self.logger.info("Recovered lease %i (queued)" % lease.id)
            elif lease_state in (Lease.STATE_SCHEDULED, Lease.STATE_READY):
                # Check if schedule is still valid.
                vmrr = lease.get_last_vmrr()
                if len(vmrr.pre_rrs) > 0:
                    start = vmrr.pre_rrs[0].start
                else:
                    start = vmrr.start
                if self.clock.get_time() < start:
                    load_rrs = True
                    self.scheduler.leases.add(lease)
                    self.logger.info("Recovered lease %i" % lease.id)
                else:
                    lease.set_state(Lease.STATE_FAIL)
                    self.scheduler.completed_leases.add(lease)
                    self.logger.info("Could not recover lease %i (scheduled starting time has passed)" % lease.id)
            elif lease_state == Lease.STATE_ACTIVE:
                vmrr = lease.get_last_vmrr()
                # Recover the lease only if its scheduled end is still ahead
                # of the current time.
                if self.clock.get_time() < vmrr.end:
                    # TODO: Check if VMs are actually running
                    load_rrs = True
                    self.scheduler.leases.add(lease)
                    self.logger.info("Recovered lease %i" % lease.id)
                else:
                    # TODO: May have to stop extant virtual machines
                    lease.set_state(Lease.STATE_FAIL)
                    self.scheduler.completed_leases.add(lease)                    
                    self.logger.info("Could not recover lease %i (scheduled ending time has passed)" % lease.id)
            else:
                # No support for recovering lease in the
                # remaining states
                lease.set_state(Lease.STATE_FAIL)
                self.scheduler.completed_leases.add(lease)                    
                self.logger.info("Could not recover lease %i (unsupported state %i for recovery)" % (lease.id, lease_state))
                
            # Load the lease's RRs into the slot table
            if load_rrs:
                for rr in rrs:
                    if rr.state in (ResourceReservation.STATE_ACTIVE, ResourceReservation.STATE_SCHEDULED):
                        self.scheduler.slottable.add_reservation(rr)
                
        # Rebuild the queue        
        queue = self.persistence.get_queue()
        for lease_id in queue:
            if self.scheduler.leases.has_lease(lease_id):
                lease = self.scheduler.leases.get_lease(lease_id)
                self.scheduler.queue.enqueue(lease)

        # Rebuild the "future leases"
        future = self.persistence.get_future_leases()
        for lease_id in future:
            if self.scheduler.leases.has_lease(lease_id):
                lease = self.scheduler.leases.get_lease(lease_id)
                self.scheduler.vm_scheduler.future_leases.add(lease)

    def __unrecoverable_error(self, exc):
        """Handles an unrecoverable error.
        
        This method prints information on the unrecoverable error and makes Haizea panic.
        """
        self.logger.error("An unrecoverable error has happened.")
        self.logger.error("Original exception:")
        self.__print_exception(exc.exc, exc.get_traceback())
        self.logger.error("Unrecoverable error traceback:")
        self.__print_exception(exc, sys.exc_info()[2])
        self.__panic()

    def __unexpected_exception(self, exc):
        """Handles an unexpected exception.
        
        This method prints information on the exception and makes Haizea panic.
        """
        self.logger.error("An unexpected exception has happened.")
        self.__print_exception(exc, sys.exc_info()[2])
        self.__panic()
            
    def __print_exception(self, exc, exc_traceback):
        """Prints an exception's traceback to the log."""
        tb = traceback.format_tb(exc_traceback)
        for line in tb:
            self.logger.error(line)
        self.logger.error("Message: %s" % exc)
    
    def __panic(self):
        """Makes Haizea crash and burn in a panicked frenzy"""
        
        self.logger.status("Panicking...")

        # Stop RPC server
        if self.rpc_server != None:
            self.rpc_server.stop()

        # Dump state
        self.print_status()
        self.logger.error("Next change point (in slot table): %s" % self.get_next_changepoint())

        # Print lease descriptors
        leases = self.scheduler.leases.get_leases()
        if len(leases)>0:
            self.logger.vdebug("vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
            for lease in leases:
                lease.print_contents()
            self.logger.vdebug("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")        

        # Exit
        treatment = self.config.get("lease-failure-handling")
        if treatment == constants.ONFAILURE_EXIT_RAISE:
            raise
        else:
            exit(1)

            
class Clock(object):
    """Base class for the resource manager's clock.
    
    The clock is in charge of periodically waking the resource manager so it
    will process new requests and handle existing reservations. This is a
    base class defining abstract methods.
    
    """
    def __init__(self, manager):
        self.manager = manager
        self.done = False
    
    def get_time(self): 
        """Return the current time"""
        return abstract()

    def get_start_time(self): 
        """Return the time at which the clock started ticking"""
        return abstract()
    
    def get_next_schedulable_time(self): 
        """Return the next time at which resources could be scheduled.
        
        The "next schedulable time" serves as a sanity measure when running 
        in real time (to avoid scheduling something "now" only to have "now" 
        be in the past once the scheduling function returns). I.e., the "next 
        schedulable time" has nothing to do with whether there are resources 
        available at that time or not.
        """
        return abstract()
    
    def run(self):
        """Start and run the clock. This function is, in effect,
        the main loop of the resource manager."""
        return abstract()     

    def stop(self):
        """Stop the clock.
        
        Stopping the clock makes Haizea exit.
        """
        self.done = True    
    
        
class SimulatedClock(Clock):
    """Simulates the passage of time... really fast.
    
    The simulated clock steps through time to produce an ideal schedule.
    See the run() function for a description of how time is incremented
    exactly in the simulated clock.
    
    """
    
    def __init__(self, manager, starttime):
        """Initialize the simulated clock, starting at the provided starttime"""
        Clock.__init__(self, manager)
        self.starttime = starttime
        self.time = starttime
        self.logger = logging.getLogger("CLOCK")
        self.statusinterval = self.manager.config.get("status-message-interval")
       
    def get_time(self):
        """See docstring in base Clock class."""
        return self.time
    
    def get_start_time(self):
        """See docstring in base Clock class."""
        return self.starttime

    def get_next_schedulable_time(self):
        """See docstring in base Clock class."""
        return self.time    
    
    def run(self):
        """Runs the simulated clock through time.
        
        The clock starts at the provided start time. At each point in time,
        it wakes up the resource manager and then skips to the next time
        where "something" is happening (see __get_next_time for a more
        rigorous description of this).
        
        The clock stops when there is nothing left to do (no pending or 
        queued requests, and no future reservations).
        
        The simulated clock can only work in conjunction with the
        tracefile request frontend.
        """
        self.logger.status("Starting simulated clock")
        self.manager.accounting.start(self.get_start_time())
        prevstatustime = self.time
        
        # Main loop
        while not self.done:
            # Check if there are any changes in the resource pool
            new_nodes = self.manager.scheduler.vm_scheduler.resourcepool.refresh_nodes()      
            for n in new_nodes:
                rt = self.manager.scheduler.slottable.create_resource_tuple_from_capacity(n.capacity)
                self.manager.scheduler.slottable.add_node(n.id, rt)
            
            # Check to see if there are any leases which are ending prematurely.
            # Note that this is unique to simulation.
            prematureends = self.manager.scheduler.slottable.get_prematurely_ending_res(self.time)
            
            # Notify the resource manager about the premature ends
            for rr in prematureends:
                self.manager.notify_event(rr.lease.id, constants.EVENT_END_VM)
                
            # Process reservations starting/stopping at the current time and
            # check if there are any new requests.
            self.manager.process_ending_reservations(self.time)
            self.manager.process_starting_reservations(self.time)
            self.manager.process_requests(self.time)
            
            # Since processing requests may have resulted in new reservations
            # starting now, we process reservations again.
            self.manager.process_starting_reservations(self.time)
            # And one final call to deal with nil-duration reservations
            self.manager.process_ending_reservations(self.time)
            
            self.manager.accounting.at_timestep(self.manager.scheduler)
            
            # Print a status message
            if self.statusinterval != None and (self.time - prevstatustime).minutes >= self.statusinterval:
                self.manager.print_status()
                prevstatustime = self.time
                
            # Skip to next point in time.
            self.time, self.done = self.__get_next_time()
                    
        self.logger.status("Simulated clock has stopped")

        # Stop the resource manager
        self.manager.graceful_stop()
        
    def __get_next_time(self):
        """Determines what is the next point in time to skip to.
        
        At a given point in time, the next time is the earliest of the following:
        * The arrival of the next lease request
        * The start or end of a reservation (a "changepoint" in the slot table)
        * A premature end of a lease
        """
        
        # Determine candidate next times
        tracefrontend = self.__get_trace_frontend()
        nextchangepoint = self.manager.get_next_changepoint()
        nextprematureend = self.manager.scheduler.slottable.get_next_premature_end(self.time)
        nextreqtime = tracefrontend.get_next_request_time()
        self.logger.debug("Next change point (in slot table): %s" % nextchangepoint)
        self.logger.debug("Next request time: %s" % nextreqtime)
        self.logger.debug("Next premature end: %s" % nextprematureend)
        
        # The previous time is now
        prevtime = self.time
        
        # We initialize the next time to now too, to detect if
        # we've been unable to determine what the next time is.
        newtime = self.time
        
        # Find the earliest of the three, accounting for None values
        if nextchangepoint != None and nextreqtime == None:
            newtime = nextchangepoint
        elif nextchangepoint == None and nextreqtime != None:
            newtime = nextreqtime
        elif nextchangepoint != None and nextreqtime != None:
            newtime = min(nextchangepoint, nextreqtime)
            
        if nextprematureend != None:
            newtime = min(nextprematureend, newtime)
                        
        # If there are no more leases in the system, and no more pending requests,
        # then we're done.
        if not self.manager.exists_more_leases() and not tracefrontend.exists_more_requests():
            self.done = True
        
        # We can also be done if we've specified that we want to stop when
        # the best-effort requests are all done or when they've all been submitted.
        stopwhen = self.manager.config.get("stop-when")
        besteffort = self.manager.scheduler.leases.get_leases(type = Lease.BEST_EFFORT)
        pendingbesteffort = [r for r in tracefrontend.requests if r.get_type() == Lease.BEST_EFFORT]
        if stopwhen == constants.STOPWHEN_BEDONE:
            if self.manager.scheduler.is_queue_empty() and len(besteffort) + len(pendingbesteffort) == 0:
                self.done = True
        elif stopwhen == constants.STOPWHEN_BESUBMITTED:
            if len(pendingbesteffort) == 0:
                self.done = True
                
        # If we didn't arrive at a new time, and we're not done, we've fallen into
        # an infinite loop. This is A Bad Thing(tm).
        if newtime == prevtime and self.done != True:
            raise Exception, "Simulated clock has fallen into an infinite loop."
        
        return newtime, self.done

    def __get_trace_frontend(self):
        """Gets the tracefile frontend from the resource manager"""
        frontends = self.manager.frontends
        tracef = [f for f in frontends if isinstance(f, TracefileFrontend)]
        if len(tracef) != 1:
            raise Exception, "The simulated clock can only work with a tracefile request frontend."
        else:
            return tracef[0] 
        
        
class RealClock(Clock):
    """A realtime clock.
    
    The real clock wakes up periodically to, in turn, tell the resource manager
    to wake up. The real clock can also be run in a "fastforward" mode for
    debugging purposes (however, unlike the simulated clock, the clock will
    always skip a fixed amount of time into the future).
    """
    def __init__(self, manager, quantum, non_sched, fastforward = False):
        """Initializes the real clock.
        
        Arguments:
        manager -- the resource manager
        quantum -- interval between clock wakeups
        non_sched -- interval after a wakeup during which no
                     resources can be scheduled
        fastforward -- if True, the clock won't actually sleep
                       for the duration of the quantum."""
        Clock.__init__(self, manager)
        self.fastforward = fastforward
        if not self.fastforward:
            self.lastwakeup = None
        else:
            self.lastwakeup = round_datetime(now())
        self.logger = logging.getLogger("CLOCK")
        self.starttime = self.get_time()
        self.nextschedulable = None
        self.nextperiodicwakeup = None
        self.quantum = TimeDelta(seconds=quantum)
        self.non_sched = TimeDelta(seconds=non_sched)
               
    def get_time(self):
        """See docstring in base Clock class."""
        if not self.fastforward:
            return now()
        else:
            return self.lastwakeup
    
    def get_start_time(self):
        """See docstring in base Clock class."""
        return self.starttime

    def get_next_schedulable_time(self):
        """See docstring in base Clock class."""
        return self.nextschedulable    
    
    def run(self):
        """Runs the real clock through time.
        
        The clock starts when run() is called. In each iteration of the main loop
        it will do the following:
        - Wake up the resource manager
        - Determine if there will be anything to do before the next
          time the clock will wake up (after the quantum has passed). Note
          that this information is readily available on the slot table.
          If so, set next-wakeup-time to (now + time until slot table
          event). Otherwise, set it to (now + quantum)
        - Sleep until next-wake-up-time
        
        The clock keeps on tickin' until a SIGINT signal (Ctrl-C if running in the
        foreground) or a SIGTERM signal is received.
        """
        self.logger.status("Starting clock")
        self.manager.accounting.start(self.get_start_time())
        
        try:
            signal.signal(signal.SIGINT, self.signalhandler_gracefulstop)
            signal.signal(signal.SIGTERM, self.signalhandler_gracefulstop)
        except ValueError, exc:
            # This means Haizea is not the main thread, which will happen
            # when running it as part of a py.test. We simply ignore this
            # to allow the test to continue.
            pass
        
        # Main loop
        while not self.done:
            self.logger.status("Waking up to manage resources")
                        
            # Save the waking time. We want to use a consistent time in the 
            # resource manager operations (if we use now(), we'll get a different
            # time every time)
            if not self.fastforward:
                self.lastwakeup = round_datetime(self.get_time())
            self.logger.status("Wake-up time recorded as %s" % self.lastwakeup)
                
            # Next schedulable time
            self.nextschedulable = round_datetime(self.lastwakeup + self.non_sched)
            
            # Check if there are any changes in the resource pool
            new_nodes = self.manager.scheduler.vm_scheduler.resourcepool.refresh_nodes()      
            for n in new_nodes:
                rt = self.manager.scheduler.slottable.create_resource_tuple_from_capacity(n.capacity)
                self.manager.scheduler.slottable.add_node(n.id, rt)                  
            
            # Wake up the resource manager
            self.manager.process_ending_reservations(self.lastwakeup)
            self.manager.process_starting_reservations(self.lastwakeup)
            # TODO: Compute nextschedulable here, before processing requests
            self.manager.process_requests(self.nextschedulable)

            self.manager.accounting.at_timestep(self.manager.scheduler)
            
            # Next wakeup time
            time_now = now()
            if self.lastwakeup + self.quantum <= time_now:
                quantums = (time_now - self.lastwakeup) / self.quantum
                quantums = int(ceil(quantums)) * self.quantum
                self.nextperiodicwakeup = round_datetime(self.lastwakeup + quantums)
            else:
                self.nextperiodicwakeup = round_datetime(self.lastwakeup + self.quantum)
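            # Worked example with hypothetical values: lastwakeup = 13:00:00,
            # quantum = 60s, now = 13:02:30. Since 13:01:00 <= now, we compute
            # quantums = ceil(150s / 60s) * 60s = 180s, so the next periodic
            # wakeup is 13:03:00 (the next quantum boundary after now).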
            
            # Determine if there's anything to do before the next wakeup time
            nextchangepoint = self.manager.get_next_changepoint()
            if nextchangepoint != None and nextchangepoint <= self.nextperiodicwakeup:
                # We need to wake up earlier to handle a slot table event
                nextwakeup = nextchangepoint
                self.logger.status("Going back to sleep. Waking up at %s to handle slot table event." % nextwakeup)
            else:
                # Nothing to do before waking up
                nextwakeup = self.nextperiodicwakeup
                self.logger.status("Going back to sleep. Waking up at %s to see if something interesting has happened by then." % nextwakeup)
            
            # The only exit condition from the real clock is if the
            # stop-when-no-more-leases option is set to True, and there's no
            # more work left to do.
            # TODO: This first if is a kludge. Other options should only interact with
            # options through the configfile's get method. The "stop-when-no-more-leases"
            # option is currently OpenNebula-specific (while the real clock isn't; it can
            # be used by both the simulator and the OpenNebula mode). This has to be
            # fixed.            
            if self.manager.config._options.has_key("stop-when-no-more-leases"):
                stop_when_no_more_leases = self.manager.config.get("stop-when-no-more-leases")
                if stop_when_no_more_leases and not self.manager.exists_more_leases():
                    self.done = True
            
            # Sleep
            if not self.done:
                if not self.fastforward:
                    sleep((nextwakeup - now()).seconds)
                else:
                    self.lastwakeup = nextwakeup

        self.logger.status("Real clock has stopped")

        # Stop the resource manager
        self.manager.graceful_stop()
    
    def signalhandler_gracefulstop(self, signum, frame):
        """Handler for SIGTERM and SIGINT. Allows Haizea to stop gracefully."""
        
        sigstr = ""
        if signum == signal.SIGTERM:
            sigstr = " (SIGTERM)"
        elif signum == signal.SIGINT:
            sigstr = " (SIGINT)"
        self.logger.status("Received signal %i%s" %(signum, sigstr))
        self.done = True

class PersistenceManager(object):
    """Persistence manager.
    
    The persistence manager is in charge of persisting leases, and some
    scheduling data, to disk. This allows Haizea to recover from crashes.
    """
    
    def __init__(self, file):
        """Constructor
        
        Initializes the persistence manager. If the specified file
        does not exist, it is created. If the file already exists,
        it is opened but the information is not recovered (this is
        the responsibility of the Manager class)
        
        Arguments:
        file -- Persistence file. If None is specified, then
                persistence is disabled and Haizea will run entirely
                in-memory.
        """
        if file == None:
            self.disabled = True
            self.shelf = {}
        else:
            self.disabled = False
            file = os.path.expanduser(file)
            d = os.path.dirname(file)
            if not os.path.exists(d):
                os.makedirs(d)
            self.shelf = shelve.open(file, flag='c', protocol = -1)
        
    def persist_lease(self, lease):
        """Persists a single lease to disk
                
        Arguments:
        lease -- Lease to persist
        """        
        if not self.disabled:
            self.shelf["lease-%i" % lease.id] = lease
            self.shelf.sync()

    def persist_queue(self, queue):
        """Persists the queue to disk
                
        Arguments:
        queue -- The queue
        """        
        if not self.disabled:
            self.shelf["queue"] = [l.id for l in queue]
            self.shelf.sync()
        
    def persist_future_leases(self, leases):
        """Persists the set of future leases
                
        Arguments:
        leases -- "Future leases" (as determined by backfilling algorithm)
        """              
        if not self.disabled:
            self.shelf["future"] = [l.id for l in leases]        
            self.shelf.sync()
        
    def get_leases(self):
        """Returns the leases persisted to disk."""              
        return [v for k,v in self.shelf.items() if k.startswith("lease-")]
    
    def get_queue(self):
        """Returns the queue persisted to disk."""              
        if self.shelf.has_key("queue"):
            return self.shelf["queue"]
        else:
            return []
        
    def get_future_leases(self):
        """Returns the future leases persisted to disk."""              
        if self.shelf.has_key("future"):
            return self.shelf["future"]
        else:
            return []        
    
    def close(self):
        """Closes the persistence manager.
        
        Closing the persistence manager saves any remaining
        data to disk.
        """              
        if not self.disabled:
            self.shelf.close()