# root / trunk / src / haizea / core / enact / opennebula.py @ 676
# -------------------------------------------------------------------------- #
# Copyright 2006-2009, University of Chicago                                 #
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
# Complutense de Madrid (dsa-research.org)                                   #
#                                                                            #
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
# not use this file except in compliance with the License. You may obtain    #
# a copy of the License at                                                   #
#                                                                            #
# http://www.apache.org/licenses/LICENSE-2.0                                 #
#                                                                            #
# Unless required by applicable law or agreed to in writing, software        #
# distributed under the License is distributed on an "AS IS" BASIS,          #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
# See the License for the specific language governing permissions and        #
# limitations under the License.                                             #
# -------------------------------------------------------------------------- #

import logging
from time import sleep

import haizea.common.constants as constants
from haizea.common.opennebula_xmlrpc import OpenNebulaVM, OpenNebulaHost
from haizea.common.utils import get_config, OpenNebulaXMLRPCClientSingleton
from haizea.core.enact import ResourcePoolInfo, VMEnactment, DeploymentEnactment
from haizea.core.leases import Capacity
from haizea.core.scheduler import EnactmentError
from haizea.core.scheduler.resourcepool import ResourcePoolNode

class OpenNebulaEnactmentError(EnactmentError):
30
    def __init__(self, method, msg):
31
        self.method = method
32
        self.msg = msg
33
        self.message = "Error when invoking '%s': %s" % (method, msg)
34

    
35
class OpenNebulaResourcePoolInfo(ResourcePoolInfo):
36
    
37
    def __init__(self):
38
        ResourcePoolInfo.__init__(self)
39
        self.logger = logging.getLogger("ENACT.ONE.INFO")
40

    
41
        self.rpc = OpenNebulaXMLRPCClientSingleton().client
42

    
43
        # Get information about nodes from OpenNebula
44
        self.max_nod_id = 0
45
        self.nodes = {}
46

    
47
        self.resource_types = []
48
        self.resource_types.append((constants.RES_CPU,1))
49
        self.resource_types.append((constants.RES_MEM,1))
50
        self.resource_types.append((constants.RES_DISK,1))
51
                    
52
        self.logger.info("Fetching nodes from OpenNebula")            
53
        self.__fetch_nodes()
54
        self.logger.info("Fetched %i nodes from OpenNebula" % len(self.nodes))            
55
        
56
    def refresh(self):
57
        return self.__fetch_nodes()
58
        
59
    def get_nodes(self):
60
        return self.nodes
61
    
62
    def get_resource_types(self):
63
        return self.resource_types
64

    
65
    def get_bandwidth(self):
66
        return 0
67
    
68
    def __fetch_nodes(self):
69
        new_nodes = []
70
        hosts = self.rpc.hostpool_info()
71
        hostnames = set([n.hostname for n in self.nodes.values()])
72
        for host in hosts:
73
            # CPU
74
            # OpenNebula reports each CPU as "100"
75
            # (so, a 4-core machine is reported as "400")
76
            # We need to convert this to a multi-instance
77
            # resource type in Haizea            
78
            cpu = host.max_cpu
79
            ncpu = cpu / 100
80
            enact_id = host.id                
81
            hostname = host.name
82
            
83
            # We want to skip nodes we're already aware of ...
84
            if hostname in hostnames:
85
                continue
86

    
87
            # ... and those in an error or disabled state ...
88
            if host.state in (OpenNebulaHost.STATE_ERROR, OpenNebulaHost.STATE_DISABLED):
89
                continue
90
            
91
            # ... and those were monitoring information is not yet available.
92
            if cpu == 0:
93
                self.logger.debug("Skipping node '%s' (monitoring information not yet available)" % hostname)
94
                continue
95
            
96
            self.max_nod_id += 1
97
            
98
            nod_id = self.max_nod_id
99
            capacity = Capacity([constants.RES_CPU, constants.RES_MEM, constants.RES_DISK])
100
            
101
            capacity.set_ninstances(constants.RES_CPU, ncpu)
102
            for i in range(ncpu):
103
                capacity.set_quantity_instance(constants.RES_CPU, i+1, 100)            
104
            
105
            # Memory. Must divide by 1024 to obtain quantity in MB
106
            capacity.set_quantity(constants.RES_MEM, host.max_mem / 1024.0)
107
            
108
            # Disk
109
            # OpenNebula doesn't report this correctly yet.
110
            # We set it to an arbitrarily high value.
111
            capacity.set_quantity(constants.RES_DISK, 80000)
112

    
113
            node = ResourcePoolNode(nod_id, hostname, capacity)
114
            node.enactment_info = enact_id
115
            self.nodes[nod_id] = node
116
            new_nodes.append(node)
117
            self.logger.debug("Fetched node %i %s %s" % (node.id, node.hostname, node.capacity))
118
        return new_nodes
119
        
120

    
121
class OpenNebulaVMEnactment(VMEnactment):
122
    def __init__(self):
123
        VMEnactment.__init__(self)
124
        self.logger = logging.getLogger("ENACT.ONE.VM")
125
        self.rpc = OpenNebulaXMLRPCClientSingleton().client
126

    
127
    def start(self, action):
128
        for vnode in action.vnodes:
129
            # Unpack action
130
            vid = action.vnodes[vnode].enactment_info
131
            hid = action.vnodes[vnode].pnode
132
            
133
            self.logger.debug("Sending request to start VM for L%iV%i (ONE: vid=%i, hid=%i)"
134
                         % (action.lease_haizea_id, vnode, vid, hid))
135

    
136
            try:
137
                self.rpc.vm_deploy(vid, hid)
138
                self.logger.debug("Request succesful.")
139
            except Exception, msg:
140
                raise OpenNebulaEnactmentError("vm.deploy", msg)
141
            
142
    def stop(self, action):
143
        for vnode in action.vnodes:
144
            # Unpack action
145
            vid = action.vnodes[vnode].enactment_info
146
            
147
            self.logger.debug("Sending request to shutdown VM for L%iV%i (ONE: vid=%i)"
148
                         % (action.lease_haizea_id, vnode, vid))
149

    
150
            try:
151
                self.rpc.vm_shutdown(vid)
152
                self.logger.debug("Request succesful.")
153
            except Exception, msg:
154
                raise OpenNebulaEnactmentError("vm.shutdown", msg)
155
            
156
            # Space out commands to avoid OpenNebula from getting saturated
157
            # TODO: We should spawn out a thread to do this, so Haizea isn't
158
            # blocking until all these commands end
159
            interval = get_config().get("enactment-overhead").seconds
160
            sleep(interval)
161

    
162
    def suspend(self, action):
163
        for vnode in action.vnodes:
164
            # Unpack action
165
            vid = action.vnodes[vnode].enactment_info
166
            
167
            self.logger.debug("Sending request to suspend VM for L%iV%i (ONE: vid=%i)"
168
                         % (action.lease_haizea_id, vnode, vid))
169

    
170
            try:
171
                self.rpc.vm_suspend(vid)
172
                self.logger.debug("Request succesful.")
173
            except Exception, msg:
174
                raise OpenNebulaEnactmentError("vm.suspend", msg)
175
            
176
            # Space out commands to avoid OpenNebula from getting saturated
177
            # TODO: We should spawn out a thread to do this, so Haizea isn't
178
            # blocking until all these commands end
179
            interval = get_config().get("enactment-overhead").seconds
180
            sleep(interval)
181
        
182
    def resume(self, action):
183
        for vnode in action.vnodes:
184
            # Unpack action
185
            vid = action.vnodes[vnode].enactment_info
186
            
187
            self.logger.debug("Sending request to resume VM for L%iV%i (ONE: vid=%i)"
188
                         % (action.lease_haizea_id, vnode, vid))
189

    
190
            try:
191
                self.rpc.vm_resume(vid)
192
                self.logger.debug("Request succesful.")
193
            except Exception, msg:
194
                raise OpenNebulaEnactmentError("vm.resume", msg)
195
            
196
            # Space out commands to avoid OpenNebula from getting saturated
197
            # TODO: We should spawn out a thread to do this, so Haizea isn't
198
            # blocking until all these commands end
199
            interval = get_config().get("enactment-overhead").seconds
200
            sleep(interval)
201

    
202
    def verify_suspend(self, action):
203
        result = 0
204
        for vnode in action.vnodes:
205
            # Unpack action
206
            vid = action.vnodes[vnode].enactment_info
207
            
208
            try:
209
                vm = self.rpc.vm_info(vid)   
210
                state = vm.state
211
                if state == OpenNebulaVM.STATE_SUSPENDED:
212
                    self.logger.debug("Suspend of L%iV%i correct (ONE vid=%i)." % (action.lease_haizea_id, vnode, vid))
213
                else:
214
                    self.logger.warning("ONE did not complete suspend of L%iV%i on time. State is %i. (ONE vid=%i)" % (action.lease_haizea_id, vnode, state, vid))
215
                    result = 1
216
            except Exception, msg:
217
                raise OpenNebulaEnactmentError("vm.info", msg)
218

    
219
        return result
220
        
221
    def verify_resume(self, action):
222
        result = 0
223
        for vnode in action.vnodes:
224
            # Unpack action
225
            vid = action.vnodes[vnode].enactment_info
226
            
227
            try:
228
                vm = self.rpc.vm_info(vid)   
229
                state = vm.state
230
                if state == OpenNebulaVM.STATE_ACTIVE:
231
                    self.logger.debug("Resume of L%iV%i correct (ONE vid=%i)." % (action.lease_haizea_id, vnode, vid))
232
                else:
233
                    self.logger.warning("ONE did not complete resume of L%iV%i on time. State is %i. (ONE vid=%i)" % (action.lease_haizea_id, vnode, state, vid))
234
                    result = 1
235
            except Exception, msg:
236
                raise OpenNebulaEnactmentError("vm.info", msg)
237

    
238
        return result        
239

    
240
class OpenNebulaDummyDeploymentEnactment(DeploymentEnactment):    
241
    def __init__(self):
242
        DeploymentEnactment.__init__(self)
243
            
244
    def get_aux_nodes(self):
245
        return [] 
246
            
247
    def resolve_to_file(self, lease_id, vnode, diskimage_id):
248
        return "/var/haizea/images/%s-L%iV%i" % (diskimage_id, lease_id, vnode)