Project

General

Profile

root / branches / 1.1 / src / haizea / core / manager.py @ 841

1 632 borja
# -------------------------------------------------------------------------- #
2 641 borja
# Copyright 2006-2009, University of Chicago                                 #
3
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
4 632 borja
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18
19
"""The manager (resource manager) module is the root of Haizea. If you want to
20
see where the ball starts rolling, look at the following two functions:
21

22
* manager.Manager.__init__()
23
* manager.Manager.start()
24

25
This module provides the following classes:
26

27
* Manager: Haizea itself. Pretty much everything else
28
  is contained in this class.
29
* Clock: A base class for Haizea's clock.
30
* SimulatedClock: A clock for simulations.
31
* RealClock: A clock that advances in realtime.
32
"""
33
34
import haizea.common.constants as constants
35
from haizea.core.scheduler.preparation_schedulers.unmanaged import UnmanagedPreparationScheduler
36
from haizea.core.scheduler.preparation_schedulers.imagetransfer import ImageTransferPreparationScheduler
37
from haizea.core.enact.opennebula import OpenNebulaResourcePoolInfo, OpenNebulaVMEnactment, OpenNebulaDummyDeploymentEnactment
38
from haizea.core.enact.simulated import SimulatedResourcePoolInfo, SimulatedVMEnactment, SimulatedDeploymentEnactment
39
from haizea.core.frontends.tracefile import TracefileFrontend
40
from haizea.core.frontends.opennebula import OpenNebulaFrontend
41
from haizea.core.frontends.rpc import RPCFrontend
42 650 borja
from haizea.core.accounting import AccountingDataCollection
43 632 borja
from haizea.core.scheduler import UnrecoverableError
44
from haizea.core.scheduler.lease_scheduler import LeaseScheduler
45
from haizea.core.scheduler.vm_scheduler import VMScheduler
46
from haizea.core.scheduler.mapper import class_mappings as mapper_mappings
47 647 borja
from haizea.core.scheduler.slottable import SlotTable, ResourceReservation
48 632 borja
from haizea.core.scheduler.policy import PolicyManager
49
from haizea.core.scheduler.resourcepool import ResourcePool, ResourcePoolWithReusableImages
50
from haizea.core.leases import Lease, Site
51
from haizea.core.log import HaizeaLogger
52
from haizea.core.rpcserver import RPCServer
53 664 borja
from haizea.common.utils import abstract, round_datetime, Singleton, import_class, OpenNebulaXMLRPCClientSingleton
54
from haizea.common.opennebula_xmlrpc import OpenNebulaXMLRPCClient
55 693 borja
from haizea.pluggable.policies import admission_class_mappings, preemption_class_mappings, host_class_mappings, pricing_mappings
56 650 borja
from haizea.pluggable.accounting import probe_class_mappings
57 632 borja
58
import operator
59
import logging
60
import signal
61
import sys, os
62
import traceback
63 647 borja
import shelve
64 664 borja
import socket
65 632 borja
from time import sleep
66
from math import ceil
67 811 borja
from mx.DateTime import now, TimeDelta, Parser
68 632 borja
69
DAEMON_STDOUT = DAEMON_STDIN = "/dev/null"
70
DAEMON_STDERR = "/var/tmp/haizea.err"
71
DEFAULT_LOGFILE = "/var/tmp/haizea.log"
72
73 664 borja
class Manager(object):
74 632 borja
    """The root of Haizea
75

76
    This class is the root of Haizea. Pretty much everything else (scheduler,
77
    enactment modules, etc.) is contained in this class. The Manager
78
    class is meant to be a singleton.
79

80
    """
81
82 664 borja
    __metaclass__ = Singleton
83
84 632 borja
    def __init__(self, config, daemon=False, pidfile=None):
85
        """Initializes the manager.
86

87
        Argument:
88
        config -- a populated instance of haizea.common.config.RMConfig
89
        daemon -- True if Haizea must run as a daemon, False if it must
90
                  run in the foreground
91
        pidfile -- When running as a daemon, file to save pid to
92
        """
93
        self.config = config
94 693 borja
        self.daemon = daemon
95
        self.pidfile = pidfile
96
97 723 borja
98 693 borja
    # Has to be in a separate function since some pluggable modules need to
99
    # access the Manager singleton
100
    def __initialize(self):
101 632 borja
        # Create the RM components
102
103 693 borja
        mode = self.config.get("mode")
104 632 borja
105
106
        if mode == "simulated":
107
            # Simulated-time simulations always run in the foreground
108
            clock = self.config.get("clock")
109
            if clock == constants.CLOCK_SIMULATED:
110
                self.daemon = False
111
        elif mode == "opennebula":
112
            clock = constants.CLOCK_REAL
113
114
        self.init_logging()
115
116
        if clock == constants.CLOCK_SIMULATED:
117
            starttime = self.config.get("starttime")
118
            self.clock = SimulatedClock(self, starttime)
119
            self.rpc_server = None
120
        elif clock == constants.CLOCK_REAL:
121
            wakeup_interval = self.config.get("wakeup-interval")
122
            non_sched = self.config.get("non-schedulable-interval")
123
            if mode == "opennebula":
124
                fastforward = self.config.get("dry-run")
125
            else:
126
                fastforward = False
127
            self.clock = RealClock(self, wakeup_interval, non_sched, fastforward)
128
            if fastforward:
129
                # No need for an RPC server when doing a dry run
130
                self.rpc_server = None
131
            else:
132
                self.rpc_server = RPCServer(self)
133
134 664 borja
        # Create the RPC singleton client for OpenNebula mode
135
        if mode == "opennebula":
136
            host = self.config.get("one.host")
137
            port = self.config.get("one.port")
138 688 borja
            try:
139
                rv = OpenNebulaXMLRPCClient.get_userpass_from_env()
140
            except:
141
                print "ONE_AUTH environment variable is not set or authorization file is malformed"
142 664 borja
                exit(1)
143 688 borja
144
            user, passw = rv[0], rv[1]
145
            try:
146
                OpenNebulaXMLRPCClientSingleton(host, port, user, passw)
147
            except socket.error, e:
148
                print "Unable to connect to OpenNebula"
149
                print "Reason: %s" % e
150
                exit(1)
151 664 borja
152 632 borja
        # Enactment modules
153
        if mode == "simulated":
154
            resources = self.config.get("simul.resources")
155
            if resources == "in-tracefile":
156
                tracefile = self.config.get("tracefile")
157
                site = Site.from_lwf_file(tracefile)
158
            elif resources.startswith("file:"):
159
                sitefile = resources.split(":")
160 794 borja
                site = Site.from_xml_file(sitefile[1])
161 632 borja
            else:
162
                site = Site.from_resources_string(resources)
163
164 693 borja
            deploy_bandwidth = self.config.get("imagetransfer-bandwidth")
165 632 borja
            info_enact = SimulatedResourcePoolInfo(site)
166
            vm_enact = SimulatedVMEnactment()
167 664 borja
            deploy_enact = SimulatedDeploymentEnactment(deploy_bandwidth)
168 632 borja
        elif mode == "opennebula":
169
            # Enactment modules
170
            info_enact = OpenNebulaResourcePoolInfo()
171
            vm_enact = OpenNebulaVMEnactment()
172
            # No deployment in OpenNebula. Using dummy one for now.
173
            deploy_enact = OpenNebulaDummyDeploymentEnactment()
174
175
        if mode == "simulated":
176
            preparation_type = self.config.get("lease-preparation")
177
        elif mode == "opennebula":
178
            # No deployment in OpenNebula.
179
            preparation_type = constants.PREPARATION_UNMANAGED
180
181
        # Resource pool
182
        if preparation_type == constants.PREPARATION_TRANSFER:
183
            if self.config.get("diskimage-reuse") == constants.REUSE_IMAGECACHES:
184
                resourcepool = ResourcePoolWithReusableImages(info_enact, vm_enact, deploy_enact)
185
            else:
186
                resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
187
        else:
188
            resourcepool = ResourcePool(info_enact, vm_enact, deploy_enact)
189
190
        # Slot table
191
        slottable = SlotTable(info_enact.get_resource_types())
192
        for n in resourcepool.get_nodes() + resourcepool.get_aux_nodes():
193
            rt = slottable.create_resource_tuple_from_capacity(n.capacity)
194
            slottable.add_node(n.id, rt)
195
196
        # Policy manager
197
        admission = self.config.get("policy.admission")
198
        admission = admission_class_mappings.get(admission, admission)
199
        admission = import_class(admission)
200
        admission = admission(slottable)
201
202
        preemption = self.config.get("policy.preemption")
203
        preemption = preemption_class_mappings.get(preemption, preemption)
204
        preemption = import_class(preemption)
205
        preemption = preemption(slottable)
206
207
        host_selection = self.config.get("policy.host-selection")
208
        host_selection = host_class_mappings.get(host_selection, host_selection)
209
        host_selection = import_class(host_selection)
210
        host_selection = host_selection(slottable)
211
212 693 borja
        pricing = self.config.get("policy.pricing")
213
        pricing = pricing_mappings.get(pricing, pricing)
214
        pricing = import_class(pricing)
215
        pricing = pricing(slottable)
216 632 borja
217 693 borja
        self.policy = PolicyManager(admission, preemption, host_selection, pricing)
218
219 632 borja
        # Preparation scheduler
220
        if preparation_type == constants.PREPARATION_UNMANAGED:
221
            preparation_scheduler = UnmanagedPreparationScheduler(slottable, resourcepool, deploy_enact)
222
        elif preparation_type == constants.PREPARATION_TRANSFER:
223
            preparation_scheduler = ImageTransferPreparationScheduler(slottable, resourcepool, deploy_enact)
224
225
        # VM mapper and scheduler
226 784 borja
        mapper_name = self.config.get("mapper")
227
        mapper_cls = mapper_mappings.get(mapper_name, mapper_name)
228
        mapper_cls = import_class(mapper_cls)
229
        mapper = mapper_cls(slottable, self.policy)
230 664 borja
231
        # When using backfilling, set the number of leases that can be
232
        # scheduled in the future.
233
        backfilling = self.config.get("backfilling")
234
        if backfilling == constants.BACKFILLING_OFF:
235
            max_in_future = 0
236
        elif backfilling == constants.BACKFILLING_AGGRESSIVE:
237
            max_in_future = 1
238
        elif backfilling == constants.BACKFILLING_CONSERVATIVE:
239
            max_in_future = -1 # Unlimited
240
        elif backfilling == constants.BACKFILLING_INTERMEDIATE:
241
            max_in_future = self.config.get("backfilling-reservations")
242
243
        vm_scheduler = VMScheduler(slottable, resourcepool, mapper, max_in_future)
244 784 borja
245
        if mapper_name == "deadline":  # Kludge
246
            mapper.set_vm_scheduler(vm_scheduler)
247
248 650 borja
        # Statistics collection
249 664 borja
        attrs = dict([(attr, self.config.get_attr(attr)) for attr in self.config.get_attrs()])
250
251
        self.accounting = AccountingDataCollection(self.config.get("datafile"), attrs)
252 650 borja
        # Load probes
253
        probes = self.config.get("accounting-probes")
254
        probes = probes.split()
255
        for probe in probes:
256
            probe_class = probe_class_mappings.get(probe, probe)
257
            probe_class = import_class(probe_class)
258
            probe_obj = probe_class(self.accounting)
259
            self.accounting.add_probe(probe_obj)
260
261 632 borja
        # Lease Scheduler
262 650 borja
        self.scheduler = LeaseScheduler(vm_scheduler, preparation_scheduler, slottable, self.accounting)
263 664 borja
264 632 borja
        # Lease request frontends
265
        if mode == "simulated":
266
            if clock == constants.CLOCK_SIMULATED:
267
                # In pure simulation, we can only use the tracefile frontend
268 664 borja
                self.frontends = [TracefileFrontend(self.clock.get_start_time())]
269 632 borja
            elif clock == constants.CLOCK_REAL:
270
                # In simulation with a real clock, only the RPC frontend can be used
271 664 borja
                self.frontends = [RPCFrontend()]
272 632 borja
        elif mode == "opennebula":
273 664 borja
            self.frontends = [OpenNebulaFrontend()]
274 632 borja
275 648 borja
        persistence_file = self.config.get("persistence-file")
276
        if persistence_file == "none":
277
            persistence_file = None
278
        self.persistence = PersistenceManager(persistence_file)
279 632 borja
280
        self.logger = logging.getLogger("RM")
281
282
283
    def init_logging(self):
284
        """Initializes logging
285

286
        """
287
288
        logger = logging.getLogger("")
289
        if self.daemon:
290
            handler = logging.FileHandler(self.config.get("logfile"))
291
        else:
292
            handler = logging.StreamHandler()
293 675 borja
        if sys.version_info[1] <= 4:
294
            formatter = logging.Formatter('%(name)-7s %(message)s')
295
        else:
296
            formatter = logging.Formatter('[%(haizeatime)s] %(name)-7s %(message)s')
297 632 borja
        handler.setFormatter(formatter)
298
        logger.addHandler(handler)
299
        level = logging.getLevelName(self.config.get("loglevel"))
300
        logger.setLevel(level)
301
        logging.setLoggerClass(HaizeaLogger)
302
303
304
    def daemonize(self):
305
        """Daemonizes the Haizea process.
306

307
        Based on code in:  http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
308

309
        """
310
        # First fork
311
        try:
312
            pid = os.fork()
313
            if pid > 0:
314
                # Exit first parent
315
                sys.exit(0)
316
        except OSError, e:
317
            sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
318
            sys.exit(1)
319
320
        # Decouple from parent environment.
321
        os.chdir(".")
322
        os.umask(0)
323
        os.setsid()
324
325
        # Second fork
326
        try:
327
            pid = os.fork()
328
            if pid > 0:
329
                # Exit second parent.
330
                sys.exit(0)
331
        except OSError, e:
332
            sys.stderr.write("Failed to daemonize Haizea: (%d) %s\n" % (e.errno, e.strerror))
333
            sys.exit(2)
334
335
        # Open file descriptors and print start message
336
        si = file(DAEMON_STDIN, 'r')
337
        so = file(DAEMON_STDOUT, 'a+')
338
        se = file(DAEMON_STDERR, 'a+', 0)
339
        pid = os.getpid()
340
        sys.stderr.write("\nStarted Haizea daemon with pid %i\n\n" % pid)
341
        sys.stderr.flush()
342
        file(self.pidfile,'w+').write("%i\n" % pid)
343
344
        # Redirect standard file descriptors.
345
        os.close(sys.stdin.fileno())
346
        os.close(sys.stdout.fileno())
347
        os.close(sys.stderr.fileno())
348
        os.dup2(si.fileno(), sys.stdin.fileno())
349
        os.dup2(so.fileno(), sys.stdout.fileno())
350
        os.dup2(se.fileno(), sys.stderr.fileno())
351
352
    def start(self):
353
        """Starts the resource manager"""
354 693 borja
355
        self.__initialize()
356
357 632 borja
        self.logger.info("Starting resource manager")
358
359 664 borja
        for frontend in self.frontends:
360
            frontend.load(self)
361
362 632 borja
        if self.daemon:
363
            self.daemonize()
364
        if self.rpc_server:
365
            self.rpc_server.start()
366
367 647 borja
        self.__recover()
368
369 632 borja
        # Start the clock
370
        try:
371
            self.clock.run()
372
        except UnrecoverableError, exc:
373
            self.__unrecoverable_error(exc)
374
        except Exception, exc:
375
            self.__unexpected_exception(exc)
376
377
    def stop(self):
378
        """Stops the resource manager by stopping the clock"""
379
        self.clock.stop()
380
381
    def graceful_stop(self):
382
        """Stops the resource manager gracefully and exits"""
383
384
        self.logger.status("Stopping resource manager gracefully...")
385
386
        # Stop collecting data (this finalizes counters)
387
        self.accounting.stop()
388
389 647 borja
        self.persistence.close()
390
391 632 borja
        # TODO: When gracefully stopping mid-scheduling, we need to figure out what to
392
        #       do with leases that are still running.
393
394
        self.print_status()
395
396
        # In debug mode, dump the lease descriptors.
397
        for lease in self.scheduler.completed_leases.entries.values():
398
            lease.print_contents()
399
400
        # Write all collected data to disk
401 650 borja
        leases = self.scheduler.completed_leases.entries
402
        self.accounting.save_to_disk(leases)
403 632 borja
404
        # Stop RPC server
405
        if self.rpc_server != None:
406
            self.rpc_server.stop()
407
408
    def process_requests(self, nexttime):
409
        """Process any new requests in the request frontend
410

411
        Checks the request frontend to see if there are any new requests that
412
        have to be processed. AR leases are sent directly to the schedule.
413
        Best-effort leases are queued.
414

415
        Arguments:
416
        nexttime -- The next time at which the scheduler can allocate resources.
417
                    This is meant to be provided by the clock simply as a sanity
418
                    measure when running in real time (to avoid scheduling something
419
                    "now" to actually have "now" be in the past once the scheduling
420
                    function returns. i.e., nexttime has nothing to do with whether
421
                    there are resources available at that time or not.
422

423
        """
424
425
        # Get requests from frontend
426
        requests = []
427
        for frontend in self.frontends:
428
            requests += frontend.get_accumulated_requests()
429
        requests.sort(key=operator.attrgetter("submit_time"))
430
431
        # Request leases and run the scheduling function.
432
        try:
433
            self.logger.vdebug("Requesting leases")
434
            for req in requests:
435
                self.scheduler.request_lease(req)
436
437
            self.logger.vdebug("Running scheduling function")
438
            self.scheduler.schedule(nexttime)
439
        except UnrecoverableError, exc:
440
            self.__unrecoverable_error(exc)
441
        except Exception, exc:
442
            self.__unexpected_exception(exc)
443
444
    def process_starting_reservations(self, time):
445
        """Process reservations starting/stopping at specified time"""
446
447
        # The lease scheduler takes care of this.
448
        try:
449
            self.scheduler.process_starting_reservations(time)
450
        except UnrecoverableError, exc:
451
            self.__unrecoverable_error(exc)
452
        except Exception, exc:
453
            self.__unexpected_exception(exc)
454
455
    def process_ending_reservations(self, time):
456
        """Process reservations starting/stopping at specified time"""
457
458
        # The lease scheduler takes care of this.
459
        try:
460
            self.scheduler.process_ending_reservations(time)
461
        except UnrecoverableError, exc:
462
            self.__unrecoverable_error(exc)
463
        except Exception, exc:
464 647 borja
            self.__unexpected_exception(exc)
465 632 borja
466
    def notify_event(self, lease_id, event):
467
        """Notifies an asynchronous event to Haizea.
468

469
        Arguments:
470
        lease_id -- ID of lease that is affected by event
471
        event -- Event (currently, only the constants.EVENT_END_VM event is supported)
472
        """
473
        try:
474
            lease = self.scheduler.get_lease_by_id(lease_id)
475
            self.scheduler.notify_event(lease, event)
476
        except UnrecoverableError, exc:
477
            self.__unrecoverable_error(exc)
478
        except Exception, exc:
479
            self.__unexpected_exception(exc)
480
481
    def cancel_lease(self, lease_id):
482
        """Cancels a lease.
483

484
        Arguments:
485
        lease_id -- ID of lease to cancel
486
        """
487
        try:
488
            lease = self.scheduler.get_lease_by_id(lease_id)
489
            self.scheduler.cancel_lease(lease)
490
        except UnrecoverableError, exc:
491
            self.__unrecoverable_error(exc)
492
        except Exception, exc:
493
            self.__unexpected_exception(exc)
494
495
    def get_next_changepoint(self):
496
        """Return next changepoint in the slot table"""
497
        return self.scheduler.slottable.get_next_changepoint(self.clock.get_time())
498
499
    def exists_more_leases(self):
500
        """Return True if there are any leases still "in the system" """
501
        return self.scheduler.exists_scheduled_leases() or not self.scheduler.is_queue_empty()
502
503
    def print_status(self):
504
        """Prints status summary."""
505
506
        leases = self.scheduler.leases.get_leases()
507
        completed_leases = self.scheduler.completed_leases.get_leases()
508 718 borja
509 632 borja
        self.logger.status("--- Haizea status summary ---")
510 718 borja
        states = {}
511 725 borja
        for l in leases + completed_leases:
512 718 borja
            state = l.get_state()
513 725 borja
            ltype = l.get_type()
514
            states[state][ltype] = states.setdefault(state, {}).setdefault(ltype, 0) + 1
515 718 borja
        lstates = sorted(states.keys())
516
        for state in lstates:
517
            state_str = Lease.state_str[state]
518 725 borja
            num_leases = sum(states[state].values())
519
            types_str = " + ".join(["%i %s" % (num,Lease.type_str[ltype]) for ltype, num in states[state].items()])
520
            self.logger.status(("%s:" % state_str).ljust(20) + ("%i" % num_leases).rjust(6) + " (%s)" % types_str)
521 718 borja
        self.logger.status("------".rjust(26))
522
        self.logger.status("TOTAL:".rjust(20) + ("%i" % (len(leases) + len(completed_leases))).rjust(6))
523
        self.logger.status("---- End summary ----")
524 632 borja
525 723 borja
526 647 borja
    def __recover(self):
527 648 borja
        """Loads persisted leases and scheduling information
528

529
        This method does three things:
530
        1. Recover persisted leases. Note that not all persisted leases
531
           may be recoverable. For example, if a lease was scheduled
532
           to start at a certain time, but that time passed while
533
           Haizea was not running, the lease will simply be transitioned
534
           to a failed state.
535
        2. Recover the queue.
536
        3. Recover the list of "future leases" as determined by
537
           the backfilling algorithm.
538
        """
539
540
        # Load leases
541 647 borja
        leases = self.persistence.get_leases()
542
        for lease in leases:
543 648 borja
            # Create a list of RRs
544 647 borja
            rrs = lease.preparation_rrs + lease.vm_rrs
545
            for vmrr in lease.vm_rrs:
546
                rrs += vmrr.pre_rrs + vmrr.post_rrs
547
548 648 borja
            # Bind resource tuples in RRs to slot table
549 647 borja
            for rr in rrs:
550
                for restuple in rr.resources_in_pnode.values():
551
                    restuple.slottable = self.scheduler.slottable
552
553
            self.logger.debug("Attempting to recover lease %i" % lease.id)
554
            lease.print_contents()
555
556 648 borja
            # Check the lease's state and determine how to proceed.
557 647 borja
            load_rrs = False
558
            lease_state = lease.get_state()
559
            if lease_state in (Lease.STATE_DONE, Lease.STATE_CANCELLED, Lease.STATE_REJECTED, Lease.STATE_FAIL):
560
                self.logger.info("Recovered lease %i (already done)" % lease.id)
561
                self.scheduler.completed_leases.add(lease)
562
            elif lease_state in (Lease.STATE_NEW, Lease.STATE_PENDING):
563
                self.scheduler.leases.add(lease)
564
            elif lease_state == Lease.STATE_QUEUED:
565
                load_rrs = True
566
                self.scheduler.leases.add(lease)
567
                self.logger.info("Recovered lease %i (queued)" % lease.id)
568
            elif lease_state in (Lease.STATE_SCHEDULED, Lease.STATE_READY):
569
                # Check if schedule is still valid.
570
                vmrr = lease.get_last_vmrr()
571
                if len(vmrr.pre_rrs) > 0:
572
                    start = vmrr.pre_rrs[0].start
573
                else:
574
                    start = vmrr.start
575
                if self.clock.get_time() < start:
576
                    load_rrs = True
577
                    self.scheduler.leases.add(lease)
578
                    self.logger.info("Recovered lease %i" % lease.id)
579
                else:
580
                    lease.set_state(Lease.STATE_FAIL)
581
                    self.scheduler.completed_leases.add(lease)
582
                    self.logger.info("Could not recover lease %i (scheduled starting time has passed)" % lease.id)
583
            elif lease_state == Lease.STATE_ACTIVE:
584
                vmrr = lease.get_last_vmrr()
585
                if self.clock.get_time() < self.clock.get_time():
586
                    # TODO: Check if VMs are actually running
587
                    load_rrs = True
588
                    self.scheduler.leases.add(lease)
589
                    self.logger.info("Recovered lease %i" % lease.id)
590
                else:
591
                    # TODO: May have to stop extant virtual machines
592
                    lease.set_state(Lease.STATE_FAIL)
593
                    self.scheduler.completed_leases.add(lease)
594
                    self.logger.info("Could not recover lease %i (scheduled ending time has passed)" % lease.id)
595
            else:
596
                # No support for recovering lease in the
597
                # remaining states
598
                lease.set_state(Lease.STATE_FAIL)
599
                self.scheduler.completed_leases.add(lease)
600 648 borja
                self.logger.info("Could not recover lease %i (unsupported state %i for recovery)" % (lease.id, lease_state))
601 647 borja
602 648 borja
            # Load the lease's RRs into the slot table
603 647 borja
            if load_rrs:
604
                for rr in rrs:
605
                    if rr.state in (ResourceReservation.STATE_ACTIVE, ResourceReservation.STATE_SCHEDULED):
606
                        self.scheduler.slottable.add_reservation(rr)
607 648 borja
608
        # Rebuild the queue
609 647 borja
        queue = self.persistence.get_queue()
610
        for lease_id in queue:
611
            if self.scheduler.leases.has_lease(lease_id):
612
                lease = self.scheduler.leases.get_lease(lease_id)
613
                self.scheduler.queue.enqueue(lease)
614
615 648 borja
        # Rebuild the "future leases"
616 647 borja
        future = self.persistence.get_future_leases()
617
        for lease_id in future:
618
            if self.scheduler.leases.has_lease(lease_id):
619
                lease = self.scheduler.leases.get_lease(lease_id)
620
                self.scheduler.vm_scheduler.future_leases.add(lease)
621
622
623 632 borja
    def __unrecoverable_error(self, exc):
624
        """Handles an unrecoverable error.
625

626
        This method prints information on the unrecoverable error and makes Haizea panic.
627
        """
628
        self.logger.error("An unrecoverable error has happened.")
629
        self.logger.error("Original exception:")
630
        self.__print_exception(exc.exc, exc.get_traceback())
631
        self.logger.error("Unrecoverable error traceback:")
632
        self.__print_exception(exc, sys.exc_info()[2])
633
        self.__panic()
634
635
    def __unexpected_exception(self, exc):
636
        """Handles an unrecoverable error.
637

638
        This method prints information on the unrecoverable error and makes Haizea panic.
639
        """
640
        self.logger.error("An unexpected exception has happened.")
641
        self.__print_exception(exc, sys.exc_info()[2])
642
        self.__panic()
643
644
    def __print_exception(self, exc, exc_traceback):
645
        """Prints an exception's traceback to the log."""
646
        tb = traceback.format_tb(exc_traceback)
647
        for line in tb:
648
            self.logger.error(line)
649
        self.logger.error("Message: %s" % exc)
650
651
652
    def __panic(self):
653
        """Makes Haizea crash and burn in a panicked frenzy"""
654
655
        self.logger.status("Panicking...")
656
657
        # Stop RPC server
658
        if self.rpc_server != None:
659
            self.rpc_server.stop()
660
661
        # Dump state
662
        self.print_status()
663
        self.logger.error("Next change point (in slot table): %s" % self.get_next_changepoint())
664
665
        # Print lease descriptors
666
        leases = self.scheduler.leases.get_leases()
667
        if len(leases)>0:
668
            self.logger.vdebug("vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
669
            for lease in leases:
670
                lease.print_contents()
671
            self.logger.vdebug("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^")
672
673
        # Exit
674
        treatment = self.config.get("lease-failure-handling")
675
        if treatment == constants.ONFAILURE_EXIT_RAISE:
676
            raise
677
        else:
678
            exit(1)
679
680
681
class Clock(object):
682
    """Base class for the resource manager's clock.
683

684
    The clock is in charge of periodically waking the resource manager so it
685
    will process new requests and handle existing reservations. This is a
686
    base class defining abstract methods.
687

688
    """
689
    def __init__(self, manager):
690
        self.manager = manager
691
        self.done = False
692
693
    def get_time(self):
694
        """Return the current time"""
695
        return abstract()
696
697
    def get_start_time(self):
698
        """Return the time at which the clock started ticking"""
699
        return abstract()
700
701
    def get_next_schedulable_time(self):
702
        """Return the next time at which resources could be scheduled.
703

704
        The "next schedulable time" server sanity measure when running
705
        in real time (to avoid scheduling something "now" to actually
706
        have "now" be in the past once the scheduling function returns.
707
        i.e., the "next schedulable time" has nothing to do with whether
708
        there are resources available at that time or not.
709
        """
710
        return abstract()
711
712
    def run(self):
713
        """Start and run the clock. This function is, in effect,
714
        the main loop of the resource manager."""
715
        return abstract()
716
717
    def stop(self):
718
        """Stop the clock.
719

720
        Stopping the clock makes Haizea exit.
721
        """
722
        self.done = True
723
724
725
class SimulatedClock(Clock):
726
    """Simulates the passage of time... really fast.
727

728
    The simulated clock steps through time to produce an ideal schedule.
729
    See the run() function for a description of how time is incremented
730
    exactly in the simulated clock.
731

732
    """
733
734
    def __init__(self, manager, starttime):
735
        """Initialize the simulated clock, starting at the provided starttime"""
736
        Clock.__init__(self, manager)
737
        self.starttime = starttime
738
        self.time = starttime
739
        self.logger = logging.getLogger("CLOCK")
740
        self.statusinterval = self.manager.config.get("status-message-interval")
741
742
    def get_time(self):
743
        """See docstring in base Clock class."""
744
        return self.time
745
746
    def get_start_time(self):
747
        """See docstring in base Clock class."""
748
        return self.starttime
749
750
    def get_next_schedulable_time(self):
751
        """See docstring in base Clock class."""
752
        return self.time
753
754
    def run(self):
755
        """Runs the simulated clock through time.
756

757
        The clock starts at the provided start time. At each point in time,
758
        it wakes up the resource manager and then skips to the next time
759
        where "something" is happening (see __get_next_time for a more
760
        rigorous description of this).
761

762
        The clock stops when there is nothing left to do (no pending or
763
        queue requests, and no future reservations)
764

765
        The simulated clock can only work in conjunction with the
766
        tracefile request frontend.
767
        """
768
        self.logger.status("Starting simulated clock")
769
        self.manager.accounting.start(self.get_start_time())
770
        prevstatustime = self.time
771
772
        # Main loop
773
        while not self.done:
774
            # Check to see if there are any leases which are ending prematurely.
775
            # Note that this is unique to simulation.
776
            prematureends = self.manager.scheduler.slottable.get_prematurely_ending_res(self.time)
777
778
            # Notify the resource manager about the premature ends
779
            for rr in prematureends:
780
                self.manager.notify_event(rr.lease.id, constants.EVENT_END_VM)
781
782
            # Process reservations starting/stopping at the current time and
783
            # check if there are any new requests.
784
            self.manager.process_ending_reservations(self.time)
785
            self.manager.process_starting_reservations(self.time)
786
            self.manager.process_requests(self.time)
787
788
            # Since processing requests may have resulted in new reservations
789
            # starting now, we process reservations again.
790
            self.manager.process_starting_reservations(self.time)
791
            # And one final call to deal with nil-duration reservations
792
            self.manager.process_ending_reservations(self.time)
793
794 650 borja
            self.manager.accounting.at_timestep(self.manager.scheduler)
795 632 borja
796 773 borja
            if self.manager.config.get("sanity-check"):
797
                passed, node, time, capacity = self.manager.scheduler.slottable.sanity_check(only_at=self.time)
798
                assert passed == True
799
800
                util = self.manager.accounting.get_last_counter_value("CPU utilization")
801
                assert util <= 1.0
802
803
                for l in self.manager.scheduler.leases.get_leases():
804
                    l.sanity_check()
805
806
807 632 borja
            # Print a status message
808
            if self.statusinterval != None and (self.time - prevstatustime).minutes >= self.statusinterval:
809
                self.manager.print_status()
810
                prevstatustime = self.time
811
812
            # Skip to next point in time.
813
            self.time, self.done = self.__get_next_time()
814
815
        self.logger.status("Simulated clock has stopped")
816
817
        # Stop the resource manager
818
        self.manager.graceful_stop()
819
820
821
    def __get_next_time(self):
822
        """Determines what is the next point in time to skip to.
823

824
        At a given point in time, the next time is the earliest of the following:
825
        * The arrival of the next lease request
826
        * The start or end of a reservation (a "changepoint" in the slot table)
827
        * A premature end of a lease
828
        """
829
830
        # Determine candidate next times
831
        tracefrontend = self.__get_trace_frontend()
832
        nextchangepoint = self.manager.get_next_changepoint()
833
        nextprematureend = self.manager.scheduler.slottable.get_next_premature_end(self.time)
834
        nextreqtime = tracefrontend.get_next_request_time()
835
        self.logger.debug("Next change point (in slot table): %s" % nextchangepoint)
836
        self.logger.debug("Next request time: %s" % nextreqtime)
837
        self.logger.debug("Next premature end: %s" % nextprematureend)
838
839
        # The previous time is now
840
        prevtime = self.time
841
842
        # We initialize the next time to now too, to detect if
843
        # we've been unable to determine what the next time is.
844
        newtime = self.time
845
846
        # Find the earliest of the three, accounting for None values
847
        if nextchangepoint != None and nextreqtime == None:
848
            newtime = nextchangepoint
849
        elif nextchangepoint == None and nextreqtime != None:
850
            newtime = nextreqtime
851
        elif nextchangepoint != None and nextreqtime != None:
852
            newtime = min(nextchangepoint, nextreqtime)
853
854
        if nextprematureend != None:
855
            newtime = min(nextprematureend, newtime)
856
857
        # If there's no more leases in the system, and no more pending requests,
858
        # then we're done.
859
        if not self.manager.exists_more_leases() and not tracefrontend.exists_more_requests():
860
            self.done = True
861
862
        # We can also be done if we've specified that we want to stop when
863
        # the best-effort requests are all done or when they've all been submitted.
864
        stopwhen = self.manager.config.get("stop-when")
865 811 borja
866 632 borja
        besteffort = self.manager.scheduler.leases.get_leases(type = Lease.BEST_EFFORT)
867
        pendingbesteffort = [r for r in tracefrontend.requests if r.get_type() == Lease.BEST_EFFORT]
868
        if stopwhen == constants.STOPWHEN_BEDONE:
869
            if self.manager.scheduler.is_queue_empty() and len(besteffort) + len(pendingbesteffort) == 0:
870
                self.done = True
871
        elif stopwhen == constants.STOPWHEN_BESUBMITTED:
872
            if len(pendingbesteffort) == 0:
873
                self.done = True
874 811 borja
        elif stopwhen == constants.STOPWHEN_ALLDONE:
875
            pass
876
        elif stopwhen == constants.STOPWHEN_EXACT:
877
            t = Parser.DateTimeDeltaFromString(self.manager.config.get("stop-when-time"))
878
            if self.time >= self.starttime + t:
879
                self.done = True
880 632 borja
881
        # If we didn't arrive at a new time, and we're not done, we've fallen into
882
        # an infinite loop. This is A Bad Thing(tm).
883
        if newtime == prevtime and self.done != True:
884
            raise Exception, "Simulated clock has fallen into an infinite loop."
885
886
        return newtime, self.done
887
888
    def __get_trace_frontend(self):
889
        """Gets the tracefile frontend from the resource manager"""
890
        frontends = self.manager.frontends
891
        tracef = [f for f in frontends if isinstance(f, TracefileFrontend)]
892
        if len(tracef) != 1:
893
            raise Exception, "The simulated clock can only work with a tracefile request frontend."
894
        else:
895
            return tracef[0]
896
897
898
class RealClock(Clock):
899
    """A realtime clock.
900

901
    The real clock wakes up periodically to, in turn, tell the resource manager
902
    to wake up. The real clock can also be run in a "fastforward" mode for
903
    debugging purposes (however, unlike the simulated clock, the clock will
904
    always skip a fixed amount of time into the future).
905
    """
906
    def __init__(self, manager, quantum, non_sched, fastforward = False):
907
        """Initializes the real clock.
908

909
        Arguments:
910
        manager -- the resource manager
911
        quantum -- interval between clock wakeups
912
        fastforward -- if True, the clock won't actually sleep
913
                       for the duration of the quantum."""
914
        Clock.__init__(self, manager)
915
        self.fastforward = fastforward
916
        if not self.fastforward:
917
            self.lastwakeup = None
918
        else:
919
            self.lastwakeup = round_datetime(now())
920
        self.logger = logging.getLogger("CLOCK")
921
        self.starttime = self.get_time()
922
        self.nextschedulable = None
923
        self.nextperiodicwakeup = None
924
        self.quantum = TimeDelta(seconds=quantum)
925
        self.non_sched = TimeDelta(seconds=non_sched)
926
927
    def get_time(self):
928
        """See docstring in base Clock class."""
929
        if not self.fastforward:
930
            return now()
931
        else:
932
            return self.lastwakeup
933
934
    def get_start_time(self):
935
        """See docstring in base Clock class."""
936
        return self.starttime
937
938
    def get_next_schedulable_time(self):
939
        """See docstring in base Clock class."""
940
        return self.nextschedulable
941
942
    def run(self):
943
        """Runs the real clock through time.
944

945
        The clock starts when run() is called. In each iteration of the main loop
946
        it will do the following:
947
        - Wake up the resource manager
948
        - Determine if there will be anything to do before the next
949
          time the clock will wake up (after the quantum has passed). Note
950
          that this information is readily available on the slot table.
951
          If so, set next-wakeup-time to (now + time until slot table
952
          event). Otherwise, set it to (now + quantum)
953
        - Sleep until next-wake-up-time
954

955
        The clock keeps on tickin' until a SIGINT signal (Ctrl-C if running in the
956
        foreground) or a SIGTERM signal is received.
957
        """
958
        self.logger.status("Starting clock")
959
        self.manager.accounting.start(self.get_start_time())
960
961
        try:
962
            signal.signal(signal.SIGINT, self.signalhandler_gracefulstop)
963
            signal.signal(signal.SIGTERM, self.signalhandler_gracefulstop)
964
        except ValueError, exc:
965
            # This means Haizea is not the main thread, which will happen
966
            # when running it as part of a py.test. We simply ignore this
967
            # to allow the test to continue.
968
            pass
969
970
        # Main loop
971
        while not self.done:
972
            self.logger.status("Waking up to manage resources")
973 676 borja
974 632 borja
            # Save the waking time. We want to use a consistent time in the
975
            # resource manager operations (if we use now(), we'll get a different
976
            # time every time)
977
            if not self.fastforward:
978
                self.lastwakeup = round_datetime(self.get_time())
979
            self.logger.status("Wake-up time recorded as %s" % self.lastwakeup)
980
981
            # Next schedulable time
982
            self.nextschedulable = round_datetime(self.lastwakeup + self.non_sched)
983
984 676 borja
            # Check if there are any changes in the resource pool
985
            new_nodes = self.manager.scheduler.vm_scheduler.resourcepool.refresh_nodes()
986
            for n in new_nodes:
987
                rt = self.manager.scheduler.slottable.create_resource_tuple_from_capacity(n.capacity)
988
                self.manager.scheduler.slottable.add_node(n.id, rt)
989
990 632 borja
            # Wake up the resource manager
991
            self.manager.process_ending_reservations(self.lastwakeup)
992
            self.manager.process_starting_reservations(self.lastwakeup)
993
            # TODO: Compute nextschedulable here, before processing requests
994
            self.manager.process_requests(self.nextschedulable)
995 650 borja
996
            self.manager.accounting.at_timestep(self.manager.scheduler)
997 632 borja
998
            # Next wakeup time
999
            time_now = now()
1000
            if self.lastwakeup + self.quantum <= time_now:
1001
                quantums = (time_now - self.lastwakeup) / self.quantum
1002
                quantums = int(ceil(quantums)) * self.quantum
1003
                self.nextperiodicwakeup = round_datetime(self.lastwakeup + quantums)
1004
            else:
1005
                self.nextperiodicwakeup = round_datetime(self.lastwakeup + self.quantum)
1006
1007
            # Determine if there's anything to do before the next wakeup time
1008
            nextchangepoint = self.manager.get_next_changepoint()
1009
            if nextchangepoint != None and nextchangepoint <= self.nextperiodicwakeup:
1010
                # We need to wake up earlier to handle a slot table event
1011
                nextwakeup = nextchangepoint
1012
                self.logger.status("Going back to sleep. Waking up at %s to handle slot table event." % nextwakeup)
1013
            else:
1014
                # Nothing to do before waking up
1015
                nextwakeup = self.nextperiodicwakeup
1016
                self.logger.status("Going back to sleep. Waking up at %s to see if something interesting has happened by then." % nextwakeup)
1017
1018
            # The only exit condition from the real clock is if the stop_when_no_more_leases
1019
            # is set to True, and there's no more work left to do.
1020
            # TODO: This first if is a kludge. Other options should only interact with
1021
            # options through the configfile's get method. The "stop-when-no-more-leases"
1022
            # option is currently OpenNebula-specific (while the real clock isn't; it can
1023
            # be used by both the simulator and the OpenNebula mode). This has to be
1024
            # fixed.
1025
            if self.manager.config._options.has_key("stop-when-no-more-leases"):
1026
                stop_when_no_more_leases = self.manager.config.get("stop-when-no-more-leases")
1027
                if stop_when_no_more_leases and not self.manager.exists_more_leases():
1028
                    self.done = True
1029
1030
            # Sleep
1031
            if not self.done:
1032
                if not self.fastforward:
1033
                    sleep((nextwakeup - now()).seconds)
1034
                else:
1035
                    self.lastwakeup = nextwakeup
1036
1037
        self.logger.status("Real clock has stopped")
1038
1039
        # Stop the resource manager
1040
        self.manager.graceful_stop()
1041
1042
    def signalhandler_gracefulstop(self, signum, frame):
1043
        """Handler for SIGTERM and SIGINT. Allows Haizea to stop gracefully."""
1044
1045
        sigstr = ""
1046
        if signum == signal.SIGTERM:
1047
            sigstr = " (SIGTERM)"
1048
        elif signum == signal.SIGINT:
1049
            sigstr = " (SIGINT)"
1050
        self.logger.status("Received signal %i%s" %(signum, sigstr))
1051
        self.done = True
1052
1053 647 borja
1054 648 borja
class PersistenceManager(object):
1055
    """Persistence manager.
1056

1057
    The persistence manager is in charge of persisting leases, and some
1058
    scheduling data, to disk. This allows Haizea to recover from crashes.
1059
    """
1060
1061 647 borja
    def __init__(self, file):
1062 648 borja
        """Constructor
1063 647 borja

1064 648 borja
        Initializes the persistence manager. If the specified file
1065
        does not exist, it is created. If the file is created, it
1066
        is opened but the information is not recovered (this is
1067
        the responsibility of the Manager class)
1068

1069
        Arguments:
1070
        file -- Persistence file. If None is specified, then
1071
                persistence is disabled and Haizea will run entirely
1072
                in-memory.
1073
        """
1074
        if file == None:
1075
            self.disabled = True
1076
            self.shelf = {}
1077
        else:
1078
            self.disabled = False
1079
            file = os.path.expanduser(file)
1080
            d = os.path.dirname(file)
1081
            if not os.path.exists(d):
1082
                os.makedirs(d)
1083
            self.shelf = shelve.open(file, flag='c', protocol = -1)
1084
1085 647 borja
    def persist_lease(self, lease):
1086 648 borja
        """Persists a single lease to disk
1087

1088
        Arguments:
1089
        lease -- Lease to persist
1090
        """
1091
        if not self.disabled:
1092
            self.shelf["lease-%i" % lease.id] = lease
1093
            self.shelf.sync()
1094 647 borja
1095
    def persist_queue(self, queue):
1096 648 borja
        """Persists the queue to disk
1097

1098
        Arguments:
1099
        queue -- The queue
1100
        """
1101
        if not self.disabled:
1102
            self.shelf["queue"] = [l.id for l in queue]
1103
            self.shelf.sync()
1104 647 borja
1105
    def persist_future_leases(self, leases):
1106 648 borja
        """Persists the set of future leases
1107

1108
        Arguments:
1109
        leases -- "Future leases" (as determined by backfilling algorithm)
1110
        """
1111
        if not self.disabled:
1112
            self.shelf["future"] = [l.id for l in leases]
1113
            self.shelf.sync()
1114 647 borja
1115
    def get_leases(self):
1116 648 borja
        """Returns the leases persisted to disk.
1117

1118
        """
1119 647 borja
        return [v for k,v in self.shelf.items() if k.startswith("lease-")]
1120
1121
    def get_queue(self):
1122 648 borja
        """Returns the queue persisted to disk.
1123

1124
        """
1125 647 borja
        if self.shelf.has_key("queue"):
1126
            return self.shelf["queue"]
1127
        else:
1128
            return []
1129
1130
    def get_future_leases(self):
1131 648 borja
        """Returns the future leases persisted to disk.
1132

1133
        """
1134 647 borja
        if self.shelf.has_key("future"):
1135
            return self.shelf["future"]
1136
        else:
1137
            return []
1138
1139
    def close(self):
1140 648 borja
        """Closes the persistence manager.
1141 647 borja

1142 648 borja
        Closing the persistence manager saves any remaining
1143
        data to disk.
1144
        """
1145
        if not self.disabled:
1146
            self.shelf.close()
1147