Project

General

Profile

root / trunk / src / haizea / core / manager.py @ 632

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2008, University of Chicago                                 #
3
# Copyright 2008, Distributed Systems Architecture Group, Universidad        #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19
"""The manager (resource manager) module is the root of Haizea. If you want to
20
see where the ball starts rolling, look at the following two functions:
21

22
* manager.Manager.__init__()
23
* manager.Manager.start()
24

25
This module provides the following classes:
26

27
* Manager: Haizea itself. Pretty much everything else
28
  is contained in this class.
29
* Clock: A base class for Haizea's clock.
30
* SimulatedClock: A clock for simulations.
31
* RealClock: A clock that advances in realtime.
32
"""
33
 
34
import haizea.core.accounting as accounting
35
import haizea.common.constants as constants
36
from haizea.core.scheduler.preparation_schedulers.unmanaged import UnmanagedPreparationScheduler
37
from haizea.core.scheduler.preparation_schedulers.imagetransfer import ImageTransferPreparationScheduler
38
from haizea.core.enact.opennebula import OpenNebulaResourcePoolInfo, OpenNebulaVMEnactment, OpenNebulaDummyDeploymentEnactment
39
from haizea.core.enact.simulated import SimulatedResourcePoolInfo, SimulatedVMEnactment, SimulatedDeploymentEnactment
40
from haizea.core.frontends.tracefile import TracefileFrontend
41
from haizea.core.frontends.opennebula import OpenNebulaFrontend
42
from haizea.core.frontends.rpc import RPCFrontend
43
from haizea.core.scheduler import UnrecoverableError
44
from haizea.core.scheduler.lease_scheduler import LeaseScheduler
45
from haizea.core.scheduler.vm_scheduler import VMScheduler
46
from haizea.core.scheduler.mapper import class_mappings as mapper_mappings
47
from haizea.core.scheduler.slottable import SlotTable
48
from haizea.core.scheduler.policy import PolicyManager
49
from haizea.core.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
50
from haizea.core.leases import Lease, Site
51
from haizea.core.log import HaizeaLogger
52
from haizea.core.rpcserver import RPCServer
53
from haizea.common.utils import abstract, round_datetime, Singleton, import_class
54
from haizea.policies import admission_class_mappings, preemption_class_mappings, host_class_mappings 
55

    
56
import operator
57
import logging
58
import signal
59
import sys, os
60
import traceback
61
from time import sleep
62
from math import ceil
63
from mx.DateTime import now, TimeDelta
64

    
65
DAEMON_STDOUT = DAEMON_STDIN = "/dev/null"
66
DAEMON_STDERR = "/var/tmp/haizea.err"
67
DEFAULT_LOGFILE = "/var/tmp/haizea.log"
68

    
69
class Manager(Singleton):
70
    """The root of Haizea
71
    
72
    This class is the root of Haizea. Pretty much everything else (scheduler,
73
    enactment modules, etc.) is contained in this class. The Manager
74
    class is meant to be a singleton.
75
    
76
    """
77
    
78
    def __init__(self, config, daemon=False, pidfile=None):
79
        """Initializes the manager.
80
        
81
        Argument:
82
        config -- a populated instance of haizea.common.config.RMConfig
83
        daemon -- True if Haizea must run as a daemon, False if it must
84
                  run in the foreground
85
        pidfile -- When running as a daemon, file to save pid to
86
        """
87
        self.config = config
88
        
89
        # Create the RM components
90
        
91
        mode = config.get("mode")
92
        
93
        self.daemon = daemon
94
        self.pidfile = pidfile
95

    
96
        if mode == "simulated":
97
            # Simulated-time simulations always run in the foreground
98
            clock = self.config.get("clock")
99
            if clock == constants.CLOCK_SIMULATED:
100
                self.daemon = False
101
        elif mode == "opennebula":
102
            clock = constants.CLOCK_REAL        
103
        
104
        self.init_logging()
105
                
106
        if clock == constants.CLOCK_SIMULATED:
107
            starttime = self.config.get("starttime")
108
            self.clock = SimulatedClock(self, starttime)
109
            self.rpc_server = None
110
        elif clock == constants.CLOCK_REAL:
111
            wakeup_interval = self.config.get("wakeup-interval")
112
            non_sched = self.config.get("non-schedulable-interval")
113
            if mode == "opennebula":
114
                fastforward = self.config.get("dry-run")
115
            else:
116
                fastforward = False
117
            self.clock = RealClock(self, wakeup_interval, non_sched, fastforward)
118
            if fastforward:
119
                # No need for an RPC server when doing a dry run
120
                self.rpc_server = None
121
            else:
122
                self.rpc_server = RPCServer(self)
123
                    
124
        # Enactment modules
125
        if mode == "simulated":
126
            resources = self.config.get("simul.resources")
127
            if resources == "in-tracefile":
128
                tracefile = self.config.get("tracefile")
129
                site = Site.from_lwf_file(tracefile)
130
            elif resources.startswith("file:"):
131
                sitefile = resources.split(":")
132
                site = Site.from_xml_file(sitefile)
133
            else:
134
                site = Site.from_resources_string(resources)
135
    
136
            info_enact = SimulatedResourcePoolInfo(site)
137
            vm_enact = SimulatedVMEnactment()
138
            deploy_enact = SimulatedDeploymentEnactment()
139
        elif mode == "opennebula":
140
            # Enactment modules
141
            info_enact = OpenNebulaResourcePoolInfo()
142
            vm_enact = OpenNebulaVMEnactment()
143
            # No deployment in OpenNebula. Using dummy one for now.
144
            deploy_enact = OpenNebulaDummyDeploymentEnactment()            
145

    
146
        if mode == "simulated":
147
            preparation_type = self.config.get("lease-preparation")
148
        elif mode == "opennebula":
149
            # No deployment in OpenNebula.
150
            preparation_type = constants.PREPARATION_UNMANAGED
151

    
152
        # Resource pool
153
        if preparation_type == constants.PREPARATION_TRANSFER:
154
            if self.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
155
                resourcepool = ResourcePoolWithReusableImages(info_enact, vm_enact, deploy_enact)
156
            else:
157
                resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
158
        else:
159
            resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
160
    
161
        # Slot table
162
        slottable = SlotTable(info_enact.get_resource_types())
163
        for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
164
            rt = slottable.create_resource_tuple_from_capacity(n.capacity)
165
            slottable.add_node(n.id, rt)
166

    
167
        # Policy manager
168
        admission = self.config.get("policy.admission")
169
        admission = admission_class_mappings.get(admission, admission)
170
        admission = import_class(admission)
171
        admission = admission(slottable)
172
        
173
        preemption = self.config.get("policy.preemption")
174
        preemption = preemption_class_mappings.get(preemption, preemption)
175
        preemption = import_class(preemption)
176
        preemption = preemption(slottable)
177

    
178
        host_selection = self.config.get("policy.host-selection")
179
        host_selection = host_class_mappings.get(host_selection, host_selection)
180
        host_selection = import_class(host_selection)
181
        host_selection = host_selection(slottable)
182

    
183
        self.policy = PolicyManager(admission, preemption, host_selection)
184

    
185
        # Preparation scheduler
186
        if preparation_type == constants.PREPARATION_UNMANAGED:
187
            preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
188
        elif preparation_type == constants.PREPARATION_TRANSFER:
189
            preparation_scheduler = ImageTransferPreparationScheduler(slottable, resourcepool, deploy_enact)    
190
    
191
        # VM mapper and scheduler
192
        mapper = self.config.get("mapper")
193
        mapper = mapper_mappings.get(mapper, mapper)
194
        mapper = import_class(mapper)
195
        mapper = mapper(slottable, self.policy)
196
        vm_scheduler = VMScheduler(slottable, resourcepool, mapper)
197
    
198
        # Lease Scheduler
199
        self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable)
200
        
201
        # Lease request frontends
202
        if mode == "simulated":
203
            if clock == constants.CLOCK_SIMULATED:
204
                # In pure simulation, we can only use the tracefile frontend
205
                self.frontends = [TracefileFrontend(self, self.clock.get_start_time())]
206
            elif clock == constants.CLOCK_REAL:
207
                # In simulation with a real clock, only the RPC frontend can be used
208
                self.frontends = [RPCFrontend(self)]             
209
        elif mode == "opennebula":
210
                self.frontends = [OpenNebulaFrontend(self)]               
211

    
212
        # Statistics collection 
213
        self.accounting = accounting.AccountingDataCollection(self, self.config.get("datafile"))
214
        
215
        self.logger = logging.getLogger("RM")
216

    
217

    
218
    def init_logging(self):
219
        """Initializes logging
220
        
221
        """
222

    
223
        logger = logging.getLogger("")
224
        if self.daemon:
225
            handler = logging.FileHandler(self.config.get("logfile"))
226
        else:
227
            handler = logging.StreamHandler()
228
        formatter = logging.Formatter('[%(haizeatime)s] %(name)-7s %(message)s')
229
        handler.setFormatter(formatter)
230
        logger.addHandler(handler)
231
        level = logging.getLevelName(self.config.get("loglevel"))
232
        logger.setLevel(level)
233
        logging.setLoggerClass(HaizeaLogger)
234

    
235
        
236
    def daemonize(self):
237
        """Daemonizes the Haizea process.
238
        
239
        Based on code in:  http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
240
        
241
        """
242
        # First fork
243
        try:
244
            pid = os.fork()
245
            if pid > 0: 
246
                # Exit first parent
247
                sys.exit(0) 
248
        except OSError, e:
249
            sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
250
            sys.exit(1)
251
    
252
        # Decouple from parent environment.
253
        os.chdir(".")
254
        os.umask(0)
255
        os.setsid()
256
    
257
        # Second fork
258
        try:
259
            pid = os.fork()
260
            if pid > 0: 
261
                # Exit second parent.
262
                sys.exit(0) 
263
        except OSError, e:
264
            sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
265
            sys.exit(2)
266
            
267
        # Open file descriptors and print start message
268
        si = file(DAEMON_STDIN, 'r')
269
        so = file(DAEMON_STDOUT, 'a+')
270
        se = file(DAEMON_STDERR, 'a+', 0)
271
        pid = os.getpid()
272
        sys.stderr.write("\nStarted Haizea daemon with pid %i\n\n" % pid)
273
        sys.stderr.flush()
274
        file(self.pidfile,'w+').write("%i\n" % pid)
275
        
276
        # Redirect standard file descriptors.
277
        os.close(sys.stdin.fileno())
278
        os.close(sys.stdout.fileno())
279
        os.close(sys.stderr.fileno())
280
        os.dup2(si.fileno(), sys.stdin.fileno())
281
        os.dup2(so.fileno(), sys.stdout.fileno())
282
        os.dup2(se.fileno(), sys.stderr.fileno())
283

    
284
    def start(self):
285
        """Starts the resource manager"""
286
        self.logger.info("Starting resource manager")
287

    
288
        # Create counters to keep track of interesting data.
289
        self.accounting.create_counter(constants.COUNTER_ARACCEPTED, constants.AVERAGE_NONE)
290
        self.accounting.create_counter(constants.COUNTER_ARREJECTED, constants.AVERAGE_NONE)
291
        self.accounting.create_counter(constants.COUNTER_IMACCEPTED, constants.AVERAGE_NONE)
292
        self.accounting.create_counter(constants.COUNTER_IMREJECTED, constants.AVERAGE_NONE)
293
        self.accounting.create_counter(constants.COUNTER_BESTEFFORTCOMPLETED, constants.AVERAGE_NONE)
294
        self.accounting.create_counter(constants.COUNTER_QUEUESIZE, constants.AVERAGE_TIMEWEIGHTED)
295
        self.accounting.create_counter(constants.COUNTER_DISKUSAGE, constants.AVERAGE_NONE)
296
        self.accounting.create_counter(constants.COUNTER_UTILIZATION, constants.AVERAGE_NONE)
297
        
298
        if self.daemon:
299
            self.daemonize()
300
        if self.rpc_server:
301
            self.rpc_server.start()
302
            
303
        # Start the clock
304
        try:
305
            self.clock.run()
306
        except UnrecoverableError, exc:
307
            self.__unrecoverable_error(exc)
308
        except Exception, exc:
309
            self.__unexpected_exception(exc)
310

    
311
    def stop(self):
312
        """Stops the resource manager by stopping the clock"""
313
        self.clock.stop()
314
        
315
    def graceful_stop(self):
316
        """Stops the resource manager gracefully and exits"""
317
        
318
        self.logger.status("Stopping resource manager gracefully...")
319
        
320
        # Stop collecting data (this finalizes counters)
321
        self.accounting.stop()
322
        
323
        # TODO: When gracefully stopping mid-scheduling, we need to figure out what to
324
        #       do with leases that are still running.
325

    
326
        self.print_status()
327
        
328
        # In debug mode, dump the lease descriptors.
329
        for lease in self.scheduler.completed_leases.entries.values():
330
            lease.print_contents()
331
            
332
        # Write all collected data to disk
333
        self.accounting.save_to_disk()
334
        
335
        # Stop RPC server
336
        if self.rpc_server != None:
337
            self.rpc_server.stop()
338
                    
339
    def process_requests(self, nexttime):
340
        """Process any new requests in the request frontend
341
        
342
        Checks the request frontend to see if there are any new requests that
343
        have to be processed. AR leases are sent directly to the schedule.
344
        Best-effort leases are queued.
345
        
346
        Arguments:
347
        nexttime -- The next time at which the scheduler can allocate resources.
348
                    This is meant to be provided by the clock simply as a sanity
349
                    measure when running in real time (to avoid scheduling something
350
                    "now" to actually have "now" be in the past once the scheduling
351
                    function returns. i.e., nexttime has nothing to do with whether 
352
                    there are resources available at that time or not.
353
        
354
        """        
355
        
356
        # Get requests from frontend
357
        requests = []
358
        for frontend in self.frontends:
359
            requests += frontend.get_accumulated_requests()
360
        requests.sort(key=operator.attrgetter("submit_time"))
361
        
362
        # Request leases and run the scheduling function.
363
        try:
364
            self.logger.vdebug("Requesting leases")
365
            for req in requests:
366
                self.scheduler.request_lease(req)
367

    
368
            self.logger.vdebug("Running scheduling function")
369
            self.scheduler.schedule(nexttime)
370
        except UnrecoverableError, exc:
371
            self.__unrecoverable_error(exc)
372
        except Exception, exc:
373
            self.__unexpected_exception(exc)
374

    
375
    def process_starting_reservations(self, time):
376
        """Process reservations starting/stopping at specified time"""
377
        
378
        # The lease scheduler takes care of this.
379
        try:
380
            self.scheduler.process_starting_reservations(time)
381
        except UnrecoverableError, exc:
382
            self.__unrecoverable_error(exc)
383
        except Exception, exc:
384
            self.__unexpected_exception(exc)
385

    
386
    def process_ending_reservations(self, time):
387
        """Process reservations starting/stopping at specified time"""
388
        
389
        # The lease scheduler takes care of this.
390
        try:
391
            self.scheduler.process_ending_reservations(time)
392
        except UnrecoverableError, exc:
393
            self.__unrecoverable_error(exc)
394
        except Exception, exc:
395
            self.__unexpected_exception(exc)
396
         
397
    def get_utilization(self, nowtime):
398
        """ Gather utilization information at a given time.
399
        
400
        Each time we process reservations, we report resource utilization 
401
        to the accounting module. This utilization information shows what 
402
        portion of the physical resources is used by each type of reservation 
403
        (e.g., 70% are running a VM, 5% are doing suspensions, etc.) See the 
404
        accounting module for details on how this data is stored.
405
        Currently we only collect utilization from the VM Scheduler 
406
        (in the future, information may also be gathered from the preparation 
407
        scheduler).
408
        """
409
        util = self.scheduler.vm_scheduler.get_utilization(nowtime)
410
        self.accounting.append_stat(constants.COUNTER_UTILIZATION, util)             
411
             
412
    def notify_event(self, lease_id, event):
413
        """Notifies an asynchronous event to Haizea.
414
        
415
        Arguments:
416
        lease_id -- ID of lease that is affected by event
417
        event -- Event (currently, only the constants.EVENT_END_VM event is supported)
418
        """
419
        try:
420
            lease = self.scheduler.get_lease_by_id(lease_id)
421
            self.scheduler.notify_event(lease, event)
422
        except UnrecoverableError, exc:
423
            self.__unrecoverable_error(exc)
424
        except Exception, exc:
425
            self.__unexpected_exception(exc)
426
        
427
    def cancel_lease(self, lease_id):
428
        """Cancels a lease.
429
        
430
        Arguments:
431
        lease_id -- ID of lease to cancel
432
        """    
433
        try:
434
            lease = self.scheduler.get_lease_by_id(lease_id)
435
            self.scheduler.cancel_lease(lease)
436
        except UnrecoverableError, exc:
437
            self.__unrecoverable_error(exc)
438
        except Exception, exc:
439
            self.__unexpected_exception(exc)
440
            
441
    def get_next_changepoint(self):
442
        """Return next changepoint in the slot table"""
443
        return self.scheduler.slottable.get_next_changepoint(self.clock.get_time())
444
   
445
    def exists_more_leases(self):
446
        """Return True if there are any leases still "in the system" """
447
        return self.scheduler.exists_scheduled_leases() or not self.scheduler.is_queue_empty()
448

    
449
    def print_status(self):
450
        """Prints status summary."""
451
        
452
        leases = self.scheduler.leases.get_leases()
453
        completed_leases = self.scheduler.completed_leases.get_leases()
454
        self.logger.status("--- Haizea status summary ---")
455
        self.logger.status("Number of leases (not including completed): %i" % len(leases))
456
        self.logger.status("Completed leases: %i" % len(completed_leases))
457
        self.logger.status("Completed best-effort leases: %i" % self.accounting.data.counters[constants.COUNTER_BESTEFFORTCOMPLETED])
458
        self.logger.status("Queue size: %i" % self.accounting.data.counters[constants.COUNTER_QUEUESIZE])
459
        self.logger.status("Accepted AR leases: %i" % self.accounting.data.counters[constants.COUNTER_ARACCEPTED])
460
        self.logger.status("Rejected AR leases: %i" % self.accounting.data.counters[constants.COUNTER_ARREJECTED])
461
        self.logger.status("Accepted IM leases: %i" % self.accounting.data.counters[constants.COUNTER_IMACCEPTED])
462
        self.logger.status("Rejected IM leases: %i" % self.accounting.data.counters[constants.COUNTER_IMREJECTED])
463
        self.logger.status("---- End summary ----")        
464

    
465
    def __unrecoverable_error(self, exc):
466
        """Handles an unrecoverable error.
467
        
468
        This method prints information on the unrecoverable error and makes Haizea panic.
469
        """
470
        self.logger.error("An unrecoverable error has happened.")
471
        self.logger.error("Original exception:")
472
        self.__print_exception(exc.exc, exc.get_traceback())
473
        self.logger.error("Unrecoverable error traceback:")
474
        self.__print_exception(exc, sys.exc_info()[2])
475
        self.__panic()
476

    
477
    def __unexpected_exception(self, exc):
478
        """Handles an unrecoverable error.
479
        
480
        This method prints information on the unrecoverable error and makes Haizea panic.
481
        """
482
        self.logger.error("An unexpected exception has happened.")
483
        self.__print_exception(exc, sys.exc_info()[2])
484
        self.__panic()
485
            
486
    def __print_exception(self, exc, exc_traceback):
487
        """Prints an exception's traceback to the log."""
488
        tb = traceback.format_tb(exc_traceback)
489
        for line in tb:
490
            self.logger.error(line)
491
        self.logger.error("Message: %s" % exc)
492

    
493
    
494
    def __panic(self):
495
        """Makes Haizea crash and burn in a panicked frenzy"""
496
        
497
        self.logger.status("Panicking...")
498

    
499
        # Stop RPC server
500
        if self.rpc_server != None:
501
            self.rpc_server.stop()
502

    
503
        # Dump state
504
        self.print_status()
505
        self.logger.error("Next change point (in slot table): %s" % self.get_next_changepoint())
506

    
507
        # Print lease descriptors
508
        leases = self.scheduler.leases.get_leases()
509
        if len(leases)>0:
510
            self.logger.vdebug("vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
511
            for lease in leases:
512
                lease.print_contents()
513
            self.logger.vdebug("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")        
514

    
515
        # Exit
516
        treatment = self.config.get("lease-failure-handling")
517
        if treatment == constants.ONFAILURE_EXIT_RAISE:
518
            raise
519
        else:
520
            exit(1)
521

    
522
            
523
class Clock(object):
524
    """Base class for the resource manager's clock.
525
    
526
    The clock is in charge of periodically waking the resource manager so it
527
    will process new requests and handle existing reservations. This is a
528
    base class defining abstract methods.
529
    
530
    """
531
    def __init__(self, manager):
532
        self.manager = manager
533
        self.done = False
534
    
535
    def get_time(self): 
536
        """Return the current time"""
537
        return abstract()
538

    
539
    def get_start_time(self): 
540
        """Return the time at which the clock started ticking"""
541
        return abstract()
542
    
543
    def get_next_schedulable_time(self): 
544
        """Return the next time at which resources could be scheduled.
545
        
546
        The "next schedulable time" server sanity measure when running 
547
        in real time (to avoid scheduling something "now" to actually 
548
        have "now" be in the past once the scheduling function returns. 
549
        i.e., the "next schedulable time" has nothing to do with whether 
550
        there are resources available at that time or not.
551
        """
552
        return abstract()
553
    
554
    def run(self):
555
        """Start and run the clock. This function is, in effect,
556
        the main loop of the resource manager."""
557
        return abstract()     
558

    
559
    def stop(self):
560
        """Stop the clock.
561
        
562
        Stopping the clock makes Haizea exit.
563
        """
564
        self.done = True    
565
    
566
        
567
class SimulatedClock(Clock):
568
    """Simulates the passage of time... really fast.
569
    
570
    The simulated clock steps through time to produce an ideal schedule.
571
    See the run() function for a description of how time is incremented
572
    exactly in the simulated clock.
573
    
574
    """
575
    
576
    def __init__(self, manager, starttime):
577
        """Initialize the simulated clock, starting at the provided starttime"""
578
        Clock.__init__(self, manager)
579
        self.starttime = starttime
580
        self.time = starttime
581
        self.logger = logging.getLogger("CLOCK")
582
        self.statusinterval = self.manager.config.get("status-message-interval")
583
       
584
    def get_time(self):
585
        """See docstring in base Clock class."""
586
        return self.time
587
    
588
    def get_start_time(self):
589
        """See docstring in base Clock class."""
590
        return self.starttime
591

    
592
    def get_next_schedulable_time(self):
593
        """See docstring in base Clock class."""
594
        return self.time    
595
    
596
    def run(self):
597
        """Runs the simulated clock through time.
598
        
599
        The clock starts at the provided start time. At each point in time,
600
        it wakes up the resource manager and then skips to the next time
601
        where "something" is happening (see __get_next_time for a more
602
        rigorous description of this).
603
        
604
        The clock stops when there is nothing left to do (no pending or 
605
        queue requests, and no future reservations)
606
        
607
        The simulated clock can only work in conjunction with the
608
        tracefile request frontend.
609
        """
610
        self.logger.status("Starting simulated clock")
611
        self.manager.accounting.start(self.get_start_time())
612
        prevstatustime = self.time
613
        
614
        # Main loop
615
        while not self.done:
616
            # Check to see if there are any leases which are ending prematurely.
617
            # Note that this is unique to simulation.
618
            prematureends = self.manager.scheduler.slottable.get_prematurely_ending_res(self.time)
619
            
620
            # Notify the resource manager about the premature ends
621
            for rr in prematureends:
622
                self.manager.notify_event(rr.lease.id, constants.EVENT_END_VM)
623
                
624
            # Process reservations starting/stopping at the current time and
625
            # check if there are any new requests.
626
            self.manager.process_ending_reservations(self.time)
627
            self.manager.process_starting_reservations(self.time)
628
            self.manager.process_requests(self.time)
629
            
630
            # Since processing requests may have resulted in new reservations
631
            # starting now, we process reservations again.
632
            self.manager.process_starting_reservations(self.time)
633
            # And one final call to deal with nil-duration reservations
634
            self.manager.process_ending_reservations(self.time)
635
            
636
            
637
            # Print a status message
638
            if self.statusinterval != None and (self.time - prevstatustime).minutes >= self.statusinterval:
639
                self.manager.print_status()
640
                prevstatustime = self.time
641
                
642
            # Skip to next point in time.
643
            self.time, self.done = self.__get_next_time()
644
                    
645
        self.logger.status("Simulated clock has stopped")
646

    
647
        # Stop the resource manager
648
        self.manager.graceful_stop()
649
        
650
    
651
    def __get_next_time(self):
652
        """Determines what is the next point in time to skip to.
653
        
654
        At a given point in time, the next time is the earliest of the following:
655
        * The arrival of the next lease request
656
        * The start or end of a reservation (a "changepoint" in the slot table)
657
        * A premature end of a lease
658
        """
659
        
660
        # Determine candidate next times
661
        tracefrontend = self.__get_trace_frontend()
662
        nextchangepoint = self.manager.get_next_changepoint()
663
        nextprematureend = self.manager.scheduler.slottable.get_next_premature_end(self.time)
664
        nextreqtime = tracefrontend.get_next_request_time()
665
        self.logger.debug("Next change point (in slot table): %s" % nextchangepoint)
666
        self.logger.debug("Next request time: %s" % nextreqtime)
667
        self.logger.debug("Next premature end: %s" % nextprematureend)
668
        
669
        # The previous time is now
670
        prevtime = self.time
671
        
672
        # We initialize the next time to now too, to detect if
673
        # we've been unable to determine what the next time is.
674
        newtime = self.time
675
        
676
        # Find the earliest of the three, accounting for None values
677
        if nextchangepoint != None and nextreqtime == None:
678
            newtime = nextchangepoint
679
        elif nextchangepoint == None and nextreqtime != None:
680
            newtime = nextreqtime
681
        elif nextchangepoint != None and nextreqtime != None:
682
            newtime = min(nextchangepoint, nextreqtime)
683
            
684
        if nextprematureend != None:
685
            newtime = min(nextprematureend, newtime)
686
                        
687
        # If there's no more leases in the system, and no more pending requests,
688
        # then we're done.
689
        if not self.manager.exists_more_leases() and not tracefrontend.exists_more_requests():
690
            self.done = True
691
        
692
        # We can also be done if we've specified that we want to stop when
693
        # the best-effort requests are all done or when they've all been submitted.
694
        stopwhen = self.manager.config.get("stop-when")
695
        besteffort = self.manager.scheduler.leases.get_leases(type = Lease.BEST_EFFORT)
696
        pendingbesteffort = [r for r in tracefrontend.requests if r.get_type() == Lease.BEST_EFFORT]
697
        if stopwhen == constants.STOPWHEN_BEDONE:
698
            if self.manager.scheduler.is_queue_empty() and len(besteffort) + len(pendingbesteffort) == 0:
699
                self.done = True
700
        elif stopwhen == constants.STOPWHEN_BESUBMITTED:
701
            if len(pendingbesteffort) == 0:
702
                self.done = True
703
                
704
        # If we didn't arrive at a new time, and we're not done, we've fallen into
705
        # an infinite loop. This is A Bad Thing(tm).
706
        if newtime == prevtime and self.done != True:
707
            raise Exception, "Simulated clock has fallen into an infinite loop."
708
        
709
        return newtime, self.done
710

    
711
    def __get_trace_frontend(self):
712
        """Gets the tracefile frontend from the resource manager"""
713
        frontends = self.manager.frontends
714
        tracef = [f for f in frontends if isinstance(f, TracefileFrontend)]
715
        if len(tracef) != 1:
716
            raise Exception, "The simulated clock can only work with a tracefile request frontend."
717
        else:
718
            return tracef[0] 
719
        
720
        
721
class RealClock(Clock):
722
    """A realtime clock.
723
    
724
    The real clock wakes up periodically to, in turn, tell the resource manager
725
    to wake up. The real clock can also be run in a "fastforward" mode for
726
    debugging purposes (however, unlike the simulated clock, the clock will
727
    always skip a fixed amount of time into the future).
728
    """
729
    def __init__(self, manager, quantum, non_sched, fastforward = False):
730
        """Initializes the real clock.
731
        
732
        Arguments:
733
        manager -- the resource manager
734
        quantum -- interval between clock wakeups
735
        fastforward -- if True, the clock won't actually sleep
736
                       for the duration of the quantum."""
737
        Clock.__init__(self, manager)
738
        self.fastforward = fastforward
739
        if not self.fastforward:
740
            self.lastwakeup = None
741
        else:
742
            self.lastwakeup = round_datetime(now())
743
        self.logger = logging.getLogger("CLOCK")
744
        self.starttime = self.get_time()
745
        self.nextschedulable = None
746
        self.nextperiodicwakeup = None
747
        self.quantum = TimeDelta(seconds=quantum)
748
        self.non_sched = TimeDelta(seconds=non_sched)
749
               
750
    def get_time(self):
751
        """See docstring in base Clock class."""
752
        if not self.fastforward:
753
            return now()
754
        else:
755
            return self.lastwakeup
756
    
757
    def get_start_time(self):
758
        """See docstring in base Clock class."""
759
        return self.starttime
760

    
761
    def get_next_schedulable_time(self):
762
        """See docstring in base Clock class."""
763
        return self.nextschedulable    
764
    
765
    def run(self):
766
        """Runs the real clock through time.
767
        
768
        The clock starts when run() is called. In each iteration of the main loop
769
        it will do the following:
770
        - Wake up the resource manager
771
        - Determine if there will be anything to do before the next
772
          time the clock will wake up (after the quantum has passed). Note
773
          that this information is readily available on the slot table.
774
          If so, set next-wakeup-time to (now + time until slot table
775
          event). Otherwise, set it to (now + quantum)
776
        - Sleep until next-wake-up-time
777
        
778
        The clock keeps on tickin' until a SIGINT signal (Ctrl-C if running in the
779
        foreground) or a SIGTERM signal is received.
780
        """
781
        self.logger.status("Starting clock")
782
        self.manager.accounting.start(self.get_start_time())
783
        
784
        try:
785
            signal.signal(signal.SIGINT, self.signalhandler_gracefulstop)
786
            signal.signal(signal.SIGTERM, self.signalhandler_gracefulstop)
787
        except ValueError, exc:
788
            # This means Haizea is not the main thread, which will happen
789
            # when running it as part of a py.test. We simply ignore this
790
            # to allow the test to continue.
791
            pass
792
        
793
        # Main loop
794
        while not self.done:
795
            self.logger.status("Waking up to manage resources")
796
            
797
            # Save the waking time. We want to use a consistent time in the 
798
            # resource manager operations (if we use now(), we'll get a different
799
            # time every time)
800
            if not self.fastforward:
801
                self.lastwakeup = round_datetime(self.get_time())
802
            self.logger.status("Wake-up time recorded as %s" % self.lastwakeup)
803
                
804
            # Next schedulable time
805
            self.nextschedulable = round_datetime(self.lastwakeup + self.non_sched)
806
            
807
            # Wake up the resource manager
808
            self.manager.process_ending_reservations(self.lastwakeup)
809
            self.manager.process_starting_reservations(self.lastwakeup)
810
            # TODO: Compute nextschedulable here, before processing requests
811
            self.manager.process_requests(self.nextschedulable)
812
            
813
            # Next wakeup time
814
            time_now = now()
815
            if self.lastwakeup + self.quantum <= time_now:
816
                quantums = (time_now - self.lastwakeup) / self.quantum
817
                quantums = int(ceil(quantums)) * self.quantum
818
                self.nextperiodicwakeup = round_datetime(self.lastwakeup + quantums)
819
            else:
820
                self.nextperiodicwakeup = round_datetime(self.lastwakeup + self.quantum)
821
            
822
            # Determine if there's anything to do before the next wakeup time
823
            nextchangepoint = self.manager.get_next_changepoint()
824
            if nextchangepoint != None and nextchangepoint <= self.nextperiodicwakeup:
825
                # We need to wake up earlier to handle a slot table event
826
                nextwakeup = nextchangepoint
827
                self.logger.status("Going back to sleep. Waking up at %s to handle slot table event." % nextwakeup)
828
            else:
829
                # Nothing to do before waking up
830
                nextwakeup = self.nextperiodicwakeup
831
                self.logger.status("Going back to sleep. Waking up at %s to see if something interesting has happened by then." % nextwakeup)
832
            
833
            # The only exit condition from the real clock is if the stop_when_no_more_leases
834
            # is set to True, and there's no more work left to do.
835
            # TODO: This first if is a kludge. Other options should only interact with
836
            # options through the configfile's get method. The "stop-when-no-more-leases"
837
            # option is currently OpenNebula-specific (while the real clock isn't; it can
838
            # be used by both the simulator and the OpenNebula mode). This has to be
839
            # fixed.            
840
            if self.manager.config._options.has_key("stop-when-no-more-leases"):
841
                stop_when_no_more_leases = self.manager.config.get("stop-when-no-more-leases")
842
                if stop_when_no_more_leases and not self.manager.exists_more_leases():
843
                    self.done = True
844
            
845
            # Sleep
846
            if not self.done:
847
                if not self.fastforward:
848
                    sleep((nextwakeup - now()).seconds)
849
                else:
850
                    self.lastwakeup = nextwakeup
851

    
852
        self.logger.status("Real clock has stopped")
853

    
854
        # Stop the resource manager
855
        self.manager.graceful_stop()
856
    
857
    def signalhandler_gracefulstop(self, signum, frame):
858
        """Handler for SIGTERM and SIGINT. Allows Haizea to stop gracefully."""
859
        
860
        sigstr = ""
861
        if signum == signal.SIGTERM:
862
            sigstr = " (SIGTERM)"
863
        elif signum == signal.SIGINT:
864
            sigstr = " (SIGINT)"
865
        self.logger.status("Received signal %i%s" %(signum, sigstr))
866
        self.done = True
867