branches/1.1/src/haizea/core/manager.py @ 841

# -------------------------------------------------------------------------- #
# Copyright 2006-2009, University of Chicago                                 #
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
# Complutense de Madrid (dsa-research.org)                                   #
#                                                                            #
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
# not use this file except in compliance with the License. You may obtain    #
# a copy of the License at                                                   #
#                                                                            #
# http://www.apache.org/licenses/LICENSE-2.0                                 #
#                                                                            #
# Unless required by applicable law or agreed to in writing, software        #
# distributed under the License is distributed on an "AS IS" BASIS,          #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
# See the License for the specific language governing permissions and        #
# limitations under the License.                                             #
# -------------------------------------------------------------------------- #

"""The manager (resource manager) module is the root of Haizea. If you want to
see where the ball starts rolling, look at the following two functions:

* manager.Manager.__init__()
* manager.Manager.start()

This module provides the following classes:

* Manager: Haizea itself. Pretty much everything else
  is contained in this class.
* Clock: A base class for Haizea's clock.
* SimulatedClock: A clock for simulations.
* RealClock: A clock that advances in real time.
* PersistenceManager: Saves leases and scheduling state to disk, so Haizea
  can recover from crashes.
"""

import haizea.common.constants as constants
from haizea.core.scheduler.preparation_schedulers.unmanaged import UnmanagedPreparationScheduler
from haizea.core.scheduler.preparation_schedulers.imagetransfer import ImageTransferPreparationScheduler
from haizea.core.enact.opennebula import OpenNebulaResourcePoolInfo, OpenNebulaVMEnactment, OpenNebulaDummyDeploymentEnactment
from haizea.core.enact.simulated import SimulatedResourcePoolInfo, SimulatedVMEnactment, SimulatedDeploymentEnactment
from haizea.core.frontends.tracefile import TracefileFrontend
from haizea.core.frontends.opennebula import OpenNebulaFrontend
from haizea.core.frontends.rpc import RPCFrontend
from haizea.core.accounting import AccountingDataCollection
from haizea.core.scheduler import UnrecoverableError
from haizea.core.scheduler.lease_scheduler import LeaseScheduler
from haizea.core.scheduler.vm_scheduler import VMScheduler
from haizea.core.scheduler.mapper import class_mappings as mapper_mappings
from haizea.core.scheduler.slottable import SlotTable, ResourceReservation
from haizea.core.scheduler.policy import PolicyManager
from haizea.core.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
from haizea.core.leases import Lease, Site
from haizea.core.log import HaizeaLogger
from haizea.core.rpcserver import RPCServer
from haizea.common.utils import abstract, round_datetime, Singleton, import_class, OpenNebulaXMLRPCClientSingleton
from haizea.common.opennebula_xmlrpc import OpenNebulaXMLRPCClient
from haizea.pluggable.policies import admission_class_mappings, preemption_class_mappings, host_class_mappings, pricing_mappings
from haizea.pluggable.accounting import probe_class_mappings

import operator
import logging
import signal
import sys, os
import traceback
import shelve
import socket
from time import sleep
from math import ceil
from mx.DateTime import now, TimeDelta, Parser

DAEMON_STDOUT = DAEMON_STDIN = "/dev/null"
DAEMON_STDERR = "/var/tmp/haizea.err"
DEFAULT_LOGFILE = "/var/tmp/haizea.log"

class Manager(object):
    """The root of Haizea
    
    This class is the root of Haizea. Pretty much everything else (scheduler,
    enactment modules, etc.) is contained in this class. The Manager
    class is meant to be a singleton.
    
    """
    
    __metaclass__ = Singleton
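    # Because Singleton is the metaclass (assumed to hand back the existing
    # instance on later instantiations), pluggable modules can retrieve the
    # running manager simply by calling Manager() again.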

    def __init__(self, config, daemon=False, pidfile=None):
        """Initializes the manager.
        
        Arguments:
        config -- a populated instance of haizea.common.config.RMConfig
        daemon -- True if Haizea must run as a daemon, False if it must
                  run in the foreground
        pidfile -- When running as a daemon, file to save pid to
        """
        self.config = config
        self.daemon = daemon
        self.pidfile = pidfile

    # Has to be in a separate function since some pluggable modules need to
    # access the Manager singleton
    def __initialize(self):
        # Create the RM components
        
        mode = self.config.get("mode")
        
        if mode == "simulated":
            # Simulated-time simulations always run in the foreground
            clock = self.config.get("clock")
            if clock == constants.CLOCK_SIMULATED:
                self.daemon = False
        elif mode == "opennebula":
            clock = constants.CLOCK_REAL
        
        self.init_logging()
        
        if clock == constants.CLOCK_SIMULATED:
            starttime = self.config.get("starttime")
            self.clock = SimulatedClock(self, starttime)
            self.rpc_server = None
        elif clock == constants.CLOCK_REAL:
            wakeup_interval = self.config.get("wakeup-interval")
            non_sched = self.config.get("non-schedulable-interval")
            if mode == "opennebula":
                fastforward = self.config.get("dry-run")
            else:
                fastforward = False
            self.clock = RealClock(self, wakeup_interval, non_sched, fastforward)
            if fastforward:
                # No need for an RPC server when doing a dry run
                self.rpc_server = None
            else:
                self.rpc_server = RPCServer(self)
        
        # Create the RPC singleton client for OpenNebula mode
        if mode == "opennebula":
            host = self.config.get("one.host")
            port = self.config.get("one.port")
            try:
                rv = OpenNebulaXMLRPCClient.get_userpass_from_env()
            except:
                print "ONE_AUTH environment variable is not set or authorization file is malformed"
                exit(1)
                
            user, passw = rv[0], rv[1]
            try:
                OpenNebulaXMLRPCClientSingleton(host, port, user, passw)
            except socket.error, e:
                print "Unable to connect to OpenNebula"
                print "Reason: %s" % e
                exit(1)
        
        # Enactment modules
        if mode == "simulated":
            resources = self.config.get("simul.resources")
            if resources == "in-tracefile":
                tracefile = self.config.get("tracefile")
                site = Site.from_lwf_file(tracefile)
            elif resources.startswith("file:"):
                sitefile = resources.split(":")
                site = Site.from_xml_file(sitefile[1])
            else:
                site = Site.from_resources_string(resources)
            
            deploy_bandwidth = self.config.get("imagetransfer-bandwidth")
            info_enact = SimulatedResourcePoolInfo(site)
            vm_enact = SimulatedVMEnactment()
            deploy_enact = SimulatedDeploymentEnactment(deploy_bandwidth)
        elif mode == "opennebula":
            info_enact = OpenNebulaResourcePoolInfo()
            vm_enact = OpenNebulaVMEnactment()
            # No deployment in OpenNebula. Using a dummy one for now.
            deploy_enact = OpenNebulaDummyDeploymentEnactment()

        if mode == "simulated":
            preparation_type = self.config.get("lease-preparation")
        elif mode == "opennebula":
            # No deployment in OpenNebula.
            preparation_type = constants.PREPARATION_UNMANAGED

        # Resource pool
        if preparation_type == constants.PREPARATION_TRANSFER:
            if self.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
                resourcepool = ResourcePoolWithReusableImages(info_enact, vm_enact, deploy_enact)
            else:
                resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
        else:
            resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
        
        # Slot table
        slottable = SlotTable(info_enact.get_resource_types())
        for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
            rt = slottable.create_resource_tuple_from_capacity(n.capacity)
            slottable.add_node(n.id, rt)

        # Policy manager
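        # Each policy option below may be either a short name registered in
        # the pluggable mappings or a fully-qualified class name; the
        # mappings.get(x, x) idiom resolves the former and passes the latter
        # through unchanged before import_class loads it.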
        admission = self.config.get("policy.admission")
        admission = admission_class_mappings.get(admission, admission)
        admission = import_class(admission)
        admission = admission(slottable)
        
        preemption = self.config.get("policy.preemption")
        preemption = preemption_class_mappings.get(preemption, preemption)
        preemption = import_class(preemption)
        preemption = preemption(slottable)

        host_selection = self.config.get("policy.host-selection")
        host_selection = host_class_mappings.get(host_selection, host_selection)
        host_selection = import_class(host_selection)
        host_selection = host_selection(slottable)

        pricing = self.config.get("policy.pricing")
        pricing = pricing_mappings.get(pricing, pricing)
        pricing = import_class(pricing)
        pricing = pricing(slottable)

        self.policy = PolicyManager(admission, preemption, host_selection, pricing)

        # Preparation scheduler
        if preparation_type == constants.PREPARATION_UNMANAGED:
            preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
        elif preparation_type == constants.PREPARATION_TRANSFER:
            preparation_scheduler = ImageTransferPreparationScheduler(slottable, resourcepool, deploy_enact)
        
        # VM mapper and scheduler
        mapper_name = self.config.get("mapper")
        mapper_cls = mapper_mappings.get(mapper_name, mapper_name)
        mapper_cls = import_class(mapper_cls)
        mapper = mapper_cls(slottable, self.policy)
        
        # When using backfilling, set the number of leases that can be
        # scheduled in the future.
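        # (In the usual backfilling parlance: "aggressive"/EASY backfilling
        # gives a future reservation only to the lease at the head of the
        # queue, "conservative" backfilling gives one to every lease, and
        # "intermediate" caps the number at a configurable value.)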
        backfilling = self.config.get("backfilling")
        if backfilling == constants.BACKFILLING_OFF:
            max_in_future = 0
        elif backfilling == constants.BACKFILLING_AGGRESSIVE:
            max_in_future = 1
        elif backfilling == constants.BACKFILLING_CONSERVATIVE:
            max_in_future = -1 # Unlimited
        elif backfilling == constants.BACKFILLING_INTERMEDIATE:
            max_in_future = self.config.get("backfilling-reservations")
        
        vm_scheduler = VMScheduler(slottable, resourcepool, mapper, max_in_future)
        
        if mapper_name == "deadline":  # Kludge
            mapper.set_vm_scheduler(vm_scheduler)

        # Statistics collection
        attrs = dict([(attr, self.config.get_attr(attr)) for attr in self.config.get_attrs()])
        
        self.accounting = AccountingDataCollection(self.config.get("datafile"), attrs)

        # Load probes
        probes = self.config.get("accounting-probes")
        probes = probes.split()
        for probe in probes:
            probe_class = probe_class_mappings.get(probe, probe)
            probe_class = import_class(probe_class)
            probe_obj = probe_class(self.accounting)
            self.accounting.add_probe(probe_obj)
        
        # Lease Scheduler
        self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable, self.accounting)
        
        # Lease request frontends
        if mode == "simulated":
            if clock == constants.CLOCK_SIMULATED:
                # In pure simulation, we can only use the tracefile frontend
                self.frontends = [TracefileFrontend(self.clock.get_start_time())]
            elif clock == constants.CLOCK_REAL:
                # In simulation with a real clock, only the RPC frontend can be used
                self.frontends = [RPCFrontend()]
        elif mode == "opennebula":
            self.frontends = [OpenNebulaFrontend()]

        persistence_file = self.config.get("persistence-file")
        if persistence_file == "none":
            persistence_file = None
        self.persistence = PersistenceManager(persistence_file)
        
        self.logger = logging.getLogger("RM")

    def init_logging(self):
        """Initializes logging"""
        
        logger = logging.getLogger("")
        if self.daemon:
            handler = logging.FileHandler(self.config.get("logfile"))
        else:
            handler = logging.StreamHandler()
        if sys.version_info[1] <= 4:
            # Python 2.4 and earlier
            formatter = logging.Formatter('%(name)-7s %(message)s')
        else:
            formatter = logging.Formatter('[%(haizeatime)s] %(name)-7s %(message)s')
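            # (the custom %(haizeatime)s formatter field is presumably
            # supplied by the HaizeaLogger class installed below)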
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        level = logging.getLevelName(self.config.get("loglevel"))
        logger.setLevel(level)
        logging.setLoggerClass(HaizeaLogger)

    def daemonize(self):
        """Daemonizes the Haizea process.
        
        Based on code in: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
        
        """
        # First fork
        try:
            pid = os.fork()
            if pid > 0: 
                # Exit first parent
                sys.exit(0) 
        except OSError, e:
            sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
            sys.exit(1)
    
        # Decouple from parent environment.
        os.chdir(".")
        os.umask(0)
        os.setsid()
    
        # Second fork
        try:
            pid = os.fork()
            if pid > 0: 
                # Exit second parent.
                sys.exit(0) 
        except OSError, e:
            sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
            sys.exit(2)
            
        # Open file descriptors and print start message
        si = file(DAEMON_STDIN, 'r')
        so = file(DAEMON_STDOUT, 'a+')
        se = file(DAEMON_STDERR, 'a+', 0)
        pid = os.getpid()
        sys.stderr.write("\nStarted Haizea daemon with pid %i\n\n" % pid)
        sys.stderr.flush()
        file(self.pidfile,'w+').write("%i\n" % pid)
        
        # Redirect standard file descriptors.
        os.close(sys.stdin.fileno())
        os.close(sys.stdout.fileno())
        os.close(sys.stderr.fileno())
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())

    def start(self):
        """Starts the resource manager"""
        
        self.__initialize()

        self.logger.info("Starting resource manager")
        
        for frontend in self.frontends:
            frontend.load(self)
        
        if self.daemon:
            self.daemonize()
        if self.rpc_server:
            self.rpc_server.start()
            
        self.__recover()
            
        # Start the clock
        try:
            self.clock.run()
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)

    def stop(self):
        """Stops the resource manager by stopping the clock"""
        self.clock.stop()
        
    def graceful_stop(self):
        """Stops the resource manager gracefully and exits"""
        
        self.logger.status("Stopping resource manager gracefully...")
        
        # Stop collecting data (this finalizes counters)
        self.accounting.stop()
        
        self.persistence.close()
        
        # TODO: When gracefully stopping mid-scheduling, we need to figure out what to
        #       do with leases that are still running.

        self.print_status()
        
        # In debug mode, dump the lease descriptors.
        for lease in self.scheduler.completed_leases.entries.values():
            lease.print_contents()
            
        # Write all collected data to disk
        leases = self.scheduler.completed_leases.entries
        self.accounting.save_to_disk(leases)
        
        # Stop RPC server
        if self.rpc_server != None:
            self.rpc_server.stop()
            
    def process_requests(self, nexttime):
        """Process any new requests in the request frontend
        
        Checks the request frontend to see if there are any new requests that
        have to be processed. AR leases are sent directly to the scheduler.
        Best-effort leases are queued.
        
        Arguments:
        nexttime -- The next time at which the scheduler can allocate resources.
                    This is meant to be provided by the clock simply as a sanity
                    measure when running in real time (to avoid scheduling something
                    "now" only to have "now" be in the past once the scheduling
                    function returns). I.e., nexttime has nothing to do with whether
                    there are resources available at that time or not.
        
        """
        
        # Get requests from the frontends
        requests = []
        for frontend in self.frontends:
            requests += frontend.get_accumulated_requests()
        requests.sort(key=operator.attrgetter("submit_time"))
        
        # Request leases and run the scheduling function.
        try:
            self.logger.vdebug("Requesting leases")
            for req in requests:
                self.scheduler.request_lease(req)

            self.logger.vdebug("Running scheduling function")
            self.scheduler.schedule(nexttime)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)

    def process_starting_reservations(self, time):
        """Process reservations starting at the specified time"""
        
        # The lease scheduler takes care of this.
        try:
            self.scheduler.process_starting_reservations(time)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)

    def process_ending_reservations(self, time):
        """Process reservations ending at the specified time"""
        
        # The lease scheduler takes care of this.
        try:
            self.scheduler.process_ending_reservations(time)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)
             
    def notify_event(self, lease_id, event):
        """Notifies Haizea of an asynchronous event.
        
        Arguments:
        lease_id -- ID of the lease affected by the event
        event -- Event (currently, only the constants.EVENT_END_VM event is supported)
        """
        try:
            lease = self.scheduler.get_lease_by_id(lease_id)
            self.scheduler.notify_event(lease, event)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)
        
    def cancel_lease(self, lease_id):
        """Cancels a lease.
        
        Arguments:
        lease_id -- ID of the lease to cancel
        """
        try:
            lease = self.scheduler.get_lease_by_id(lease_id)
            self.scheduler.cancel_lease(lease)
        except UnrecoverableError, exc:
            self.__unrecoverable_error(exc)
        except Exception, exc:
            self.__unexpected_exception(exc)
            
    def get_next_changepoint(self):
        """Return next changepoint in the slot table"""
        return self.scheduler.slottable.get_next_changepoint(self.clock.get_time())
   
    def exists_more_leases(self):
        """Return True if there are any leases still "in the system" """
        return self.scheduler.exists_scheduled_leases() or not self.scheduler.is_queue_empty()

    def print_status(self):
        """Prints status summary."""
        
        leases = self.scheduler.leases.get_leases()
        completed_leases = self.scheduler.completed_leases.get_leases()
        
        self.logger.status("--- Haizea status summary ---")
        states = {}
        for l in leases + completed_leases:
            state = l.get_state()
            ltype = l.get_type()
            states[state][ltype] = states.setdefault(state, {}).setdefault(ltype, 0) + 1
        lstates = sorted(states.keys())
        for state in lstates:
            state_str = Lease.state_str[state]
            num_leases = sum(states[state].values())
            types_str = " + ".join(["%i %s" % (num, Lease.type_str[ltype]) for ltype, num in states[state].items()])
            self.logger.status(("%s:" % state_str).ljust(20) + ("%i" % num_leases).rjust(6) + " (%s)" % types_str)
        self.logger.status("------".rjust(26))
        self.logger.status("TOTAL:".rjust(20) + ("%i" % (len(leases) + len(completed_leases))).rjust(6))
        self.logger.status("---- End summary ----")

    def __recover(self):
        """Loads persisted leases and scheduling information
        
        This method does three things:
        1. Recover persisted leases. Note that not all persisted leases
           may be recoverable. For example, if a lease was scheduled
           to start at a certain time, but that time passed while
           Haizea was not running, the lease will simply be transitioned
           to a failed state.
        2. Recover the queue.
        3. Recover the list of "future leases" as determined by
           the backfilling algorithm.
        """
        
        # Load leases
        leases = self.persistence.get_leases()
        for lease in leases:
            # Create a list of RRs
            rrs = lease.preparation_rrs + lease.vm_rrs
            for vmrr in lease.vm_rrs:
                rrs += vmrr.pre_rrs + vmrr.post_rrs

            # Bind resource tuples in RRs to slot table
            for rr in rrs:
                for restuple in rr.resources_in_pnode.values():
                    restuple.slottable = self.scheduler.slottable

            self.logger.debug("Attempting to recover lease %i" % lease.id)
            lease.print_contents()
            
            # Check the lease's state and determine how to proceed.
            load_rrs = False
            lease_state = lease.get_state()
            if lease_state in (Lease.STATE_DONE, Lease.STATE_CANCELLED, Lease.STATE_REJECTED, Lease.STATE_FAIL):
                self.logger.info("Recovered lease %i (already done)" % lease.id)
                self.scheduler.completed_leases.add(lease)
            elif lease_state in (Lease.STATE_NEW, Lease.STATE_PENDING):
                self.scheduler.leases.add(lease)
            elif lease_state == Lease.STATE_QUEUED:
                load_rrs = True
                self.scheduler.leases.add(lease)
                self.logger.info("Recovered lease %i (queued)" % lease.id)
            elif lease_state in (Lease.STATE_SCHEDULED, Lease.STATE_READY):
                # Check if the schedule is still valid.
                vmrr = lease.get_last_vmrr()
                if len(vmrr.pre_rrs) > 0:
                    start = vmrr.pre_rrs[0].start
                else:
                    start = vmrr.start
                if self.clock.get_time() < start:
                    load_rrs = True
                    self.scheduler.leases.add(lease)
                    self.logger.info("Recovered lease %i" % lease.id)
                else:
                    lease.set_state(Lease.STATE_FAIL)
                    self.scheduler.completed_leases.add(lease)
                    self.logger.info("Could not recover lease %i (scheduled starting time has passed)" % lease.id)
            elif lease_state == Lease.STATE_ACTIVE:
                vmrr = lease.get_last_vmrr()
                # Check whether the scheduled ending time is still in the future
                if self.clock.get_time() < vmrr.end:
                    # TODO: Check if VMs are actually running
                    load_rrs = True
                    self.scheduler.leases.add(lease)
                    self.logger.info("Recovered lease %i" % lease.id)
                else:
                    # TODO: May have to stop extant virtual machines
                    lease.set_state(Lease.STATE_FAIL)
                    self.scheduler.completed_leases.add(lease)
                    self.logger.info("Could not recover lease %i (scheduled ending time has passed)" % lease.id)
            else:
                # No support for recovering leases in the remaining states
                lease.set_state(Lease.STATE_FAIL)
                self.scheduler.completed_leases.add(lease)
                self.logger.info("Could not recover lease %i (unsupported state %i for recovery)" % (lease.id, lease_state))
                
            # Load the lease's RRs into the slot table
            if load_rrs:
                for rr in rrs:
                    if rr.state in (ResourceReservation.STATE_ACTIVE, ResourceReservation.STATE_SCHEDULED):
                        self.scheduler.slottable.add_reservation(rr)
                
        # Rebuild the queue
        queue = self.persistence.get_queue()
        for lease_id in queue:
            if self.scheduler.leases.has_lease(lease_id):
                lease = self.scheduler.leases.get_lease(lease_id)
                self.scheduler.queue.enqueue(lease)

        # Rebuild the "future leases"
        future = self.persistence.get_future_leases()
        for lease_id in future:
            if self.scheduler.leases.has_lease(lease_id):
                lease = self.scheduler.leases.get_lease(lease_id)
                self.scheduler.vm_scheduler.future_leases.add(lease)

    def __unrecoverable_error(self, exc):
        """Handles an unrecoverable error.
        
        This method prints information on the unrecoverable error and makes Haizea panic.
        """
        self.logger.error("An unrecoverable error has happened.")
        self.logger.error("Original exception:")
        self.__print_exception(exc.exc, exc.get_traceback())
        self.logger.error("Unrecoverable error traceback:")
        self.__print_exception(exc, sys.exc_info()[2])
        self.__panic()

    def __unexpected_exception(self, exc):
        """Handles an unexpected exception.
        
        This method prints information on the exception and makes Haizea panic.
        """
        self.logger.error("An unexpected exception has happened.")
        self.__print_exception(exc, sys.exc_info()[2])
        self.__panic()
            
    def __print_exception(self, exc, exc_traceback):
        """Prints an exception's traceback to the log."""
        tb = traceback.format_tb(exc_traceback)
        for line in tb:
            self.logger.error(line)
        self.logger.error("Message: %s" % exc)

    def __panic(self):
        """Makes Haizea crash and burn in a panicked frenzy"""
        
        self.logger.status("Panicking...")

        # Stop RPC server
        if self.rpc_server != None:
            self.rpc_server.stop()

        # Dump state
        self.print_status()
        self.logger.error("Next change point (in slot table): %s" % self.get_next_changepoint())

        # Print lease descriptors
        leases = self.scheduler.leases.get_leases()
        if len(leases) > 0:
            self.logger.vdebug("vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
            for lease in leases:
                lease.print_contents()
            self.logger.vdebug("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")

        # Exit
        treatment = self.config.get("lease-failure-handling")
        if treatment == constants.ONFAILURE_EXIT_RAISE:
            raise
        else:
            exit(1)

class Clock(object):
    """Base class for the resource manager's clock.
    
    The clock is in charge of periodically waking the resource manager so it
    will process new requests and handle existing reservations. This is a
    base class defining abstract methods.
    
    """
    def __init__(self, manager):
        self.manager = manager
        self.done = False
    
    def get_time(self): 
        """Return the current time"""
        return abstract()

    def get_start_time(self): 
        """Return the time at which the clock started ticking"""
        return abstract()
    
    def get_next_schedulable_time(self): 
        """Return the next time at which resources could be scheduled.
        
        The "next schedulable time" serves as a sanity measure when running
        in real time (to avoid scheduling something "now" only to have "now"
        be in the past once the scheduling function returns). I.e., the
        "next schedulable time" has nothing to do with whether there are
        resources available at that time or not.
        """
        return abstract()
    
    def run(self):
        """Start and run the clock. This function is, in effect,
        the main loop of the resource manager."""
        return abstract()

    def stop(self):
        """Stop the clock.
        
        Stopping the clock makes Haizea exit.
        """
        self.done = True
    

class SimulatedClock(Clock):
    """Simulates the passage of time... really fast.
    
    The simulated clock steps through time to produce an ideal schedule.
    See the run() function for a description of how time is incremented
    exactly in the simulated clock.
    
    """
    
    def __init__(self, manager, starttime):
        """Initialize the simulated clock, starting at the provided starttime"""
        Clock.__init__(self, manager)
        self.starttime = starttime
        self.time = starttime
        self.logger = logging.getLogger("CLOCK")
        self.statusinterval = self.manager.config.get("status-message-interval")
       
    def get_time(self):
        """See docstring in base Clock class."""
        return self.time
    
    def get_start_time(self):
        """See docstring in base Clock class."""
        return self.starttime

    def get_next_schedulable_time(self):
        """See docstring in base Clock class."""
        return self.time
    
    def run(self):
        """Runs the simulated clock through time.
        
        The clock starts at the provided start time. At each point in time,
        it wakes up the resource manager and then skips to the next time
        where "something" is happening (see __get_next_time for a more
        rigorous description of this).
        
        The clock stops when there is nothing left to do (no pending or
        queued requests, and no future reservations).
        
        The simulated clock can only work in conjunction with the
        tracefile request frontend.
        """
        self.logger.status("Starting simulated clock")
        self.manager.accounting.start(self.get_start_time())
        prevstatustime = self.time
        
        # Main loop
        while not self.done:
            # Check to see if there are any leases which are ending prematurely.
            # Note that this is unique to simulation.
            prematureends = self.manager.scheduler.slottable.get_prematurely_ending_res(self.time)
            
            # Notify the resource manager about the premature ends
            for rr in prematureends:
                self.manager.notify_event(rr.lease.id, constants.EVENT_END_VM)
                
            # Process reservations starting/stopping at the current time and
            # check if there are any new requests.
            self.manager.process_ending_reservations(self.time)
            self.manager.process_starting_reservations(self.time)
            self.manager.process_requests(self.time)
            
            # Since processing requests may have resulted in new reservations
            # starting now, we process reservations again.
            self.manager.process_starting_reservations(self.time)
            # And one final call to deal with nil-duration reservations
            self.manager.process_ending_reservations(self.time)
            
            self.manager.accounting.at_timestep(self.manager.scheduler)
            
            if self.manager.config.get("sanity-check"):
                passed, node, time, capacity = self.manager.scheduler.slottable.sanity_check(only_at=self.time)
                assert passed == True
                
                util = self.manager.accounting.get_last_counter_value("CPU utilization")
                assert util <= 1.0
                
                for l in self.manager.scheduler.leases.get_leases():
                    l.sanity_check()
            
            # Print a status message
            if self.statusinterval != None and (self.time - prevstatustime).minutes >= self.statusinterval:
                self.manager.print_status()
                prevstatustime = self.time
                
            # Skip to the next point in time.
            self.time, self.done = self.__get_next_time()
        
        self.logger.status("Simulated clock has stopped")

        # Stop the resource manager
        self.manager.graceful_stop()
        
    def __get_next_time(self):
        """Determines the next point in time to skip to.
        
        At a given point in time, the next time is the earliest of the following:
        * The arrival of the next lease request
        * The start or end of a reservation (a "changepoint" in the slot table)
        * A premature end of a lease
        """
        
        # Determine candidate next times
        tracefrontend = self.__get_trace_frontend()
        nextchangepoint = self.manager.get_next_changepoint()
        nextprematureend = self.manager.scheduler.slottable.get_next_premature_end(self.time)
        nextreqtime = tracefrontend.get_next_request_time()
        self.logger.debug("Next change point (in slot table): %s" % nextchangepoint)
        self.logger.debug("Next request time: %s" % nextreqtime)
        self.logger.debug("Next premature end: %s" % nextprematureend)
        
        # The previous time is now
        prevtime = self.time
        
        # We initialize the next time to now too, to detect if
        # we've been unable to determine what the next time is.
        newtime = self.time
        
        # Find the earliest of the candidates, accounting for None values
        if nextchangepoint != None and nextreqtime == None:
            newtime = nextchangepoint
        elif nextchangepoint == None and nextreqtime != None:
            newtime = nextreqtime
        elif nextchangepoint != None and nextreqtime != None:
            newtime = min(nextchangepoint, nextreqtime)
            
        if nextprematureend != None:
            newtime = min(nextprematureend, newtime)
            
        # If there are no more leases in the system, and no more pending
        # requests, then we're done.
        if not self.manager.exists_more_leases() and not tracefrontend.exists_more_requests():
            self.done = True
        
        # We can also be done if we've specified that we want to stop when
        # the best-effort requests are all done or when they've all been submitted.
        stopwhen = self.manager.config.get("stop-when")

        besteffort = self.manager.scheduler.leases.get_leases(type = Lease.BEST_EFFORT)
        pendingbesteffort = [r for r in tracefrontend.requests if r.get_type() == Lease.BEST_EFFORT]
        if stopwhen == constants.STOPWHEN_BEDONE:
            if self.manager.scheduler.is_queue_empty() and len(besteffort) + len(pendingbesteffort) == 0:
                self.done = True
        elif stopwhen == constants.STOPWHEN_BESUBMITTED:
            if len(pendingbesteffort) == 0:
                self.done = True
        elif stopwhen == constants.STOPWHEN_ALLDONE:
            pass
        elif stopwhen == constants.STOPWHEN_EXACT:
            t = Parser.DateTimeDeltaFromString(self.manager.config.get("stop-when-time"))
            if self.time >= self.starttime + t:
                self.done = True
                
        # If we didn't arrive at a new time, and we're not done, we've fallen into
        # an infinite loop. This is A Bad Thing(tm).
        if newtime == prevtime and self.done != True:
            raise Exception, "Simulated clock has fallen into an infinite loop."
        
        return newtime, self.done

    def __get_trace_frontend(self):
        """Gets the tracefile frontend from the resource manager"""
        frontends = self.manager.frontends
        tracef = [f for f in frontends if isinstance(f, TracefileFrontend)]
        if len(tracef) != 1:
            raise Exception, "The simulated clock can only work with a tracefile request frontend."
        else:
            return tracef[0]

class RealClock(Clock):
    """A realtime clock.
    
    The real clock wakes up periodically to, in turn, tell the resource manager
    to wake up. The real clock can also be run in a "fastforward" mode for
    debugging purposes (however, unlike the simulated clock, a fastforwarded
    real clock always skips a fixed amount of time into the future).
    """
    def __init__(self, manager, quantum, non_sched, fastforward = False):
        """Initializes the real clock.
        
        Arguments:
        manager -- the resource manager
        quantum -- interval between clock wakeups
        non_sched -- length of the non-schedulable interval: the time
                     immediately after a wakeup during which nothing new
                     can be scheduled
        fastforward -- if True, the clock won't actually sleep
                       for the duration of the quantum."""
        Clock.__init__(self, manager)
        self.fastforward = fastforward
        if not self.fastforward:
            self.lastwakeup = None
        else:
            self.lastwakeup = round_datetime(now())
        self.logger = logging.getLogger("CLOCK")
        self.starttime = self.get_time()
        self.nextschedulable = None
        self.nextperiodicwakeup = None
        self.quantum = TimeDelta(seconds=quantum)
        self.non_sched = TimeDelta(seconds=non_sched)
        
    def get_time(self):
        """See docstring in base Clock class."""
        if not self.fastforward:
            return now()
        else:
            return self.lastwakeup
    
    def get_start_time(self):
        """See docstring in base Clock class."""
        return self.starttime

    def get_next_schedulable_time(self):
        """See docstring in base Clock class."""
        return self.nextschedulable
    
    def run(self):
        """Runs the real clock through time.
        
        The clock starts when run() is called. In each iteration of the main loop
        it will do the following:
        - Wake up the resource manager
        - Determine if there will be anything to do before the next
          time the clock will wake up (after the quantum has passed). Note
          that this information is readily available on the slot table.
          If so, set next-wakeup-time to (now + time until slot table
          event). Otherwise, set it to (now + quantum).
        - Sleep until next-wakeup-time
        
        The clock keeps on tickin' until a SIGINT signal (Ctrl-C if running in the
        foreground) or a SIGTERM signal is received.
        """
        self.logger.status("Starting clock")
        self.manager.accounting.start(self.get_start_time())
        
        try:
            signal.signal(signal.SIGINT, self.signalhandler_gracefulstop)
            signal.signal(signal.SIGTERM, self.signalhandler_gracefulstop)
        except ValueError, exc:
            # This means Haizea is not running in the main thread, which will
            # happen when running it as part of a py.test. We simply ignore
            # this to allow the test to continue.
            pass
        
        # Main loop
        while not self.done:
            self.logger.status("Waking up to manage resources")
            
            # Save the waking time. We want to use a consistent time in the
            # resource manager operations (if we used now(), we would get a
            # different time every time)
            if not self.fastforward:
                self.lastwakeup = round_datetime(self.get_time())
            self.logger.status("Wake-up time recorded as %s" % self.lastwakeup)
                
            # Next schedulable time
            self.nextschedulable = round_datetime(self.lastwakeup + self.non_sched)
            
            # Check if there are any changes in the resource pool
            new_nodes = self.manager.scheduler.vm_scheduler.resourcepool.refresh_nodes()
            for n in new_nodes:
                rt = self.manager.scheduler.slottable.create_resource_tuple_from_capacity(n.capacity)
                self.manager.scheduler.slottable.add_node(n.id, rt)
            
            # Wake up the resource manager
            self.manager.process_ending_reservations(self.lastwakeup)
            self.manager.process_starting_reservations(self.lastwakeup)
            # TODO: Compute nextschedulable here, before processing requests
            self.manager.process_requests(self.nextschedulable)

            self.manager.accounting.at_timestep(self.manager.scheduler)
            
            # Next wakeup time
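            # Worked example with illustrative values: if lastwakeup is
            # 10:00:00, the quantum is 60s and now() is 10:02:30, we have
            # overslept; 150s/60s = 2.5 quantums, rounded up to 3, so the
            # next periodic wakeup is 10:03:00 (a multiple of the quantum
            # past the last wakeup, rather than drifting from now()).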
            time_now = now()
            if self.lastwakeup + self.quantum <= time_now:
                quantums = (time_now - self.lastwakeup) / self.quantum
                quantums = int(ceil(quantums)) * self.quantum
                self.nextperiodicwakeup = round_datetime(self.lastwakeup + quantums)
            else:
                self.nextperiodicwakeup = round_datetime(self.lastwakeup + self.quantum)
            
            # Determine if there's anything to do before the next wakeup time
            nextchangepoint = self.manager.get_next_changepoint()
            if nextchangepoint != None and nextchangepoint <= self.nextperiodicwakeup:
                # We need to wake up earlier to handle a slot table event
                nextwakeup = nextchangepoint
                self.logger.status("Going back to sleep. Waking up at %s to handle slot table event." % nextwakeup)
            else:
                # Nothing to do before the next periodic wakeup
                nextwakeup = self.nextperiodicwakeup
                self.logger.status("Going back to sleep. Waking up at %s to see if something interesting has happened by then." % nextwakeup)
            
            # The only exit condition from the real clock is if the
            # stop-when-no-more-leases option is set to True and there is no
            # more work left to do.
            # TODO: This first if is a kludge. Other parts of Haizea should
            # only interact with options through the config file's get
            # method. The "stop-when-no-more-leases" option is currently
            # OpenNebula-specific (while the real clock isn't; it can be used
            # by both the simulator and the OpenNebula mode). This has to be
            # fixed.
            if self.manager.config._options.has_key("stop-when-no-more-leases"):
                stop_when_no_more_leases = self.manager.config.get("stop-when-no-more-leases")
                if stop_when_no_more_leases and not self.manager.exists_more_leases():
                    self.done = True
            
            # Sleep
            if not self.done:
                if not self.fastforward:
                    sleep((nextwakeup - now()).seconds)
                else:
                    self.lastwakeup = nextwakeup

        self.logger.status("Real clock has stopped")

        # Stop the resource manager
        self.manager.graceful_stop()
    
    def signalhandler_gracefulstop(self, signum, frame):
        """Handler for SIGTERM and SIGINT. Allows Haizea to stop gracefully."""
        
        sigstr = ""
        if signum == signal.SIGTERM:
            sigstr = " (SIGTERM)"
        elif signum == signal.SIGINT:
            sigstr = " (SIGINT)"
        self.logger.status("Received signal %i%s" % (signum, sigstr))
        self.done = True

class PersistenceManager(object):
    """Persistence manager.
    
    The persistence manager is in charge of persisting leases, and some
    scheduling data, to disk. This allows Haizea to recover from crashes.
    """
    
    def __init__(self, file):
        """Constructor
        
        Initializes the persistence manager. If the specified file
        does not exist, it is created. If the file already exists, it
        is opened, but the information in it is not recovered (that is
        the responsibility of the Manager class).
        
        Arguments:
        file -- Persistence file. If None is specified, then
                persistence is disabled and Haizea will run entirely
                in-memory.
        """
        if file == None:
            self.disabled = True
            self.shelf = {}
        else:
            self.disabled = False
            file = os.path.expanduser(file)
            d = os.path.dirname(file)
            if not os.path.exists(d):
                os.makedirs(d)
            self.shelf = shelve.open(file, flag='c', protocol = -1)
        
    def persist_lease(self, lease):
        """Persists a single lease to disk
        
        Arguments:
        lease -- Lease to persist
        """
        if not self.disabled:
            self.shelf["lease-%i" % lease.id] = lease
            self.shelf.sync()

    def persist_queue(self, queue):
        """Persists the queue to disk
        
        Arguments:
        queue -- The queue
        """
        if not self.disabled:
            self.shelf["queue"] = [l.id for l in queue]
            self.shelf.sync()
        
    def persist_future_leases(self, leases):
        """Persists the set of future leases
        
        Arguments:
        leases -- "Future leases" (as determined by the backfilling algorithm)
        """
        if not self.disabled:
            self.shelf["future"] = [l.id for l in leases]
            self.shelf.sync()
        
    def get_leases(self):
        """Returns the leases persisted to disk."""
        return [v for k, v in self.shelf.items() if k.startswith("lease-")]
    
    def get_queue(self):
        """Returns the queue persisted to disk."""
        if self.shelf.has_key("queue"):
            return self.shelf["queue"]
        else:
            return []
        
    def get_future_leases(self):
        """Returns the future leases persisted to disk."""
        if self.shelf.has_key("future"):
            return self.shelf["future"]
        else:
            return []
    
    def close(self):
        """Closes the persistence manager.
        
        Closing the persistence manager saves any remaining
        data to disk.
        """
        if not self.disabled:
            self.shelf.close()
1148