root / trunk / src / haizea / core / manager.py @ 676

# -------------------------------------------------------------------------- #
# Copyright 2006-2009, University of Chicago                                 #
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
# Complutense de Madrid (dsa-research.org)                                   #
#                                                                            #
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
# not use this file except in compliance with the License. You may obtain    #
# a copy of the License at                                                   #
#                                                                            #
# http://www.apache.org/licenses/LICENSE-2.0                                 #
#                                                                            #
# Unless required by applicable law or agreed to in writing, software        #
# distributed under the License is distributed on an "AS IS" BASIS,          #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
# See the License for the specific language governing permissions and        #
# limitations under the License.                                             #
# -------------------------------------------------------------------------- #

"""The manager (resource manager) module is the root of Haizea. If you want to
see where the ball starts rolling, look at the following two functions:

* manager.Manager.__init__()
* manager.Manager.start()

This module provides the following classes:

* Manager: Haizea itself. Pretty much everything else
  is contained in this class.
* Clock: A base class for Haizea's clock.
* SimulatedClock: A clock for simulations.
* RealClock: A clock that advances in realtime.
"""

import haizea.common.constants as constants
from haizea.core.scheduler.preparation_schedulers.unmanaged import UnmanagedPreparationScheduler
from haizea.core.scheduler.preparation_schedulers.imagetransfer import ImageTransferPreparationScheduler
from haizea.core.enact.opennebula import OpenNebulaResourcePoolInfo, OpenNebulaVMEnactment, OpenNebulaDummyDeploymentEnactment
from haizea.core.enact.simulated import SimulatedResourcePoolInfo, SimulatedVMEnactment, SimulatedDeploymentEnactment
from haizea.core.frontends.tracefile import TracefileFrontend
from haizea.core.frontends.opennebula import OpenNebulaFrontend
from haizea.core.frontends.rpc import RPCFrontend
from haizea.core.accounting import AccountingDataCollection
from haizea.core.scheduler import UnrecoverableError
from haizea.core.scheduler.lease_scheduler import LeaseScheduler
from haizea.core.scheduler.vm_scheduler import VMScheduler
from haizea.core.scheduler.mapper import class_mappings as mapper_mappings
from haizea.core.scheduler.slottable import SlotTable, ResourceReservation
from haizea.core.scheduler.policy import PolicyManager
from haizea.core.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
from haizea.core.leases import Lease, Site
from haizea.core.log import HaizeaLogger
from haizea.core.rpcserver import RPCServer
from haizea.common.utils import abstract, round_datetime, Singleton, import_class, OpenNebulaXMLRPCClientSingleton
from haizea.common.opennebula_xmlrpc import OpenNebulaXMLRPCClient
from haizea.pluggable.policies import admission_class_mappings, preemption_class_mappings, host_class_mappings
from haizea.pluggable.accounting import probe_class_mappings

import operator
import logging
import signal
import sys, os
import traceback
import shelve
import socket
from time import sleep
from math import ceil
from mx.DateTime import now, TimeDelta

DAEMON_STDOUT = DAEMON_STDIN = "/dev/null"
DAEMON_STDERR = "/var/tmp/haizea.err"
DEFAULT_LOGFILE = "/var/tmp/haizea.log"

class Manager(object):
    """The root of Haizea
    
    This class is the root of Haizea. Pretty much everything else (scheduler,
    enactment modules, etc.) is contained in this class. The Manager
    class is meant to be a singleton.
    
    """
    
    __metaclass__ = Singleton
    
    def __init__(self, config, daemon=False, pidfile=None):
        """Initializes the manager.
        
        Arguments:
        config -- a populated instance of haizea.common.config.RMConfig
        daemon -- True if Haizea must run as a daemon, False if it must
                  run in the foreground
        pidfile -- when running as a daemon, file to save the pid to
        """
        self.config = config
        
        # Create the RM components
        
        mode = config.get("mode")
        
        self.daemon = daemon
        self.pidfile = pidfile

        if mode == "simulated":
            # Simulated-time simulations always run in the foreground
            clock = self.config.get("clock")
            if clock == constants.CLOCK_SIMULATED:
                self.daemon = False
        elif mode == "opennebula":
            clock = constants.CLOCK_REAL
        
        self.init_logging()
                
        if clock == constants.CLOCK_SIMULATED:
            starttime = self.config.get("starttime")
            self.clock = SimulatedClock(self, starttime)
            self.rpc_server = None
        elif clock == constants.CLOCK_REAL:
            wakeup_interval = self.config.get("wakeup-interval")
            non_sched = self.config.get("non-schedulable-interval")
            if mode == "opennebula":
                fastforward = self.config.get("dry-run")
            else:
                fastforward = False
            self.clock = RealClock(self, wakeup_interval, non_sched, fastforward)
            if fastforward:
                # No need for an RPC server when doing a dry run
                self.rpc_server = None
            else:
                self.rpc_server = RPCServer(self)
                    
        # Create the RPC singleton client for OpenNebula mode
        if mode == "opennebula":
            host = self.config.get("one.host")
            port = self.config.get("one.port")
            rv = OpenNebulaXMLRPCClient.get_userpass_from_env()
            if rv == None:
                print "ONE_AUTH environment variable is not set"
                exit(1)
            else:
                user, passw = rv[0], rv[1]
                try:
                    OpenNebulaXMLRPCClientSingleton(host, port, user, passw)
                except socket.error, e:
                    print "Unable to connect to OpenNebula"
                    print "Reason: %s" % e
                    exit(1)
                    
        # Enactment modules
        if mode == "simulated":
            resources = self.config.get("simul.resources")
            if resources == "in-tracefile":
                tracefile = self.config.get("tracefile")
                site = Site.from_lwf_file(tracefile)
            elif resources.startswith("file:"):
                # Extract the path after the "file:" prefix
                sitefile = resources.split(":")[1]
                site = Site.from_xml_file(sitefile)
            else:
                site = Site.from_resources_string(resources)
    
            deploy_bandwidth = config.get("imagetransfer-bandwidth")
            info_enact = SimulatedResourcePoolInfo(site)
            vm_enact = SimulatedVMEnactment()
            deploy_enact = SimulatedDeploymentEnactment(deploy_bandwidth)
        elif mode == "opennebula":
            # Enactment modules
            info_enact = OpenNebulaResourcePoolInfo()
            vm_enact = OpenNebulaVMEnactment()
            # No deployment in OpenNebula. Using dummy one for now.
            deploy_enact = OpenNebulaDummyDeploymentEnactment()

        if mode == "simulated":
            preparation_type = self.config.get("lease-preparation")
        elif mode == "opennebula":
            # No deployment in OpenNebula.
            preparation_type = constants.PREPARATION_UNMANAGED

        # Resource pool
        if preparation_type == constants.PREPARATION_TRANSFER:
            if self.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
                resourcepool = ResourcePoolWithReusableImages(info_enact, vm_enact, deploy_enact)
            else:
                resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
        else:
            resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
    
        # Slot table
        slottable = SlotTable(info_enact.get_resource_types())
        for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
            rt = slottable.create_resource_tuple_from_capacity(n.capacity)
            slottable.add_node(n.id, rt)

        # Policy manager
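        # Each policy is specified in the config file either as a short name
        # (resolved through the pluggable class mappings below) or as a fully
        # qualified class name, which is then imported and instantiated with
        # the slot table.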
        admission = self.config.get("policy.admission")
        admission = admission_class_mappings.get(admission, admission)
        admission = import_class(admission)
        admission = admission(slottable)
        
        preemption = self.config.get("policy.preemption")
        preemption = preemption_class_mappings.get(preemption, preemption)
        preemption = import_class(preemption)
        preemption = preemption(slottable)

        host_selection = self.config.get("policy.host-selection")
        host_selection = host_class_mappings.get(host_selection, host_selection)
        host_selection = import_class(host_selection)
        host_selection = host_selection(slottable)

        self.policy = PolicyManager(admission, preemption, host_selection)

        # Preparation scheduler
        if preparation_type == constants.PREPARATION_UNMANAGED:
            preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
        elif preparation_type == constants.PREPARATION_TRANSFER:
            preparation_scheduler = ImageTransferPreparationScheduler(slottable, resourcepool, deploy_enact)
    
        # VM mapper and scheduler
        mapper = self.config.get("mapper")
        mapper = mapper_mappings.get(mapper, mapper)
        mapper = import_class(mapper)
        mapper = mapper(slottable, self.policy)
        
        # When using backfilling, set the number of leases that can be
        # scheduled in the future.
        backfilling = self.config.get("backfilling")
        if backfilling == constants.BACKFILLING_OFF:
            max_in_future = 0
        elif backfilling == constants.BACKFILLING_AGGRESSIVE:
            max_in_future = 1
        elif backfilling == constants.BACKFILLING_CONSERVATIVE:
            max_in_future = -1 # Unlimited
        elif backfilling == constants.BACKFILLING_INTERMEDIATE:
            max_in_future = self.config.get("backfilling-reservations")
        
        vm_scheduler = VMScheduler(slottable, resourcepool, mapper, max_in_future)
    
        # Statistics collection
        attrs = dict([(attr, self.config.get_attr(attr)) for attr in self.config.get_attrs()])
        
        self.accounting = AccountingDataCollection(self.config.get("datafile"), attrs)
        # Load probes
        probes = self.config.get("accounting-probes")
        probes = probes.split()
        for probe in probes:
            probe_class = probe_class_mappings.get(probe, probe)
            probe_class = import_class(probe_class)
            probe_obj = probe_class(self.accounting)
            self.accounting.add_probe(probe_obj)
    
        # Lease Scheduler
        self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable, self.accounting)
                                
        # Lease request frontends
        if mode == "simulated":
            if clock == constants.CLOCK_SIMULATED:
                # In pure simulation, we can only use the tracefile frontend
                self.frontends = [TracefileFrontend(self.clock.get_start_time())]
            elif clock == constants.CLOCK_REAL:
                # In simulation with a real clock, only the RPC frontend can be used
                self.frontends = [RPCFrontend()]
        elif mode == "opennebula":
            self.frontends = [OpenNebulaFrontend()]

        persistence_file = self.config.get("persistence-file")
        if persistence_file == "none":
            persistence_file = None
        self.persistence = PersistenceManager(persistence_file)
        
        self.logger = logging.getLogger("RM")

    def init_logging(self):
        """Initializes logging
        
        """

        logger = logging.getLogger("")
        if self.daemon:
            handler = logging.FileHandler(self.config.get("logfile"))
        else:
            handler = logging.StreamHandler()
        if sys.version_info[1] <= 4:
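            # Python 2.4 and earlier fall back to a plain format; later
            # versions can use the custom %(haizeatime)s field (presumably
            # supplied by the HaizeaLogger class set below).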
            formatter = logging.Formatter('%(name)-7s %(message)s')
        else:
            formatter = logging.Formatter('[%(haizeatime)s] %(name)-7s %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        level = logging.getLevelName(self.config.get("loglevel"))
        logger.setLevel(level)
        logging.setLoggerClass(HaizeaLogger)

        
    def daemonize(self):
        """Daemonizes the Haizea process.
        
        Based on code in:  http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
        
        """
        # First fork
        try:
            pid = os.fork()
            if pid > 0: 
                # Exit first parent
                sys.exit(0) 
        except OSError, e:
            sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
            sys.exit(1)
    
        # Decouple from parent environment.
        os.chdir(".")
        os.umask(0)
        os.setsid()
    
        # Second fork
        try:
            pid = os.fork()
            if pid > 0: 
                # Exit second parent.
                sys.exit(0) 
        except OSError, e:
            sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
            sys.exit(2)
            
        # Open file descriptors and print start message
        si = file(DAEMON_STDIN, 'r')
        so = file(DAEMON_STDOUT, 'a+')
        se = file(DAEMON_STDERR, 'a+', 0)
        pid = os.getpid()
        sys.stderr.write("\nStarted Haizea daemon with pid %i\n\n" % pid)
        sys.stderr.flush()
        file(self.pidfile,'w+').write("%i\n" % pid)
        
        # Redirect standard file descriptors.
        os.close(sys.stdin.fileno())
        os.close(sys.stdout.fileno())
        os.close(sys.stderr.fileno())
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())

    def start(self):
        """Starts the resource manager"""
        self.logger.info("Starting resource manager")
        
        for frontend in self.frontends:
            frontend.load(self)
        
        if self.daemon:
            self.daemonize()
        if self.rpc_server:
            self.rpc_server.start()
            
        self.__recover()
            
        # Start the clock
        try:
            self.clock.run()
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)

    def stop(self):
        """Stops the resource manager by stopping the clock"""
        self.clock.stop()
        
    def graceful_stop(self):
        """Stops the resource manager gracefully and exits"""
        
        self.logger.status("Stopping resource manager gracefully...")
        
        # Stop collecting data (this finalizes counters)
        self.accounting.stop()
        
        self.persistence.close()
        
        # TODO: When gracefully stopping mid-scheduling, we need to figure out what to
        #       do with leases that are still running.

        self.print_status()
        
        # In debug mode, dump the lease descriptors.
        for lease in self.scheduler.completed_leases.entries.values():
            lease.print_contents()
            
        # Write all collected data to disk
        leases = self.scheduler.completed_leases.entries
        self.accounting.save_to_disk(leases)
        
        # Stop RPC server
        if self.rpc_server != None:
            self.rpc_server.stop()
                    
    def process_requests(self, nexttime):
        """Process any new requests in the request frontend
        
        Checks the request frontend to see if there are any new requests that
        have to be processed. AR leases are sent directly to the scheduler.
        Best-effort leases are queued.
        
        Arguments:
        nexttime -- The next time at which the scheduler can allocate resources.
                    This is meant to be provided by the clock simply as a sanity
                    measure when running in real time (to avoid scheduling something
                    "now" only to have "now" be in the past once the scheduling
                    function returns). I.e., nexttime has nothing to do with whether
                    there are resources available at that time or not.
        
        """
        
        # Get requests from frontend
        requests = []
        for frontend in self.frontends:
            requests += frontend.get_accumulated_requests()
        requests.sort(key=operator.attrgetter("submit_time"))
        
        # Request leases and run the scheduling function.
        try:
            self.logger.vdebug("Requesting leases")
            for req in requests:
                self.scheduler.request_lease(req)

            self.logger.vdebug("Running scheduling function")
            self.scheduler.schedule(nexttime)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)

    def process_starting_reservations(self, time):
        """Process reservations starting at the specified time"""
        
        # The lease scheduler takes care of this.
        try:
            self.scheduler.process_starting_reservations(time)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)

    def process_ending_reservations(self, time):
        """Process reservations ending at the specified time"""
        
        # The lease scheduler takes care of this.
        try:
            self.scheduler.process_ending_reservations(time)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)
             
    def notify_event(self, lease_id, event):
        """Notifies Haizea of an asynchronous event.
        
        Arguments:
        lease_id -- ID of the lease affected by the event
        event -- Event (currently, only the constants.EVENT_END_VM event is supported)
        """
        try:
            lease = self.scheduler.get_lease_by_id(lease_id)
            self.scheduler.notify_event(lease, event)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)
        
    def cancel_lease(self, lease_id):
        """Cancels a lease.
        
        Arguments:
        lease_id -- ID of lease to cancel
        """
        try:
            lease = self.scheduler.get_lease_by_id(lease_id)
            self.scheduler.cancel_lease(lease)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)
            
    def get_next_changepoint(self):
        """Return the next changepoint in the slot table"""
        return self.scheduler.slottable.get_next_changepoint(self.clock.get_time())
   
    def exists_more_leases(self):
        """Return True if there are any leases still "in the system" """
        return self.scheduler.exists_scheduled_leases() or not self.scheduler.is_queue_empty()

    def print_status(self):
        """Prints a status summary."""
        
        leases = self.scheduler.leases.get_leases()
        completed_leases = self.scheduler.completed_leases.get_leases()
        self.logger.status("--- Haizea status summary ---")
        self.logger.status("Number of leases (not including completed): %i" % len(leases))
        self.logger.status("Completed leases: %i" % len(completed_leases))
        self.logger.status("---- End summary ----")

    def __recover(self):
        """Loads persisted leases and scheduling information
        
        This method does three things:
        1. Recover persisted leases. Note that not all persisted leases
           may be recoverable. For example, if a lease was scheduled
           to start at a certain time, but that time passed while
           Haizea was not running, the lease will simply be transitioned
           to a failed state.
        2. Recover the queue.
        3. Recover the list of "future leases" as determined by
           the backfilling algorithm.
        """
        
        # Load leases
        leases = self.persistence.get_leases()
        for lease in leases:
            # Create a list of RRs
            rrs = lease.preparation_rrs + lease.vm_rrs
            for vmrr in lease.vm_rrs:
                rrs += vmrr.pre_rrs + vmrr.post_rrs

            # Bind resource tuples in RRs to slot table
            for rr in rrs:
                for restuple in rr.resources_in_pnode.values():
                    restuple.slottable = self.scheduler.slottable

            self.logger.debug("Attempting to recover lease %i" % lease.id)
            lease.print_contents()
            
            # Check the lease's state and determine how to proceed.
            load_rrs = False
            lease_state = lease.get_state()
            if lease_state in (Lease.STATE_DONE, Lease.STATE_CANCELLED, Lease.STATE_REJECTED, Lease.STATE_FAIL):
                self.logger.info("Recovered lease %i (already done)" % lease.id)
                self.scheduler.completed_leases.add(lease)
            elif lease_state in (Lease.STATE_NEW, Lease.STATE_PENDING):
                self.scheduler.leases.add(lease)
            elif lease_state == Lease.STATE_QUEUED:
                load_rrs = True
                self.scheduler.leases.add(lease)
                self.logger.info("Recovered lease %i (queued)" % lease.id)
            elif lease_state in (Lease.STATE_SCHEDULED, Lease.STATE_READY):
                # Check if the schedule is still valid.
                vmrr = lease.get_last_vmrr()
                if len(vmrr.pre_rrs) > 0:
                    start = vmrr.pre_rrs[0].start
                else:
                    start = vmrr.start
                if self.clock.get_time() < start:
                    load_rrs = True
                    self.scheduler.leases.add(lease)
                    self.logger.info("Recovered lease %i" % lease.id)
                else:
                    lease.set_state(Lease.STATE_FAIL)
                    self.scheduler.completed_leases.add(lease)
                    self.logger.info("Could not recover lease %i (scheduled starting time has passed)" % lease.id)
            elif lease_state == Lease.STATE_ACTIVE:
                vmrr = lease.get_last_vmrr()
                if self.clock.get_time() < vmrr.end:
                    # TODO: Check if VMs are actually running
                    load_rrs = True
                    self.scheduler.leases.add(lease)
                    self.logger.info("Recovered lease %i" % lease.id)
                else:
                    # TODO: May have to stop extant virtual machines
                    lease.set_state(Lease.STATE_FAIL)
                    self.scheduler.completed_leases.add(lease)
                    self.logger.info("Could not recover lease %i (scheduled ending time has passed)" % lease.id)
            else:
                # No support for recovering leases in the
                # remaining states
                lease.set_state(Lease.STATE_FAIL)
                self.scheduler.completed_leases.add(lease)
                self.logger.info("Could not recover lease %i (unsupported state %i for recovery)" % (lease.id, lease_state))
                
            # Load the lease's RRs into the slot table
            if load_rrs:
                for rr in rrs:
                    if rr.state in (ResourceReservation.STATE_ACTIVE, ResourceReservation.STATE_SCHEDULED):
                        self.scheduler.slottable.add_reservation(rr)
                
        # Rebuild the queue
        queue = self.persistence.get_queue()
        for lease_id in queue:
            if self.scheduler.leases.has_lease(lease_id):
                lease = self.scheduler.leases.get_lease(lease_id)
                self.scheduler.queue.enqueue(lease)

        # Rebuild the "future leases"
        future = self.persistence.get_future_leases()
        for lease_id in future:
            if self.scheduler.leases.has_lease(lease_id):
                lease = self.scheduler.leases.get_lease(lease_id)
                self.scheduler.vm_scheduler.future_leases.add(lease)

    def __unrecoverable_error(self, exc):
        """Handles an unrecoverable error.
        
        This method prints information on the unrecoverable error and makes Haizea panic.
        """
        self.logger.error("An unrecoverable error has happened.")
        self.logger.error("Original exception:")
        self.__print_exception(exc.exc, exc.get_traceback())
        self.logger.error("Unrecoverable error traceback:")
        self.__print_exception(exc, sys.exc_info()[2])
        self.__panic()

    def __unexpected_exception(self, exc):
        """Handles an unexpected exception.
        
        This method prints information on the exception and makes Haizea panic.
        """
        self.logger.error("An unexpected exception has happened.")
        self.__print_exception(exc, sys.exc_info()[2])
        self.__panic()
            
    def __print_exception(self, exc, exc_traceback):
        """Prints an exception's traceback to the log."""
        tb = traceback.format_tb(exc_traceback)
        for line in tb:
            self.logger.error(line)
        self.logger.error("Message: %s" % exc)
    
    def __panic(self):
        """Makes Haizea crash and burn in a panicked frenzy"""
        
        self.logger.status("Panicking...")

        # Stop RPC server
        if self.rpc_server != None:
            self.rpc_server.stop()

        # Dump state
        self.print_status()
        self.logger.error("Next change point (in slot table): %s" % self.get_next_changepoint())

        # Print lease descriptors
        leases = self.scheduler.leases.get_leases()
        if len(leases) > 0:
            self.logger.vdebug("vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
            for lease in leases:
                lease.print_contents()
            self.logger.vdebug("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")

        # Exit
        treatment = self.config.get("lease-failure-handling")
        if treatment == constants.ONFAILURE_EXIT_RAISE:
            raise
        else:
            exit(1)

            
class Clock(object):
    """Base class for the resource manager's clock.
    
    The clock is in charge of periodically waking the resource manager so it
    will process new requests and handle existing reservations. This is a
    base class defining abstract methods.
    
    """
    def __init__(self, manager):
        self.manager = manager
        self.done = False
    
    def get_time(self): 
        """Return the current time"""
        return abstract()

    def get_start_time(self): 
        """Return the time at which the clock started ticking"""
        return abstract()
    
    def get_next_schedulable_time(self): 
        """Return the next time at which resources could be scheduled.
        
        The "next schedulable time" serves as a sanity measure when running
        in real time (to avoid scheduling something "now" only to have "now"
        be in the past once the scheduling function returns). I.e., the "next
        schedulable time" has nothing to do with whether there are resources
        available at that time or not.
        """
        return abstract()
    
    def run(self):
        """Start and run the clock. This function is, in effect,
        the main loop of the resource manager."""
        return abstract()

    def stop(self):
        """Stop the clock.
        
        Stopping the clock makes Haizea exit.
        """
        self.done = True
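
    # Subclasses must implement get_time(), get_start_time(),
    # get_next_schedulable_time() and run(). A minimal sketch (illustrative
    # only, not a clock shipped with Haizea):
    #
    #   class FixedClock(Clock):
    #       def __init__(self, manager, time):
    #           Clock.__init__(self, manager)
    #           self.time = time
    #       def get_time(self):
    #           return self.time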


class SimulatedClock(Clock):
    """Simulates the passage of time... really fast.
    
    The simulated clock steps through time to produce an ideal schedule.
    See the run() function for a description of how time is incremented
    exactly in the simulated clock.
    
    """
    
    def __init__(self, manager, starttime):
        """Initialize the simulated clock, starting at the provided starttime"""
        Clock.__init__(self, manager)
        self.starttime = starttime
        self.time = starttime
        self.logger = logging.getLogger("CLOCK")
        self.statusinterval = self.manager.config.get("status-message-interval")
       
    def get_time(self):
        """See docstring in base Clock class."""
        return self.time
    
    def get_start_time(self):
        """See docstring in base Clock class."""
        return self.starttime

    def get_next_schedulable_time(self):
        """See docstring in base Clock class."""
        return self.time
    
    def run(self):
        """Runs the simulated clock through time.
        
        The clock starts at the provided start time. At each point in time,
        it wakes up the resource manager and then skips to the next time
        where "something" is happening (see __get_next_time for a more
        rigorous description of this).
        
        The clock stops when there is nothing left to do (no pending or
        queued requests, and no future reservations).
        
        The simulated clock can only work in conjunction with the
        tracefile request frontend.
        """
        self.logger.status("Starting simulated clock")
        self.manager.accounting.start(self.get_start_time())
        prevstatustime = self.time
        
        # Main loop
        while not self.done:
            # Check if there are any changes in the resource pool
            new_nodes = self.manager.scheduler.vm_scheduler.resourcepool.refresh_nodes()
            for n in new_nodes:
                rt = self.manager.scheduler.slottable.create_resource_tuple_from_capacity(n.capacity)
                self.manager.scheduler.slottable.add_node(n.id, rt)
            
            # Check to see if there are any leases which are ending prematurely.
            # Note that this is unique to simulation.
            prematureends = self.manager.scheduler.slottable.get_prematurely_ending_res(self.time)
            
            # Notify the resource manager about the premature ends
            for rr in prematureends:
                self.manager.notify_event(rr.lease.id, constants.EVENT_END_VM)
                
            # Process reservations starting/stopping at the current time and
            # check if there are any new requests.
            self.manager.process_ending_reservations(self.time)
            self.manager.process_starting_reservations(self.time)
            self.manager.process_requests(self.time)
            
            # Since processing requests may have resulted in new reservations
            # starting now, we process reservations again.
            self.manager.process_starting_reservations(self.time)
            # And one final call to deal with nil-duration reservations
            self.manager.process_ending_reservations(self.time)
            
            self.manager.accounting.at_timestep(self.manager.scheduler)
            
            # Print a status message
            if self.statusinterval != None and (self.time - prevstatustime).minutes >= self.statusinterval:
                self.manager.print_status()
                prevstatustime = self.time
                
            # Skip to next point in time.
            self.time, self.done = self.__get_next_time()
                    
        self.logger.status("Simulated clock has stopped")

        # Stop the resource manager
        self.manager.graceful_stop()
        
    def __get_next_time(self):
        """Determines the next point in time to skip to.
        
        At a given point in time, the next time is the earliest of the following:
        * The arrival of the next lease request
        * The start or end of a reservation (a "changepoint" in the slot table)
        * A premature end of a lease
        """
        
        # Determine candidate next times
        tracefrontend = self.__get_trace_frontend()
        nextchangepoint = self.manager.get_next_changepoint()
        nextprematureend = self.manager.scheduler.slottable.get_next_premature_end(self.time)
        nextreqtime = tracefrontend.get_next_request_time()
        self.logger.debug("Next change point (in slot table): %s" % nextchangepoint)
        self.logger.debug("Next request time: %s" % nextreqtime)
        self.logger.debug("Next premature end: %s" % nextprematureend)
        
        # The previous time is now
        prevtime = self.time
        
        # We initialize the next time to now too, to detect if
        # we've been unable to determine what the next time is.
        newtime = self.time
        
        # Find the earliest of the three, accounting for None values
        if nextchangepoint != None and nextreqtime == None:
            newtime = nextchangepoint
        elif nextchangepoint == None and nextreqtime != None:
            newtime = nextreqtime
        elif nextchangepoint != None and nextreqtime != None:
            newtime = min(nextchangepoint, nextreqtime)
            
        if nextprematureend != None:
            newtime = min(nextprematureend, newtime)
                        
        # If there are no more leases in the system, and no more pending requests,
        # then we're done.
        if not self.manager.exists_more_leases() and not tracefrontend.exists_more_requests():
            self.done = True
        
        # We can also be done if we've specified that we want to stop when
        # the best-effort requests are all done or when they've all been submitted.
        stopwhen = self.manager.config.get("stop-when")
        besteffort = self.manager.scheduler.leases.get_leases(type = Lease.BEST_EFFORT)
        pendingbesteffort = [r for r in tracefrontend.requests if r.get_type() == Lease.BEST_EFFORT]
        if stopwhen == constants.STOPWHEN_BEDONE:
            if self.manager.scheduler.is_queue_empty() and len(besteffort) + len(pendingbesteffort) == 0:
                self.done = True
        elif stopwhen == constants.STOPWHEN_BESUBMITTED:
            if len(pendingbesteffort) == 0:
                self.done = True
                
        # If we didn't arrive at a new time, and we're not done, we've fallen into
        # an infinite loop. This is A Bad Thing(tm).
        if newtime == prevtime and self.done != True:
            raise Exception, "Simulated clock has fallen into an infinite loop."
        
        return newtime, self.done

    def __get_trace_frontend(self):
        """Gets the tracefile frontend from the resource manager"""
        frontends = self.manager.frontends
        tracef = [f for f in frontends if isinstance(f, TracefileFrontend)]
        if len(tracef) != 1:
            raise Exception, "The simulated clock can only work with a tracefile request frontend."
        else:
            return tracef[0]


class RealClock(Clock):
    """A realtime clock.
    
    The real clock wakes up periodically to, in turn, tell the resource manager
    to wake up. The real clock can also be run in a "fastforward" mode for
    debugging purposes (however, unlike the simulated clock, the clock will
    always skip a fixed amount of time into the future).
    """
    def __init__(self, manager, quantum, non_sched, fastforward = False):
        """Initializes the real clock.
        
        Arguments:
        manager -- the resource manager
        quantum -- interval between clock wakeups
        non_sched -- interval after each wakeup during which resources
                     must not be scheduled
        fastforward -- if True, the clock won't actually sleep
                       for the duration of the quantum."""
        Clock.__init__(self, manager)
        self.fastforward = fastforward
        if not self.fastforward:
            self.lastwakeup = None
        else:
            self.lastwakeup = round_datetime(now())
        self.logger = logging.getLogger("CLOCK")
        self.starttime = self.get_time()
        self.nextschedulable = None
        self.nextperiodicwakeup = None
        self.quantum = TimeDelta(seconds=quantum)
        self.non_sched = TimeDelta(seconds=non_sched)
               
    def get_time(self):
        """See docstring in base Clock class."""
        if not self.fastforward:
            return now()
        else:
            return self.lastwakeup
    
    def get_start_time(self):
        """See docstring in base Clock class."""
        return self.starttime

    def get_next_schedulable_time(self):
        """See docstring in base Clock class."""
        return self.nextschedulable
    
    def run(self):
        """Runs the real clock through time.
        
        The clock starts when run() is called. In each iteration of the main loop
        it will do the following:
        - Wake up the resource manager
        - Determine if there will be anything to do before the next
          time the clock will wake up (after the quantum has passed). Note
          that this information is readily available on the slot table.
          If so, set next-wakeup-time to (now + time until slot table
          event). Otherwise, set it to (now + quantum)
        - Sleep until next-wakeup-time
        
        The clock keeps on tickin' until a SIGINT signal (Ctrl-C if running in the
        foreground) or a SIGTERM signal is received.
        """
        self.logger.status("Starting clock")
        self.manager.accounting.start(self.get_start_time())
        
        try:
            signal.signal(signal.SIGINT, self.signalhandler_gracefulstop)
            signal.signal(signal.SIGTERM, self.signalhandler_gracefulstop)
        except ValueError, exc:
            # This means Haizea is not the main thread, which will happen
            # when running it as part of a py.test. We simply ignore this
            # to allow the test to continue.
            pass
        
        # Main loop
        while not self.done:
            self.logger.status("Waking up to manage resources")
                        
            # Save the waking time. We want to use a consistent time in the
            # resource manager operations (if we use now(), we'll get a different
            # time every time)
            if not self.fastforward:
                self.lastwakeup = round_datetime(self.get_time())
            self.logger.status("Wake-up time recorded as %s" % self.lastwakeup)
                
            # Next schedulable time
            self.nextschedulable = round_datetime(self.lastwakeup + self.non_sched)
            
            # Check if there are any changes in the resource pool
            new_nodes = self.manager.scheduler.vm_scheduler.resourcepool.refresh_nodes()
            for n in new_nodes:
                rt = self.manager.scheduler.slottable.create_resource_tuple_from_capacity(n.capacity)
                self.manager.scheduler.slottable.add_node(n.id, rt)
            
            # Wake up the resource manager
            self.manager.process_ending_reservations(self.lastwakeup)
            self.manager.process_starting_reservations(self.lastwakeup)
            # TODO: Compute nextschedulable here, before processing requests
            self.manager.process_requests(self.nextschedulable)

            self.manager.accounting.at_timestep(self.manager.scheduler)
            
            # Next wakeup time
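            # If this iteration took longer than one quantum, skip ahead to
            # the next multiple of the quantum after "now"; otherwise, wake
            # up exactly one quantum after the last wakeup.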
            time_now = now()
            if self.lastwakeup + self.quantum <= time_now:
                quantums = (time_now - self.lastwakeup) / self.quantum
                quantums = int(ceil(quantums)) * self.quantum
                self.nextperiodicwakeup = round_datetime(self.lastwakeup + quantums)
            else:
                self.nextperiodicwakeup = round_datetime(self.lastwakeup + self.quantum)
            
            # Determine if there's anything to do before the next wakeup time
            nextchangepoint = self.manager.get_next_changepoint()
            if nextchangepoint != None and nextchangepoint <= self.nextperiodicwakeup:
                # We need to wake up earlier to handle a slot table event
                nextwakeup = nextchangepoint
                self.logger.status("Going back to sleep. Waking up at %s to handle slot table event." % nextwakeup)
            else:
                # Nothing to do before waking up
                nextwakeup = self.nextperiodicwakeup
                self.logger.status("Going back to sleep. Waking up at %s to see if something interesting has happened by then." % nextwakeup)
            
            # The only exit condition from the real clock is if the
            # stop-when-no-more-leases option is set to True, and there's no
            # more work left to do.
            # TODO: This first if is a kludge. Components should only interact
            # with options through the config file's get method. The
            # "stop-when-no-more-leases" option is currently
            # OpenNebula-specific (while the real clock isn't; it can be used
            # by both the simulator and the OpenNebula mode). This has to be
            # fixed.
            if self.manager.config._options.has_key("stop-when-no-more-leases"):
                stop_when_no_more_leases = self.manager.config.get("stop-when-no-more-leases")
                if stop_when_no_more_leases and not self.manager.exists_more_leases():
                    self.done = True
            
            # Sleep
            if not self.done:
                if not self.fastforward:
                    sleep((nextwakeup - now()).seconds)
                else:
                    self.lastwakeup = nextwakeup

        self.logger.status("Real clock has stopped")

        # Stop the resource manager
        self.manager.graceful_stop()
    
    def signalhandler_gracefulstop(self, signum, frame):
        """Handler for SIGTERM and SIGINT. Allows Haizea to stop gracefully."""
        
        sigstr = ""
        if signum == signal.SIGTERM:
            sigstr = " (SIGTERM)"
        elif signum == signal.SIGINT:
            sigstr = " (SIGINT)"
        self.logger.status("Received signal %i%s" % (signum, sigstr))
        self.done = True


class PersistenceManager(object):
    """Persistence manager.
    
    The persistence manager is in charge of persisting leases, and some
    scheduling data, to disk. This allows Haizea to recover from crashes.
    """
    
    def __init__(self, file):
        """Constructor
        
        Initializes the persistence manager. If the specified file
        does not exist, it is created. If the file is created, it
        is opened but the information is not recovered (this is
        the responsibility of the Manager class).
        
        Arguments:
        file -- Persistence file. If None is specified, then
                persistence is disabled and Haizea will run entirely
                in-memory.
        """
        if file == None:
            self.disabled = True
            self.shelf = {}
        else:
            self.disabled = False
            file = os.path.expanduser(file)
            d = os.path.dirname(file)
            if not os.path.exists(d):
                os.makedirs(d)
            self.shelf = shelve.open(file, flag='c', protocol = -1)
        
    def persist_lease(self, lease):
        """Persists a single lease to disk
                
        Arguments:
        lease -- Lease to persist
        """
        if not self.disabled:
            self.shelf["lease-%i" % lease.id] = lease
            self.shelf.sync()

    def persist_queue(self, queue):
        """Persists the queue to disk
                
        Arguments:
        queue -- The queue
        """
        if not self.disabled:
            self.shelf["queue"] = [l.id for l in queue]
            self.shelf.sync()
        
    def persist_future_leases(self, leases):
        """Persists the set of future leases
                
        Arguments:
        leases -- "Future leases" (as determined by the backfilling algorithm)
        """
        if not self.disabled:
            self.shelf["future"] = [l.id for l in leases]
            self.shelf.sync()
        
    def get_leases(self):
        """Returns the leases persisted to disk."""
        return [v for k,v in self.shelf.items() if k.startswith("lease-")]
    
    def get_queue(self):
        """Returns the queue persisted to disk."""
        if self.shelf.has_key("queue"):
            return self.shelf["queue"]
        else:
            return []
        
    def get_future_leases(self):
        """Returns the future leases persisted to disk."""
        if self.shelf.has_key("future"):
            return self.shelf["future"]
        else:
            return []
    
    def close(self):
        """Closes the persistence manager.
        
        Closing the persistence manager saves any remaining
        data to disk.
        """
        if not self.disabled:
            self.shelf.close()
1107