Project

General

Profile

root / trunk / src / haizea / core / manager.py @ 675

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2009, University of Chicago                                 #
3
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19
"""The manager (resource manager) module is the root of Haizea. If you want to
20
see where the ball starts rolling, look at the following two functions:
21

22
* manager.Manager.__init__()
23
* manager.Manager.start()
24

25
This module provides the following classes:
26

27
* Manager: Haizea itself. Pretty much everything else
28
  is contained in this class.
29
* Clock: A base class for Haizea's clock.
30
* SimulatedClock: A clock for simulations.
31
* RealClock: A clock that advances in realtime.
32
"""
33
 
34
import haizea.common.constants as constants
35
from haizea.core.scheduler.preparation_schedulers.unmanaged import UnmanagedPreparationScheduler
36
from haizea.core.scheduler.preparation_schedulers.imagetransfer import ImageTransferPreparationScheduler
37
from haizea.core.enact.opennebula import OpenNebulaResourcePoolInfo, OpenNebulaVMEnactment, OpenNebulaDummyDeploymentEnactment
38
from haizea.core.enact.simulated import SimulatedResourcePoolInfo, SimulatedVMEnactment, SimulatedDeploymentEnactment
39
from haizea.core.frontends.tracefile import TracefileFrontend
40
from haizea.core.frontends.opennebula import OpenNebulaFrontend
41
from haizea.core.frontends.rpc import RPCFrontend
42
from haizea.core.accounting import AccountingDataCollection
43
from haizea.core.scheduler import UnrecoverableError
44
from haizea.core.scheduler.lease_scheduler import LeaseScheduler
45
from haizea.core.scheduler.vm_scheduler import VMScheduler
46
from haizea.core.scheduler.mapper import class_mappings as mapper_mappings
47
from haizea.core.scheduler.slottable import SlotTable, ResourceReservation
48
from haizea.core.scheduler.policy import PolicyManager
49
from haizea.core.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
50
from haizea.core.leases import Lease, Site
51
from haizea.core.log import HaizeaLogger
52
from haizea.core.rpcserver import RPCServer
53
from haizea.common.utils import abstract, round_datetime, Singleton, import_class, OpenNebulaXMLRPCClientSingleton
54
from haizea.common.opennebula_xmlrpc import OpenNebulaXMLRPCClient
55
from haizea.pluggable.policies import admission_class_mappings, preemption_class_mappings, host_class_mappings 
56
from haizea.pluggable.accounting import probe_class_mappings
57

    
58
import operator
59
import logging
60
import signal
61
import sys, os
62
import traceback
63
import shelve
64
import socket
65
from time import sleep
66
from math import ceil
67
from mx.DateTime import now, TimeDelta
68

    
69
DAEMON_STDOUT = DAEMON_STDIN = "/dev/null"
70
DAEMON_STDERR = "/var/tmp/haizea.err"
71
DEFAULT_LOGFILE = "/var/tmp/haizea.log"
72

    
73
class Manager(object):
74
    """The root of Haizea
75
    
76
    This class is the root of Haizea. Pretty much everything else (scheduler,
77
    enactment modules, etc.) is contained in this class. The Manager
78
    class is meant to be a singleton.
79
    
80
    """
81
    
82
    __metaclass__ = Singleton
83
    
84
    def __init__(self, config, daemon=False, pidfile=None):
85
        """Initializes the manager.
86
        
87
        Argument:
88
        config -- a populated instance of haizea.common.config.RMConfig
89
        daemon -- True if Haizea must run as a daemon, False if it must
90
                  run in the foreground
91
        pidfile -- When running as a daemon, file to save pid to
92
        """
93
        self.config = config
94
        
95
        # Create the RM components
96
        
97
        mode = config.get("mode")
98
        
99
        self.daemon = daemon
100
        self.pidfile = pidfile
101

    
102
        if mode == "simulated":
103
            # Simulated-time simulations always run in the foreground
104
            clock = self.config.get("clock")
105
            if clock == constants.CLOCK_SIMULATED:
106
                self.daemon = False
107
        elif mode == "opennebula":
108
            clock = constants.CLOCK_REAL        
109
        
110
        self.init_logging()
111
                
112
        if clock == constants.CLOCK_SIMULATED:
113
            starttime = self.config.get("starttime")
114
            self.clock = SimulatedClock(self, starttime)
115
            self.rpc_server = None
116
        elif clock == constants.CLOCK_REAL:
117
            wakeup_interval = self.config.get("wakeup-interval")
118
            non_sched = self.config.get("non-schedulable-interval")
119
            if mode == "opennebula":
120
                fastforward = self.config.get("dry-run")
121
            else:
122
                fastforward = False
123
            self.clock = RealClock(self, wakeup_interval, non_sched, fastforward)
124
            if fastforward:
125
                # No need for an RPC server when doing a dry run
126
                self.rpc_server = None
127
            else:
128
                self.rpc_server = RPCServer(self)
129
                    
130
        # Create the RPC singleton client for OpenNebula mode
131
        if mode == "opennebula":
132
            host = self.config.get("one.host")
133
            port = self.config.get("one.port")
134
            rv = OpenNebulaXMLRPCClient.get_userpass_from_env()
135
            if rv == None:
136
                print "ONE_AUTH environment variable is not set"
137
                exit(1)
138
            else:
139
                user, passw = rv[0], rv[1]
140
                try:
141
                    OpenNebulaXMLRPCClientSingleton(host, port, user, passw)
142
                except socket.error, e:
143
                    print "Unable to connect to OpenNebula"
144
                    print "Reason: %s" % e
145
                    exit(1)
146
                    
147
        # Enactment modules
148
        if mode == "simulated":
149
            resources = self.config.get("simul.resources")
150
            if resources == "in-tracefile":
151
                tracefile = self.config.get("tracefile")
152
                site = Site.from_lwf_file(tracefile)
153
            elif resources.startswith("file:"):
154
                sitefile = resources.split(":")
155
                site = Site.from_xml_file(sitefile)
156
            else:
157
                site = Site.from_resources_string(resources)
158
    
159
            deploy_bandwidth = config.get("imagetransfer-bandwidth")
160
            info_enact = SimulatedResourcePoolInfo(site)
161
            vm_enact = SimulatedVMEnactment()
162
            deploy_enact = SimulatedDeploymentEnactment(deploy_bandwidth)
163
        elif mode == "opennebula":
164
            # Enactment modules
165
            info_enact = OpenNebulaResourcePoolInfo()
166
            vm_enact = OpenNebulaVMEnactment()
167
            # No deployment in OpenNebula. Using dummy one for now.
168
            deploy_enact = OpenNebulaDummyDeploymentEnactment()            
169

    
170
        if mode == "simulated":
171
            preparation_type = self.config.get("lease-preparation")
172
        elif mode == "opennebula":
173
            # No deployment in OpenNebula.
174
            preparation_type = constants.PREPARATION_UNMANAGED
175

    
176
        # Resource pool
177
        if preparation_type == constants.PREPARATION_TRANSFER:
178
            if self.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
179
                resourcepool = ResourcePoolWithReusableImages(info_enact, vm_enact, deploy_enact)
180
            else:
181
                resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
182
        else:
183
            resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
184
    
185
        # Slot table
186
        slottable = SlotTable(info_enact.get_resource_types())
187
        for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
188
            rt = slottable.create_resource_tuple_from_capacity(n.capacity)
189
            slottable.add_node(n.id, rt)
190

    
191
        # Policy manager
192
        admission = self.config.get("policy.admission")
193
        admission = admission_class_mappings.get(admission, admission)
194
        admission = import_class(admission)
195
        admission = admission(slottable)
196
        
197
        preemption = self.config.get("policy.preemption")
198
        preemption = preemption_class_mappings.get(preemption, preemption)
199
        preemption = import_class(preemption)
200
        preemption = preemption(slottable)
201

    
202
        host_selection = self.config.get("policy.host-selection")
203
        host_selection = host_class_mappings.get(host_selection, host_selection)
204
        host_selection = import_class(host_selection)
205
        host_selection = host_selection(slottable)
206

    
207
        self.policy = PolicyManager(admission, preemption, host_selection)
208

    
209
        # Preparation scheduler
210
        if preparation_type == constants.PREPARATION_UNMANAGED:
211
            preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
212
        elif preparation_type == constants.PREPARATION_TRANSFER:
213
            preparation_scheduler = ImageTransferPreparationScheduler(slottable, resourcepool, deploy_enact)    
214
    
215
        # VM mapper and scheduler
216
        mapper = self.config.get("mapper")
217
        mapper = mapper_mappings.get(mapper, mapper)
218
        mapper = import_class(mapper)
219
        mapper = mapper(slottable, self.policy)
220
        
221
        # When using backfilling, set the number of leases that can be
222
        # scheduled in the future.
223
        backfilling = self.config.get("backfilling")
224
        if backfilling == constants.BACKFILLING_OFF:
225
            max_in_future = 0
226
        elif backfilling == constants.BACKFILLING_AGGRESSIVE:
227
            max_in_future = 1
228
        elif backfilling == constants.BACKFILLING_CONSERVATIVE:
229
            max_in_future = -1 # Unlimited
230
        elif backfilling == constants.BACKFILLING_INTERMEDIATE:
231
            max_in_future = self.config.get("backfilling-reservations")
232
        
233
        vm_scheduler = VMScheduler(slottable, resourcepool, mapper, max_in_future)
234
    
235
        # Statistics collection 
236
        attrs = dict([(attr, self.config.get_attr(attr)) for attr in self.config.get_attrs()])    
237
        
238
        self.accounting = AccountingDataCollection(self.config.get("datafile"), attrs)
239
        # Load probes
240
        probes = self.config.get("accounting-probes")
241
        probes = probes.split()
242
        for probe in probes:
243
            probe_class = probe_class_mappings.get(probe, probe)
244
            probe_class = import_class(probe_class)
245
            probe_obj = probe_class(self.accounting)
246
            self.accounting.add_probe(probe_obj)    
247
    
248
        # Lease Scheduler
249
        self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable, self.accounting)
250
                                
251
        # Lease request frontends
252
        if mode == "simulated":
253
            if clock == constants.CLOCK_SIMULATED:
254
                # In pure simulation, we can only use the tracefile frontend
255
                self.frontends = [TracefileFrontend(self.clock.get_start_time())]
256
            elif clock == constants.CLOCK_REAL:
257
                # In simulation with a real clock, only the RPC frontend can be used
258
                self.frontends = [RPCFrontend()]             
259
        elif mode == "opennebula":
260
            self.frontends = [OpenNebulaFrontend()]               
261

    
262
        persistence_file = self.config.get("persistence-file")
263
        if persistence_file == "none":
264
            persistence_file = None
265
        self.persistence = PersistenceManager(persistence_file)
266
        
267
        self.logger = logging.getLogger("RM")
268

    
269

    
270
    def init_logging(self):
271
        """Initializes logging
272
        
273
        """
274

    
275
        logger = logging.getLogger("")
276
        if self.daemon:
277
            handler = logging.FileHandler(self.config.get("logfile"))
278
        else:
279
            handler = logging.StreamHandler()
280
        if sys.version_info[1] <= 4:
281
            formatter = logging.Formatter('%(name)-7s %(message)s')
282
        else:
283
            formatter = logging.Formatter('[%(haizeatime)s] %(name)-7s %(message)s')
284
        handler.setFormatter(formatter)
285
        logger.addHandler(handler)
286
        level = logging.getLevelName(self.config.get("loglevel"))
287
        logger.setLevel(level)
288
        logging.setLoggerClass(HaizeaLogger)
289

    
290
        
291
    def daemonize(self):
292
        """Daemonizes the Haizea process.
293
        
294
        Based on code in:  http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
295
        
296
        """
297
        # First fork
298
        try:
299
            pid = os.fork()
300
            if pid > 0: 
301
                # Exit first parent
302
                sys.exit(0) 
303
        except OSError, e:
304
            sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
305
            sys.exit(1)
306
    
307
        # Decouple from parent environment.
308
        os.chdir(".")
309
        os.umask(0)
310
        os.setsid()
311
    
312
        # Second fork
313
        try:
314
            pid = os.fork()
315
            if pid > 0: 
316
                # Exit second parent.
317
                sys.exit(0) 
318
        except OSError, e:
319
            sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
320
            sys.exit(2)
321
            
322
        # Open file descriptors and print start message
323
        si = file(DAEMON_STDIN, 'r')
324
        so = file(DAEMON_STDOUT, 'a+')
325
        se = file(DAEMON_STDERR, 'a+', 0)
326
        pid = os.getpid()
327
        sys.stderr.write("\nStarted Haizea daemon with pid %i\n\n" % pid)
328
        sys.stderr.flush()
329
        file(self.pidfile,'w+').write("%i\n" % pid)
330
        
331
        # Redirect standard file descriptors.
332
        os.close(sys.stdin.fileno())
333
        os.close(sys.stdout.fileno())
334
        os.close(sys.stderr.fileno())
335
        os.dup2(si.fileno(), sys.stdin.fileno())
336
        os.dup2(so.fileno(), sys.stdout.fileno())
337
        os.dup2(se.fileno(), sys.stderr.fileno())
338

    
339
    def start(self):
340
        """Starts the resource manager"""
341
        self.logger.info("Starting resource manager")
342
        
343
        for frontend in self.frontends:
344
            frontend.load(self)
345
        
346
        if self.daemon:
347
            self.daemonize()
348
        if self.rpc_server:
349
            self.rpc_server.start()
350
            
351
        self.__recover()            
352
            
353
        # Start the clock
354
        try:
355
            self.clock.run()
356
        except UnrecoverableError, exc:
357
            self.__unrecoverable_error(exc)
358
        except Exception, exc:
359
            self.__unexpected_exception(exc)
360

    
361
    def stop(self):
362
        """Stops the resource manager by stopping the clock"""
363
        self.clock.stop()
364
        
365
    def graceful_stop(self):
366
        """Stops the resource manager gracefully and exits"""
367
        
368
        self.logger.status("Stopping resource manager gracefully...")
369
        
370
        # Stop collecting data (this finalizes counters)
371
        self.accounting.stop()
372
        
373
        self.persistence.close()
374
        
375
        # TODO: When gracefully stopping mid-scheduling, we need to figure out what to
376
        #       do with leases that are still running.
377

    
378
        self.print_status()
379
        
380
        # In debug mode, dump the lease descriptors.
381
        for lease in self.scheduler.completed_leases.entries.values():
382
            lease.print_contents()
383
            
384
        # Write all collected data to disk
385
        leases = self.scheduler.completed_leases.entries
386
        self.accounting.save_to_disk(leases)
387
        
388
        # Stop RPC server
389
        if self.rpc_server != None:
390
            self.rpc_server.stop()
391
                    
392
    def process_requests(self, nexttime):
393
        """Process any new requests in the request frontend
394
        
395
        Checks the request frontend to see if there are any new requests that
396
        have to be processed. AR leases are sent directly to the schedule.
397
        Best-effort leases are queued.
398
        
399
        Arguments:
400
        nexttime -- The next time at which the scheduler can allocate resources.
401
                    This is meant to be provided by the clock simply as a sanity
402
                    measure when running in real time (to avoid scheduling something
403
                    "now" to actually have "now" be in the past once the scheduling
404
                    function returns. i.e., nexttime has nothing to do with whether 
405
                    there are resources available at that time or not.
406
        
407
        """        
408
        
409
        # Get requests from frontend
410
        requests = []
411
        for frontend in self.frontends:
412
            requests += frontend.get_accumulated_requests()
413
        requests.sort(key=operator.attrgetter("submit_time"))
414
        
415
        # Request leases and run the scheduling function.
416
        try:
417
            self.logger.vdebug("Requesting leases")
418
            for req in requests:
419
                self.scheduler.request_lease(req)
420

    
421
            self.logger.vdebug("Running scheduling function")
422
            self.scheduler.schedule(nexttime)
423
        except UnrecoverableError, exc:
424
            self.__unrecoverable_error(exc)
425
        except Exception, exc:
426
            self.__unexpected_exception(exc)
427

    
428
    def process_starting_reservations(self, time):
429
        """Process reservations starting/stopping at specified time"""
430
        
431
        # The lease scheduler takes care of this.
432
        try:
433
            self.scheduler.process_starting_reservations(time)
434
        except UnrecoverableError, exc:
435
            self.__unrecoverable_error(exc)
436
        except Exception, exc:
437
            self.__unexpected_exception(exc)
438

    
439
    def process_ending_reservations(self, time):
440
        """Process reservations starting/stopping at specified time"""
441
        
442
        # The lease scheduler takes care of this.
443
        try:
444
            self.scheduler.process_ending_reservations(time)
445
        except UnrecoverableError, exc:
446
            self.__unrecoverable_error(exc)
447
        except Exception, exc:
448
            self.__unexpected_exception(exc)             
449
             
450
    def notify_event(self, lease_id, event):
451
        """Notifies an asynchronous event to Haizea.
452
        
453
        Arguments:
454
        lease_id -- ID of lease that is affected by event
455
        event -- Event (currently, only the constants.EVENT_END_VM event is supported)
456
        """
457
        try:
458
            lease = self.scheduler.get_lease_by_id(lease_id)
459
            self.scheduler.notify_event(lease, event)
460
        except UnrecoverableError, exc:
461
            self.__unrecoverable_error(exc)
462
        except Exception, exc:
463
            self.__unexpected_exception(exc)
464
        
465
    def cancel_lease(self, lease_id):
466
        """Cancels a lease.
467
        
468
        Arguments:
469
        lease_id -- ID of lease to cancel
470
        """    
471
        try:
472
            lease = self.scheduler.get_lease_by_id(lease_id)
473
            self.scheduler.cancel_lease(lease)
474
        except UnrecoverableError, exc:
475
            self.__unrecoverable_error(exc)
476
        except Exception, exc:
477
            self.__unexpected_exception(exc)
478
            
479
    def get_next_changepoint(self):
480
        """Return next changepoint in the slot table"""
481
        return self.scheduler.slottable.get_next_changepoint(self.clock.get_time())
482
   
483
    def exists_more_leases(self):
484
        """Return True if there are any leases still "in the system" """
485
        return self.scheduler.exists_scheduled_leases() or not self.scheduler.is_queue_empty()
486

    
487
    def print_status(self):
488
        """Prints status summary."""
489
        
490
        leases = self.scheduler.leases.get_leases()
491
        completed_leases = self.scheduler.completed_leases.get_leases()
492
        self.logger.status("--- Haizea status summary ---")
493
        self.logger.status("Number of leases (not including completed): %i" % len(leases))
494
        self.logger.status("Completed leases: %i" % len(completed_leases))
495
        self.logger.status("---- End summary ----")        
496

    
497
    def __recover(self):
498
        """Loads persisted leases and scheduling information
499
        
500
        This method does three things:
501
        1. Recover persisted leases. Note that not all persisted leases
502
           may be recoverable. For example, if a lease was scheduled
503
           to start at a certain time, but that time passed while
504
           Haizea was not running, the lease will simply be transitioned
505
           to a failed state.
506
        2. Recover the queue.
507
        3. Recover the list of "future leases" as determined by
508
           the backfilling algorithm.
509
        """
510
        
511
        # Load leases
512
        leases = self.persistence.get_leases()
513
        for lease in leases:
514
            # Create a list of RRs
515
            rrs = lease.preparation_rrs + lease.vm_rrs
516
            for vmrr in lease.vm_rrs:
517
                rrs += vmrr.pre_rrs + vmrr.post_rrs
518

    
519
            # Bind resource tuples in RRs to slot table
520
            for rr in rrs:
521
                for restuple in rr.resources_in_pnode.values():
522
                    restuple.slottable = self.scheduler.slottable
523

    
524
            self.logger.debug("Attempting to recover lease %i" % lease.id)
525
            lease.print_contents()
526
            
527
            # Check the lease's state and determine how to proceed.
528
            load_rrs = False
529
            lease_state = lease.get_state()
530
            if lease_state in (Lease.STATE_DONE, Lease.STATE_CANCELLED, Lease.STATE_REJECTED, Lease.STATE_FAIL):
531
                self.logger.info("Recovered lease %i (already done)" % lease.id)
532
                self.scheduler.completed_leases.add(lease)
533
            elif lease_state in (Lease.STATE_NEW, Lease.STATE_PENDING):
534
                self.scheduler.leases.add(lease)
535
            elif lease_state == Lease.STATE_QUEUED:
536
                load_rrs = True
537
                self.scheduler.leases.add(lease)
538
                self.logger.info("Recovered lease %i (queued)" % lease.id)
539
            elif lease_state in (Lease.STATE_SCHEDULED, Lease.STATE_READY):
540
                # Check if schedule is still valid.
541
                vmrr = lease.get_last_vmrr()
542
                if len(vmrr.pre_rrs) > 0:
543
                    start = vmrr.pre_rrs[0].start
544
                else:
545
                    start = vmrr.start
546
                if self.clock.get_time() < start:
547
                    load_rrs = True
548
                    self.scheduler.leases.add(lease)
549
                    self.logger.info("Recovered lease %i" % lease.id)
550
                else:
551
                    lease.set_state(Lease.STATE_FAIL)
552
                    self.scheduler.completed_leases.add(lease)
553
                    self.logger.info("Could not recover lease %i (scheduled starting time has passed)" % lease.id)
554
            elif lease_state == Lease.STATE_ACTIVE:
555
                vmrr = lease.get_last_vmrr()
556
                if self.clock.get_time() < self.clock.get_time():
557
                    # TODO: Check if VMs are actually running
558
                    load_rrs = True
559
                    self.scheduler.leases.add(lease)
560
                    self.logger.info("Recovered lease %i" % lease.id)
561
                else:
562
                    # TODO: May have to stop extant virtual machines
563
                    lease.set_state(Lease.STATE_FAIL)
564
                    self.scheduler.completed_leases.add(lease)                    
565
                    self.logger.info("Could not recover lease %i (scheduled ending time has passed)" % lease.id)
566
            else:
567
                # No support for recovering lease in the
568
                # remaining states
569
                lease.set_state(Lease.STATE_FAIL)
570
                self.scheduler.completed_leases.add(lease)                    
571
                self.logger.info("Could not recover lease %i (unsupported state %i for recovery)" % (lease.id, lease_state))
572
                
573
            # Load the lease's RRs into the slot table
574
            if load_rrs:
575
                for rr in rrs:
576
                    if rr.state in (ResourceReservation.STATE_ACTIVE, ResourceReservation.STATE_SCHEDULED):
577
                        self.scheduler.slottable.add_reservation(rr)
578
                
579
        # Rebuild the queue        
580
        queue = self.persistence.get_queue()
581
        for lease_id in queue:
582
            if self.scheduler.leases.has_lease(lease_id):
583
                lease = self.scheduler.leases.get_lease(lease_id)
584
                self.scheduler.queue.enqueue(lease)
585

    
586
        # Rebuild the "future leases"
587
        future = self.persistence.get_future_leases()
588
        for lease_id in future:
589
            if self.scheduler.leases.has_lease(lease_id):
590
                lease = self.scheduler.leases.get_lease(lease_id)
591
                self.scheduler.vm_scheduler.future_leases.add(lease)
592

    
593

    
594
    def __unrecoverable_error(self, exc):
595
        """Handles an unrecoverable error.
596
        
597
        This method prints information on the unrecoverable error and makes Haizea panic.
598
        """
599
        self.logger.error("An unrecoverable error has happened.")
600
        self.logger.error("Original exception:")
601
        self.__print_exception(exc.exc, exc.get_traceback())
602
        self.logger.error("Unrecoverable error traceback:")
603
        self.__print_exception(exc, sys.exc_info()[2])
604
        self.__panic()
605

    
606
    def __unexpected_exception(self, exc):
607
        """Handles an unrecoverable error.
608
        
609
        This method prints information on the unrecoverable error and makes Haizea panic.
610
        """
611
        self.logger.error("An unexpected exception has happened.")
612
        self.__print_exception(exc, sys.exc_info()[2])
613
        self.__panic()
614
            
615
    def __print_exception(self, exc, exc_traceback):
616
        """Prints an exception's traceback to the log."""
617
        tb = traceback.format_tb(exc_traceback)
618
        for line in tb:
619
            self.logger.error(line)
620
        self.logger.error("Message: %s" % exc)
621

    
622
    
623
    def __panic(self):
624
        """Makes Haizea crash and burn in a panicked frenzy"""
625
        
626
        self.logger.status("Panicking...")
627

    
628
        # Stop RPC server
629
        if self.rpc_server != None:
630
            self.rpc_server.stop()
631

    
632
        # Dump state
633
        self.print_status()
634
        self.logger.error("Next change point (in slot table): %s" % self.get_next_changepoint())
635

    
636
        # Print lease descriptors
637
        leases = self.scheduler.leases.get_leases()
638
        if len(leases)>0:
639
            self.logger.vdebug("vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
640
            for lease in leases:
641
                lease.print_contents()
642
            self.logger.vdebug("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")        
643

    
644
        # Exit
645
        treatment = self.config.get("lease-failure-handling")
646
        if treatment == constants.ONFAILURE_EXIT_RAISE:
647
            raise
648
        else:
649
            exit(1)
650

    
651
            
652
class Clock(object):
653
    """Base class for the resource manager's clock.
654
    
655
    The clock is in charge of periodically waking the resource manager so it
656
    will process new requests and handle existing reservations. This is a
657
    base class defining abstract methods.
658
    
659
    """
660
    def __init__(self, manager):
661
        self.manager = manager
662
        self.done = False
663
    
664
    def get_time(self): 
665
        """Return the current time"""
666
        return abstract()
667

    
668
    def get_start_time(self): 
669
        """Return the time at which the clock started ticking"""
670
        return abstract()
671
    
672
    def get_next_schedulable_time(self): 
673
        """Return the next time at which resources could be scheduled.
674
        
675
        The "next schedulable time" server sanity measure when running 
676
        in real time (to avoid scheduling something "now" to actually 
677
        have "now" be in the past once the scheduling function returns. 
678
        i.e., the "next schedulable time" has nothing to do with whether 
679
        there are resources available at that time or not.
680
        """
681
        return abstract()
682
    
683
    def run(self):
684
        """Start and run the clock. This function is, in effect,
685
        the main loop of the resource manager."""
686
        return abstract()     
687

    
688
    def stop(self):
689
        """Stop the clock.
690
        
691
        Stopping the clock makes Haizea exit.
692
        """
693
        self.done = True    
694
    
695
        
696
class SimulatedClock(Clock):
697
    """Simulates the passage of time... really fast.
698
    
699
    The simulated clock steps through time to produce an ideal schedule.
700
    See the run() function for a description of how time is incremented
701
    exactly in the simulated clock.
702
    
703
    """
704
    
705
    def __init__(self, manager, starttime):
706
        """Initialize the simulated clock, starting at the provided starttime"""
707
        Clock.__init__(self, manager)
708
        self.starttime = starttime
709
        self.time = starttime
710
        self.logger = logging.getLogger("CLOCK")
711
        self.statusinterval = self.manager.config.get("status-message-interval")
712
       
713
    def get_time(self):
714
        """See docstring in base Clock class."""
715
        return self.time
716
    
717
    def get_start_time(self):
718
        """See docstring in base Clock class."""
719
        return self.starttime
720

    
721
    def get_next_schedulable_time(self):
722
        """See docstring in base Clock class."""
723
        return self.time    
724
    
725
    def run(self):
726
        """Runs the simulated clock through time.
727
        
728
        The clock starts at the provided start time. At each point in time,
729
        it wakes up the resource manager and then skips to the next time
730
        where "something" is happening (see __get_next_time for a more
731
        rigorous description of this).
732
        
733
        The clock stops when there is nothing left to do (no pending or 
734
        queue requests, and no future reservations)
735
        
736
        The simulated clock can only work in conjunction with the
737
        tracefile request frontend.
738
        """
739
        self.logger.status("Starting simulated clock")
740
        self.manager.accounting.start(self.get_start_time())
741
        prevstatustime = self.time
742
        
743
        # Main loop
744
        while not self.done:
745
            # Check to see if there are any leases which are ending prematurely.
746
            # Note that this is unique to simulation.
747
            prematureends = self.manager.scheduler.slottable.get_prematurely_ending_res(self.time)
748
            
749
            # Notify the resource manager about the premature ends
750
            for rr in prematureends:
751
                self.manager.notify_event(rr.lease.id, constants.EVENT_END_VM)
752
                
753
            # Process reservations starting/stopping at the current time and
754
            # check if there are any new requests.
755
            self.manager.process_ending_reservations(self.time)
756
            self.manager.process_starting_reservations(self.time)
757
            self.manager.process_requests(self.time)
758
            
759
            # Since processing requests may have resulted in new reservations
760
            # starting now, we process reservations again.
761
            self.manager.process_starting_reservations(self.time)
762
            # And one final call to deal with nil-duration reservations
763
            self.manager.process_ending_reservations(self.time)
764
            
765
            self.manager.accounting.at_timestep(self.manager.scheduler)
766
            
767
            # Print a status message
768
            if self.statusinterval != None and (self.time - prevstatustime).minutes >= self.statusinterval:
769
                self.manager.print_status()
770
                prevstatustime = self.time
771
                
772
            # Skip to next point in time.
773
            self.time, self.done = self.__get_next_time()
774
                    
775
        self.logger.status("Simulated clock has stopped")
776

    
777
        # Stop the resource manager
778
        self.manager.graceful_stop()
779
        
780
    
781
    def __get_next_time(self):
782
        """Determines what is the next point in time to skip to.
783
        
784
        At a given point in time, the next time is the earliest of the following:
785
        * The arrival of the next lease request
786
        * The start or end of a reservation (a "changepoint" in the slot table)
787
        * A premature end of a lease
788
        """
789
        
790
        # Determine candidate next times
791
        tracefrontend = self.__get_trace_frontend()
792
        nextchangepoint = self.manager.get_next_changepoint()
793
        nextprematureend = self.manager.scheduler.slottable.get_next_premature_end(self.time)
794
        nextreqtime = tracefrontend.get_next_request_time()
795
        self.logger.debug("Next change point (in slot table): %s" % nextchangepoint)
796
        self.logger.debug("Next request time: %s" % nextreqtime)
797
        self.logger.debug("Next premature end: %s" % nextprematureend)
798
        
799
        # The previous time is now
800
        prevtime = self.time
801
        
802
        # We initialize the next time to now too, to detect if
803
        # we've been unable to determine what the next time is.
804
        newtime = self.time
805
        
806
        # Find the earliest of the three, accounting for None values
807
        if nextchangepoint != None and nextreqtime == None:
808
            newtime = nextchangepoint
809
        elif nextchangepoint == None and nextreqtime != None:
810
            newtime = nextreqtime
811
        elif nextchangepoint != None and nextreqtime != None:
812
            newtime = min(nextchangepoint, nextreqtime)
813
            
814
        if nextprematureend != None:
815
            newtime = min(nextprematureend, newtime)
816
                        
817
        # If there's no more leases in the system, and no more pending requests,
818
        # then we're done.
819
        if not self.manager.exists_more_leases() and not tracefrontend.exists_more_requests():
820
            self.done = True
821
        
822
        # We can also be done if we've specified that we want to stop when
823
        # the best-effort requests are all done or when they've all been submitted.
824
        stopwhen = self.manager.config.get("stop-when")
825
        besteffort = self.manager.scheduler.leases.get_leases(type = Lease.BEST_EFFORT)
826
        pendingbesteffort = [r for r in tracefrontend.requests if r.get_type() == Lease.BEST_EFFORT]
827
        if stopwhen == constants.STOPWHEN_BEDONE:
828
            if self.manager.scheduler.is_queue_empty() and len(besteffort) + len(pendingbesteffort) == 0:
829
                self.done = True
830
        elif stopwhen == constants.STOPWHEN_BESUBMITTED:
831
            if len(pendingbesteffort) == 0:
832
                self.done = True
833
                
834
        # If we didn't arrive at a new time, and we're not done, we've fallen into
835
        # an infinite loop. This is A Bad Thing(tm).
836
        if newtime == prevtime and self.done != True:
837
            raise Exception, "Simulated clock has fallen into an infinite loop."
838
        
839
        return newtime, self.done
840

    
841
    def __get_trace_frontend(self):
842
        """Gets the tracefile frontend from the resource manager"""
843
        frontends = self.manager.frontends
844
        tracef = [f for f in frontends if isinstance(f, TracefileFrontend)]
845
        if len(tracef) != 1:
846
            raise Exception, "The simulated clock can only work with a tracefile request frontend."
847
        else:
848
            return tracef[0] 
849
        
850
        
851
class RealClock(Clock):
852
    """A realtime clock.
853
    
854
    The real clock wakes up periodically to, in turn, tell the resource manager
855
    to wake up. The real clock can also be run in a "fastforward" mode for
856
    debugging purposes (however, unlike the simulated clock, the clock will
857
    always skip a fixed amount of time into the future).
858
    """
859
    def __init__(self, manager, quantum, non_sched, fastforward = False):
860
        """Initializes the real clock.
861
        
862
        Arguments:
863
        manager -- the resource manager
864
        quantum -- interval between clock wakeups
865
        fastforward -- if True, the clock won't actually sleep
866
                       for the duration of the quantum."""
867
        Clock.__init__(self, manager)
868
        self.fastforward = fastforward
869
        if not self.fastforward:
870
            self.lastwakeup = None
871
        else:
872
            self.lastwakeup = round_datetime(now())
873
        self.logger = logging.getLogger("CLOCK")
874
        self.starttime = self.get_time()
875
        self.nextschedulable = None
876
        self.nextperiodicwakeup = None
877
        self.quantum = TimeDelta(seconds=quantum)
878
        self.non_sched = TimeDelta(seconds=non_sched)
879
               
880
    def get_time(self):
881
        """See docstring in base Clock class."""
882
        if not self.fastforward:
883
            return now()
884
        else:
885
            return self.lastwakeup
886
    
887
    def get_start_time(self):
888
        """See docstring in base Clock class."""
889
        return self.starttime
890

    
891
    def get_next_schedulable_time(self):
892
        """See docstring in base Clock class."""
893
        return self.nextschedulable    
894
    
895
    def run(self):
896
        """Runs the real clock through time.
897
        
898
        The clock starts when run() is called. In each iteration of the main loop
899
        it will do the following:
900
        - Wake up the resource manager
901
        - Determine if there will be anything to do before the next
902
          time the clock will wake up (after the quantum has passed). Note
903
          that this information is readily available on the slot table.
904
          If so, set next-wakeup-time to (now + time until slot table
905
          event). Otherwise, set it to (now + quantum)
906
        - Sleep until next-wake-up-time
907
        
908
        The clock keeps on tickin' until a SIGINT signal (Ctrl-C if running in the
909
        foreground) or a SIGTERM signal is received.
910
        """
911
        self.logger.status("Starting clock")
912
        self.manager.accounting.start(self.get_start_time())
913
        
914
        try:
915
            signal.signal(signal.SIGINT, self.signalhandler_gracefulstop)
916
            signal.signal(signal.SIGTERM, self.signalhandler_gracefulstop)
917
        except ValueError, exc:
918
            # This means Haizea is not the main thread, which will happen
919
            # when running it as part of a py.test. We simply ignore this
920
            # to allow the test to continue.
921
            pass
922
        
923
        # Main loop
924
        while not self.done:
925
            self.logger.status("Waking up to manage resources")
926
            
927
            # Save the waking time. We want to use a consistent time in the 
928
            # resource manager operations (if we use now(), we'll get a different
929
            # time every time)
930
            if not self.fastforward:
931
                self.lastwakeup = round_datetime(self.get_time())
932
            self.logger.status("Wake-up time recorded as %s" % self.lastwakeup)
933
                
934
            # Next schedulable time
935
            self.nextschedulable = round_datetime(self.lastwakeup + self.non_sched)
936
            
937
            # Wake up the resource manager
938
            self.manager.process_ending_reservations(self.lastwakeup)
939
            self.manager.process_starting_reservations(self.lastwakeup)
940
            # TODO: Compute nextschedulable here, before processing requests
941
            self.manager.process_requests(self.nextschedulable)
942

    
943
            self.manager.accounting.at_timestep(self.manager.scheduler)
944
            
945
            # Next wakeup time
946
            time_now = now()
947
            if self.lastwakeup + self.quantum <= time_now:
948
                quantums = (time_now - self.lastwakeup) / self.quantum
949
                quantums = int(ceil(quantums)) * self.quantum
950
                self.nextperiodicwakeup = round_datetime(self.lastwakeup + quantums)
951
            else:
952
                self.nextperiodicwakeup = round_datetime(self.lastwakeup + self.quantum)
953
            
954
            # Determine if there's anything to do before the next wakeup time
955
            nextchangepoint = self.manager.get_next_changepoint()
956
            if nextchangepoint != None and nextchangepoint <= self.nextperiodicwakeup:
957
                # We need to wake up earlier to handle a slot table event
958
                nextwakeup = nextchangepoint
959
                self.logger.status("Going back to sleep. Waking up at %s to handle slot table event." % nextwakeup)
960
            else:
961
                # Nothing to do before waking up
962
                nextwakeup = self.nextperiodicwakeup
963
                self.logger.status("Going back to sleep. Waking up at %s to see if something interesting has happened by then." % nextwakeup)
964
            
965
            # The only exit condition from the real clock is if the stop_when_no_more_leases
966
            # is set to True, and there's no more work left to do.
967
            # TODO: This first if is a kludge. Other options should only interact with
968
            # options through the configfile's get method. The "stop-when-no-more-leases"
969
            # option is currently OpenNebula-specific (while the real clock isn't; it can
970
            # be used by both the simulator and the OpenNebula mode). This has to be
971
            # fixed.            
972
            if self.manager.config._options.has_key("stop-when-no-more-leases"):
973
                stop_when_no_more_leases = self.manager.config.get("stop-when-no-more-leases")
974
                if stop_when_no_more_leases and not self.manager.exists_more_leases():
975
                    self.done = True
976
            
977
            # Sleep
978
            if not self.done:
979
                if not self.fastforward:
980
                    sleep((nextwakeup - now()).seconds)
981
                else:
982
                    self.lastwakeup = nextwakeup
983

    
984
        self.logger.status("Real clock has stopped")
985

    
986
        # Stop the resource manager
987
        self.manager.graceful_stop()
988
    
989
    def signalhandler_gracefulstop(self, signum, frame):
990
        """Handler for SIGTERM and SIGINT. Allows Haizea to stop gracefully."""
991
        
992
        sigstr = ""
993
        if signum == signal.SIGTERM:
994
            sigstr = " (SIGTERM)"
995
        elif signum == signal.SIGINT:
996
            sigstr = " (SIGINT)"
997
        self.logger.status("Received signal %i%s" %(signum, sigstr))
998
        self.done = True
999

    
1000

    
1001
class PersistenceManager(object):    
1002
    """Persistence manager.
1003
    
1004
    The persistence manager is in charge of persisting leases, and some
1005
    scheduling data, to disk. This allows Haizea to recover from crashes.
1006
    """    
1007
    
1008
    def __init__(self, file):
1009
        """Constructor
1010
        
1011
        Initializes the persistence manager. If the specified file
1012
        does not exist, it is created. If the file is created, it
1013
        is opened but the information is not recovered (this is
1014
        the responsibility of the Manager class)
1015
        
1016
        Arguments:
1017
        file -- Persistence file. If None is specified, then
1018
                persistence is disabled and Haizea will run entirely
1019
                in-memory.
1020
        """
1021
        if file == None:
1022
            self.disabled = True
1023
            self.shelf = {}
1024
        else:
1025
            self.disabled = False
1026
            file = os.path.expanduser(file)
1027
            d = os.path.dirname(file)
1028
            if not os.path.exists(d):
1029
                os.makedirs(d)
1030
            self.shelf = shelve.open(file, flag='c', protocol = -1)
1031
        
1032
    def persist_lease(self, lease):
1033
        """Persists a single lease to disk
1034
                
1035
        Arguments:
1036
        lease -- Lease to persist
1037
        """        
1038
        if not self.disabled:
1039
            self.shelf["lease-%i" % lease.id] = lease
1040
            self.shelf.sync()
1041

    
1042
    def persist_queue(self, queue):
1043
        """Persists the queue to disk
1044
                
1045
        Arguments:
1046
        queue -- The queue
1047
        """        
1048
        if not self.disabled:
1049
            self.shelf["queue"] = [l.id for l in queue]
1050
            self.shelf.sync()
1051
        
1052
    def persist_future_leases(self, leases):
1053
        """Persists the set of future leases
1054
                
1055
        Arguments:
1056
        leases -- "Future leases" (as determined by backfilling algorithm)
1057
        """              
1058
        if not self.disabled:
1059
            self.shelf["future"] = [l.id for l in leases]        
1060
            self.shelf.sync()
1061
        
1062
    def get_leases(self):
1063
        """Returns the leases persisted to disk.
1064
                
1065
        """              
1066
        return [v for k,v in self.shelf.items() if k.startswith("lease-")]
1067
    
1068
    def get_queue(self):
1069
        """Returns the queue persisted to disk.
1070
                
1071
        """              
1072
        if self.shelf.has_key("queue"):
1073
            return self.shelf["queue"]
1074
        else:
1075
            return []
1076
        
1077
    def get_future_leases(self):
1078
        """Returns the future leases persisted to disk.
1079
                
1080
        """              
1081
        if self.shelf.has_key("future"):
1082
            return self.shelf["future"]
1083
        else:
1084
            return []        
1085
    
1086
    def close(self):
1087
        """Closes the persistence manager.
1088
        
1089
        Closing the persistence manager saves any remaining
1090
        data to disk.
1091
        """              
1092
        if not self.disabled:
1093
            self.shelf.close()
1094
        
1095