Project

General

Profile

root / trunk / src / haizea / core / enact / opennebula.py @ 641

1
# -------------------------------------------------------------------------- #
2
# Copyright 2006-2009, University of Chicago                                 #
3
# Copyright 2008-2009, Distributed Systems Architecture Group, Universidad   #
4
# Complutense de Madrid (dsa-research.org)                                   #
5
#                                                                            #
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may    #
7
# not use this file except in compliance with the License. You may obtain    #
8
# a copy of the License at                                                   #
9
#                                                                            #
10
# http://www.apache.org/licenses/LICENSE-2.0                                 #
11
#                                                                            #
12
# Unless required by applicable law or agreed to in writing, software        #
13
# distributed under the License is distributed on an "AS IS" BASIS,          #
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
15
# See the License for the specific language governing permissions and        #
16
# limitations under the License.                                             #
17
# -------------------------------------------------------------------------- #
18

    
19
from haizea.core.scheduler import EnactmentError
20
from haizea.core.leases import Capacity
21
from haizea.core.scheduler.resourcepool import ResourcePoolNode
22
from haizea.core.scheduler.slottable import ResourceTuple
23
from haizea.core.enact import ResourcePoolInfo, VMEnactment, DeploymentEnactment
24
from haizea.common.utils import get_config
25
from haizea.common.opennebula_xmlrpc import OpenNebulaXMLRPCClient, OpenNebulaVM, OpenNebulaHost
26
import haizea.common.constants as constants
27
import logging
28
from time import sleep
29

    
30
one_rpc = None
31

    
32
def get_one_xmlrpcclient():
33
    global one_rpc
34
    if one_rpc == None:
35
        host = get_config().get("one.host")
36
        port = get_config().get("one.port")
37
        user, passw = OpenNebulaXMLRPCClient.get_userpass_from_env()
38
        one_rpc = OpenNebulaXMLRPCClient(host, port, user, passw)
39
    return one_rpc
40

    
41
class OpenNebulaEnactmentError(EnactmentError):
42
    def __init__(self, method, msg):
43
        self.method = method
44
        self.msg = msg
45
        self.message = "Error when invoking '%s': %s" % (method, msg)
46

    
47
class OpenNebulaResourcePoolInfo(ResourcePoolInfo):
48
    
49
    def __init__(self):
50
        ResourcePoolInfo.__init__(self)
51
        self.logger = logging.getLogger("ENACT.ONE.INFO")
52

    
53
        self.rpc = get_one_xmlrpcclient()
54

    
55
        # Get information about nodes from OpenNebula
56
        self.nodes = {}
57
        hosts = self.rpc.hostpool_info()
58
        for (i, host) in enumerate(hosts):
59
            if not host.state in (OpenNebulaHost.STATE_ERROR, OpenNebulaHost.STATE_DISABLED):
60
                nod_id = i+1
61
                enact_id = host.id
62
                hostname = host.name
63
                capacity = Capacity([constants.RES_CPU, constants.RES_MEM, constants.RES_DISK])
64
                
65
                # CPU
66
                # OpenNebula reports each CPU as "100"
67
                # (so, a 4-core machine is reported as "400")
68
                # We need to convert this to a multi-instance
69
                # resource type in Haizea
70
                cpu = host.max_cpu
71
                ncpu = cpu / 100
72
                capacity.set_ninstances(constants.RES_CPU, ncpu)
73
                for i in range(ncpu):
74
                    capacity.set_quantity_instance(constants.RES_CPU, i+1, 100)            
75
                
76
                # Memory. Must divide by 1024 to obtain quantity in MB
77
                capacity.set_quantity(constants.RES_MEM, host.max_mem / 1024.0)
78
                
79
                # Disk
80
                # OpenNebula doesn't report this correctly yet.
81
                # We set it to an arbitrarily high value.
82
                capacity.set_quantity(constants.RES_DISK, 80000)
83
    
84
                node = ResourcePoolNode(nod_id, hostname, capacity)
85
                node.enactment_info = enact_id
86
                self.nodes[nod_id] = node
87
            
88
        self.resource_types = []
89
        self.resource_types.append((constants.RES_CPU,1))
90
        self.resource_types.append((constants.RES_MEM,1))
91
        self.resource_types.append((constants.RES_DISK,1))
92
            
93
        self.logger.info("Fetched %i nodes from OpenNebula" % len(self.nodes))
94
        for n in self.nodes.values():
95
            self.logger.debug("%i %s %s" % (n.id, n.hostname, n.capacity))
96
        
97
    def get_nodes(self):
98
        return self.nodes
99
    
100
    def get_resource_types(self):
101
        return self.resource_types
102

    
103
    def get_bandwidth(self):
104
        return 0
105

    
106
class OpenNebulaVMEnactment(VMEnactment):
107
    def __init__(self):
108
        VMEnactment.__init__(self)
109
        self.logger = logging.getLogger("ENACT.ONE.VM")
110
        self.rpc = get_one_xmlrpcclient()
111

    
112
    def start(self, action):
113
        for vnode in action.vnodes:
114
            # Unpack action
115
            vid = action.vnodes[vnode].enactment_info
116
            hid = action.vnodes[vnode].pnode
117
            
118
            self.logger.debug("Sending request to start VM for L%iV%i (ONE: vid=%i, hid=%i)"
119
                         % (action.lease_haizea_id, vnode, vid, hid))
120

    
121
            try:
122
                self.rpc.vm_deploy(vid, hid)
123
                self.logger.debug("Request succesful.")
124
            except Exception, msg:
125
                raise OpenNebulaEnactmentError("vm.deploy", msg)
126
            
127
    def stop(self, action):
128
        for vnode in action.vnodes:
129
            # Unpack action
130
            vid = action.vnodes[vnode].enactment_info
131
            
132
            self.logger.debug("Sending request to shutdown VM for L%iV%i (ONE: vid=%i)"
133
                         % (action.lease_haizea_id, vnode, vid))
134

    
135
            try:
136
                self.rpc.vm_shutdown(vid)
137
                self.logger.debug("Request succesful.")
138
            except Exception, msg:
139
                raise OpenNebulaEnactmentError("vm.shutdown", msg)
140
            
141
            # Space out commands to avoid OpenNebula from getting saturated
142
            # TODO: We should spawn out a thread to do this, so Haizea isn't
143
            # blocking until all these commands end
144
            interval = get_config().get("enactment-overhead").seconds
145
            sleep(interval)
146

    
147
    def suspend(self, action):
148
        for vnode in action.vnodes:
149
            # Unpack action
150
            vid = action.vnodes[vnode].enactment_info
151
            
152
            self.logger.debug("Sending request to suspend VM for L%iV%i (ONE: vid=%i)"
153
                         % (action.lease_haizea_id, vnode, vid))
154

    
155
            try:
156
                self.rpc.vm_suspend(vid)
157
                self.logger.debug("Request succesful.")
158
            except Exception, msg:
159
                raise OpenNebulaEnactmentError("vm.suspend", msg)
160
            
161
            # Space out commands to avoid OpenNebula from getting saturated
162
            # TODO: We should spawn out a thread to do this, so Haizea isn't
163
            # blocking until all these commands end
164
            interval = get_config().get("enactment-overhead").seconds
165
            sleep(interval)
166
        
167
    def resume(self, action):
168
        for vnode in action.vnodes:
169
            # Unpack action
170
            vid = action.vnodes[vnode].enactment_info
171
            
172
            self.logger.debug("Sending request to resume VM for L%iV%i (ONE: vid=%i)"
173
                         % (action.lease_haizea_id, vnode, vid))
174

    
175
            try:
176
                self.rpc.vm_resume(vid)
177
                self.logger.debug("Request succesful.")
178
            except Exception, msg:
179
                raise OpenNebulaEnactmentError("vm.resume", msg)
180
            
181
            # Space out commands to avoid OpenNebula from getting saturated
182
            # TODO: We should spawn out a thread to do this, so Haizea isn't
183
            # blocking until all these commands end
184
            interval = get_config().get("enactment-overhead").seconds
185
            sleep(interval)
186

    
187
    def verify_suspend(self, action):
188
        result = 0
189
        for vnode in action.vnodes:
190
            # Unpack action
191
            vid = action.vnodes[vnode].enactment_info
192
            
193
            try:
194
                vm = self.rpc.vm_info(vid)   
195
                state = vm.state
196
                if state == OpenNebulaVM.STATE_SUSPENDED:
197
                    self.logger.debug("Suspend of L%iV%i correct (ONE vid=%i)." % (action.lease_haizea_id, vnode, vid))
198
                else:
199
                    self.logger.warning("ONE did not complete suspend of L%iV%i on time. State is %i. (ONE vid=%i)" % (action.lease_haizea_id, vnode, state, vid))
200
                    result = 1
201
            except Exception, msg:
202
                raise OpenNebulaEnactmentError("vm.info", msg)
203

    
204
        return result
205
        
206
    def verify_resume(self, action):
207
        result = 0
208
        for vnode in action.vnodes:
209
            # Unpack action
210
            vid = action.vnodes[vnode].enactment_info
211
            
212
            try:
213
                vm = self.rpc.vm_info(vid)   
214
                state = vm.state
215
                if state == OpenNebulaVM.STATE_ACTIVE:
216
                    self.logger.debug("Resume of L%iV%i correct (ONE vid=%i)." % (action.lease_haizea_id, vnode, vid))
217
                else:
218
                    self.logger.warning("ONE did not complete resume of L%iV%i on time. State is %i. (ONE vid=%i)" % (action.lease_haizea_id, vnode, state, vid))
219
                    result = 1
220
            except Exception, msg:
221
                raise OpenNebulaEnactmentError("vm.info", msg)
222

    
223
        return result        
224

    
225
class OpenNebulaDummyDeploymentEnactment(DeploymentEnactment):    
226
    def __init__(self):
227
        DeploymentEnactment.__init__(self)
228
            
229
    def get_aux_nodes(self):
230
        return [] 
231
            
232
    def resolve_to_file(self, lease_id, vnode, diskimage_id):
233
        return "/var/haizea/images/%s-L%iV%i" % (diskimage_id, lease_id, vnode)