# -------------------------------------------------------------------------- #
# Copyright 2006-2009, University of Chicago                                 #
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
# Complutense de Madrid (dsa-research.org)                                   #
#                                                                            #
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
# not use this file except in compliance with the License. You may obtain    #
# a copy of the License at                                                   #
#                                                                            #
# http://www.apache.org/licenses/LICENSE-2.0                                 #
#                                                                            #
# Unless required by applicable law or agreed to in writing, software        #
# distributed under the License is distributed on an "AS IS" BASIS,          #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
# See the License for the specific language governing permissions and        #
# limitations under the License.                                             #
# -------------------------------------------------------------------------- #

from haizea.core.scheduler import EnactmentError
from haizea.core.leases import Capacity
from haizea.core.scheduler.resourcepool import ResourcePoolNode
from haizea.core.enact import ResourcePoolInfo, VMEnactment, DeploymentEnactment
from haizea.common.utils import get_config, OpenNebulaXMLRPCClientSingleton
from haizea.common.opennebula_xmlrpc import OpenNebulaVM, OpenNebulaHost
import haizea.common.constants as constants
import logging
from time import sleep

class OpenNebulaEnactmentError(EnactmentError):
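    # Raised when an OpenNebula XML-RPC call fails; wraps the name of the
    # offending RPC method and its error message into Haizea's EnactmentError.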
    def __init__(self, method, msg):
        self.method = method
        self.msg = msg
        self.message = "Error when invoking '%s': %s" % (method, msg)

class OpenNebulaResourcePoolInfo(ResourcePoolInfo):

    def __init__(self):
        ResourcePoolInfo.__init__(self)
        self.logger = logging.getLogger("ENACT.ONE.INFO")

        self.rpc = OpenNebulaXMLRPCClientSingleton().client

        # Get information about nodes from OpenNebula
        self.max_nod_id = 0
        self.nodes = {}

        self.logger.info("Fetching nodes from OpenNebula")
        self.__fetch_nodes()
        self.logger.info("Fetched %i nodes from OpenNebula" % len(self.nodes))

        cpu_instances = max([n.capacity.ninstances[constants.RES_CPU] for n in self.nodes.values()])

        self.resource_types = []
        self.resource_types.append((constants.RES_CPU,cpu_instances))
        self.resource_types.append((constants.RES_MEM,1))
        self.resource_types.append((constants.RES_DISK,1))
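        # Each entry is (resource type, maximum number of instances): CPU is
        # modelled as a multi-instance resource (one instance per core, as
        # computed in __fetch_nodes), while memory and disk are single-instance.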

    def refresh(self):
        return self.__fetch_nodes()

    def get_nodes(self):
        return self.nodes

    def get_resource_types(self):
        return self.resource_types

    def get_bandwidth(self):
        return 0

    def __fetch_nodes(self):
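        # Queries OpenNebula's host pool and adds any host not already in
        # self.nodes (hosts in an error or disabled state, or without
        # monitoring information yet, are skipped). Only the newly added
        # nodes are returned, which is what refresh() hands back to its caller.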
        new_nodes = []
        hosts = self.rpc.hostpool_info()
        hostnames = set([n.hostname for n in self.nodes.values()])
        for host in hosts:
            # CPU
            # OpenNebula reports each CPU as "100"
            # (so, a 4-core machine is reported as "400").
            # We need to convert this to a multi-instance
            # resource type in Haizea.
            cpu = host.max_cpu
            ncpu = cpu / 100
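            # For example, a host reported with max_cpu == 400 becomes
            # ncpu == 4, i.e. four CPU instances of 100 units each (see the
            # set_quantity_instance calls below).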
            enact_id = host.id
            hostname = host.name

            # We want to skip nodes we're already aware of ...
            if hostname in hostnames:
                continue

            # ... and those in an error or disabled state ...
            if host.state in (OpenNebulaHost.STATE_ERROR, OpenNebulaHost.STATE_DISABLED):
                continue

            # ... and those where monitoring information is not yet available.
            if cpu == 0:
                self.logger.debug("Skipping node '%s' (monitoring information not yet available)" % hostname)
                continue

            self.max_nod_id += 1

            nod_id = self.max_nod_id
            capacity = Capacity([constants.RES_CPU, constants.RES_MEM, constants.RES_DISK])

            capacity.set_ninstances(constants.RES_CPU, ncpu)
            for i in range(ncpu):
                capacity.set_quantity_instance(constants.RES_CPU, i+1, 100)

            # Memory. Must divide by 1024 to obtain the quantity in MB.
            capacity.set_quantity(constants.RES_MEM, host.max_mem / 1024.0)
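            # (host.max_mem appears to be reported in KB, so e.g. a value of
            # 4194304 becomes 4096 MB.)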

            # Disk
            # OpenNebula doesn't report this correctly yet.
            # We set it to an arbitrarily high value.
            capacity.set_quantity(constants.RES_DISK, 80000)

            node = ResourcePoolNode(nod_id, hostname, capacity)
            node.enactment_info = enact_id
            self.nodes[nod_id] = node
            new_nodes.append(node)
            self.logger.debug("Fetched node %i %s %s" % (node.id, node.hostname, node.capacity))
        return new_nodes

class OpenNebulaVMEnactment(VMEnactment):
    def __init__(self):
        VMEnactment.__init__(self)
        self.logger = logging.getLogger("ENACT.ONE.VM")
        self.rpc = OpenNebulaXMLRPCClientSingleton().client

    def start(self, action):
        for vnode in action.vnodes:
            # Unpack action
            vid = action.vnodes[vnode].enactment_info
            hid = action.vnodes[vnode].pnode

            self.logger.debug("Sending request to start VM for L%iV%i (ONE: vid=%i, hid=%i)"
                         % (action.lease_haizea_id, vnode, vid, hid))

            try:
                self.rpc.vm_deploy(vid, hid)
                self.logger.debug("Request successful.")
            except Exception, msg:
                raise OpenNebulaEnactmentError("vm.deploy", msg)

    def stop(self, action):
        for vnode in action.vnodes:
            # Unpack action
            vid = action.vnodes[vnode].enactment_info

            self.logger.debug("Sending request to shutdown VM for L%iV%i (ONE: vid=%i)"
                         % (action.lease_haizea_id, vnode, vid))

            try:
                self.rpc.vm_shutdown(vid)
                self.logger.debug("Request successful.")
            except Exception, msg:
                raise OpenNebulaEnactmentError("vm.shutdown", msg)

            # Space out commands to avoid saturating OpenNebula.
            # TODO: We should spawn a thread to do this, so Haizea isn't
            # blocked until all these commands finish.
            interval = get_config().get("enactment-overhead").seconds
            sleep(interval)

    def suspend(self, action):
        for vnode in action.vnodes:
            # Unpack action
            vid = action.vnodes[vnode].enactment_info

            self.logger.debug("Sending request to suspend VM for L%iV%i (ONE: vid=%i)"
                         % (action.lease_haizea_id, vnode, vid))

            try:
                self.rpc.vm_suspend(vid)
                self.logger.debug("Request successful.")
            except Exception, msg:
                raise OpenNebulaEnactmentError("vm.suspend", msg)

            # Space out commands to avoid saturating OpenNebula.
            # TODO: We should spawn a thread to do this, so Haizea isn't
            # blocked until all these commands finish.
            interval = get_config().get("enactment-overhead").seconds
            sleep(interval)

    def resume(self, action):
        for vnode in action.vnodes:
            # Unpack action
            vid = action.vnodes[vnode].enactment_info

            self.logger.debug("Sending request to resume VM for L%iV%i (ONE: vid=%i)"
                         % (action.lease_haizea_id, vnode, vid))

            try:
                self.rpc.vm_resume(vid)
                self.logger.debug("Request successful.")
            except Exception, msg:
                raise OpenNebulaEnactmentError("vm.resume", msg)

            # Space out commands to avoid saturating OpenNebula.
            # TODO: We should spawn a thread to do this, so Haizea isn't
            # blocked until all these commands finish.
            interval = get_config().get("enactment-overhead").seconds
            sleep(interval)

    def verify_suspend(self, action):
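        # Returns 0 if every VM in the action has reached STATE_SUSPENDED,
        # and 1 if at least one of them has not.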
        result = 0
        for vnode in action.vnodes:
            # Unpack action
            vid = action.vnodes[vnode].enactment_info

            try:
                vm = self.rpc.vm_info(vid)
                state = vm.state
                if state == OpenNebulaVM.STATE_SUSPENDED:
                    self.logger.debug("Suspend of L%iV%i correct (ONE vid=%i)." % (action.lease_haizea_id, vnode, vid))
                else:
                    self.logger.warning("ONE did not complete suspend of L%iV%i on time. State is %i. (ONE vid=%i)" % (action.lease_haizea_id, vnode, state, vid))
                    result = 1
            except Exception, msg:
                raise OpenNebulaEnactmentError("vm.info", msg)

        return result

    def verify_resume(self, action):
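        # Returns 0 if every VM in the action has reached STATE_ACTIVE,
        # and 1 if at least one of them has not.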
        result = 0
        for vnode in action.vnodes:
            # Unpack action
            vid = action.vnodes[vnode].enactment_info

            try:
                vm = self.rpc.vm_info(vid)
                state = vm.state
                if state == OpenNebulaVM.STATE_ACTIVE:
                    self.logger.debug("Resume of L%iV%i correct (ONE vid=%i)." % (action.lease_haizea_id, vnode, vid))
                else:
                    self.logger.warning("ONE did not complete resume of L%iV%i on time. State is %i. (ONE vid=%i)" % (action.lease_haizea_id, vnode, state, vid))
                    result = 1
            except Exception, msg:
                raise OpenNebulaEnactmentError("vm.info", msg)

        return result

class OpenNebulaDummyDeploymentEnactment(DeploymentEnactment):
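    # A no-op deployment backend: it performs no image deployment of its own,
    # contributes no auxiliary nodes, and simply maps a lease's disk image id
    # to a path under /var/haizea/images (see resolve_to_file below).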
    def __init__(self):
        DeploymentEnactment.__init__(self)

    def get_aux_nodes(self):
        return []

    def resolve_to_file(self, lease_id, vnode, diskimage_id):
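        # For example (hypothetical values): lease_id=42, vnode=1 and
        # diskimage_id="ttylinux.img" would resolve to
        # "/var/haizea/images/ttylinux.img-L42V1".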
        return "/var/haizea/images/%s-L%iV%i" % (diskimage_id, lease_id, vnode)