Revision 676
Added by Borja Sotomayor over 14 years ago
trunk/src/haizea/core/manager.py

@@ -742,6 +742,12 @@
 
         # Main loop
         while not self.done:
+            # Check if there are any changes in the resource pool
+            new_nodes = self.manager.scheduler.vm_scheduler.resourcepool.refresh_nodes()
+            for n in new_nodes:
+                rt = slottable.create_resource_tuple_from_capacity(n.capacity)
+                slottable.add_node(n.id, rt)
+
             # Check to see if there are any leases which are ending prematurely.
             # Note that this is unique to simulation.
             prematureends = self.manager.scheduler.slottable.get_prematurely_ending_res(self.time)

@@ -923,17 +929,23 @@
         # Main loop
         while not self.done:
             self.logger.status("Waking up to manage resources")
 
             # Save the waking time. We want to use a consistent time in the
             # resource manager operations (if we use now(), we'll get a different
             # time every time)
...
             # Next schedulable time
             self.nextschedulable = round_datetime(self.lastwakeup + self.non_sched)
 
+            # Check if there are any changes in the resource pool
+            new_nodes = self.manager.scheduler.vm_scheduler.resourcepool.refresh_nodes()
+            for n in new_nodes:
+                rt = self.manager.scheduler.slottable.create_resource_tuple_from_capacity(n.capacity)
+                self.manager.scheduler.slottable.add_node(n.id, rt)
+
             # Wake up the resource manager
             self.manager.process_ending_reservations(self.lastwakeup)
             self.manager.process_starting_reservations(self.lastwakeup)
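Both main loops (the simulated clock and the real clock) now begin each iteration with the same refresh-and-register step: poll the resource pool for newly discovered hosts and mirror each one into the slot table. A minimal sketch of that pattern, with scheduler standing in for the self.manager.scheduler object used above (illustrative only, not a verbatim extract):

    # Sketch of the per-iteration refresh added to both main loops.
    # `scheduler` is a stand-in for self.manager.scheduler.
    def refresh_resource_pool(scheduler):
        # Ask the enactment layer whether any hosts appeared since the
        # last wakeup; refresh_nodes() returns only the new ones.
        new_nodes = scheduler.vm_scheduler.resourcepool.refresh_nodes()
        for n in new_nodes:
            # Register the new host's capacity in the slot table so
            # leases can be scheduled on it from this iteration onwards.
            rt = scheduler.slottable.create_resource_tuple_from_capacity(n.capacity)
            scheduler.slottable.add_node(n.id, rt)
        return new_nodes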
trunk/src/haizea/core/enact/opennebula.py

@@ -41,56 +41,83 @@
         self.rpc = OpenNebulaXMLRPCClientSingleton().client
 
         # Get information about nodes from OpenNebula
+        self.max_nod_id = 0
         self.nodes = {}
-        hosts = self.rpc.hostpool_info()
-        for (i, host) in enumerate(hosts):
-            if not host.state in (OpenNebulaHost.STATE_ERROR, OpenNebulaHost.STATE_DISABLED):
-                nod_id = i+1
-                enact_id = host.id
-                hostname = host.name
-                capacity = Capacity([constants.RES_CPU, constants.RES_MEM, constants.RES_DISK])
-
-                # CPU
-                # OpenNebula reports each CPU as "100"
-                # (so, a 4-core machine is reported as "400")
-                # We need to convert this to a multi-instance
-                # resource type in Haizea
-                cpu = host.max_cpu
-                ncpu = cpu / 100
-                capacity.set_ninstances(constants.RES_CPU, ncpu)
-                for i in range(ncpu):
-                    capacity.set_quantity_instance(constants.RES_CPU, i+1, 100)
-
-                # Memory. Must divide by 1024 to obtain quantity in MB
-                capacity.set_quantity(constants.RES_MEM, host.max_mem / 1024.0)
-
-                # Disk
-                # OpenNebula doesn't report this correctly yet.
-                # We set it to an arbitrarily high value.
-                capacity.set_quantity(constants.RES_DISK, 80000)
-
-                node = ResourcePoolNode(nod_id, hostname, capacity)
-                node.enactment_info = enact_id
-                self.nodes[nod_id] = node
-
+
         self.resource_types = []
         self.resource_types.append((constants.RES_CPU,1))
         self.resource_types.append((constants.RES_MEM,1))
         self.resource_types.append((constants.RES_DISK,1))
-
-        self.logger.info("Fetched %i nodes from OpenNebula" % len(self.nodes))
-        for n in self.nodes.values():
-            self.logger.debug("%i %s %s" % (n.id, n.hostname, n.capacity))
+
+        self.logger.info("Fetching nodes from OpenNebula")
+        self.__fetch_nodes()
+        self.logger.info("Fetched %i nodes from OpenNebula" % len(self.nodes))
 
+    def refresh(self):
+        return self.__fetch_nodes()
+
     def get_nodes(self):
         return self.nodes
 
...
 
     def get_bandwidth(self):
         return 0
+
+    def __fetch_nodes(self):
+        new_nodes = []
+        hosts = self.rpc.hostpool_info()
+        hostnames = set([n.hostname for n in self.nodes.values()])
+        for host in hosts:
+            # CPU
+            # OpenNebula reports each CPU as "100"
+            # (so, a 4-core machine is reported as "400")
+            # We need to convert this to a multi-instance
+            # resource type in Haizea
+            cpu = host.max_cpu
+            ncpu = cpu / 100
+            enact_id = host.id
+            hostname = host.name
+
+            # We want to skip nodes we're already aware of ...
+            if hostname in hostnames:
+                continue
 
+            # ... and those in an error or disabled state ...
+            if host.state in (OpenNebulaHost.STATE_ERROR, OpenNebulaHost.STATE_DISABLED):
+                continue
+
+            # ... and those where monitoring information is not yet available.
+            if cpu == 0:
+                self.logger.debug("Skipping node '%s' (monitoring information not yet available)" % hostname)
+                continue
+
+            self.max_nod_id += 1
+
+            nod_id = self.max_nod_id
+            capacity = Capacity([constants.RES_CPU, constants.RES_MEM, constants.RES_DISK])
+
+            capacity.set_ninstances(constants.RES_CPU, ncpu)
+            for i in range(ncpu):
+                capacity.set_quantity_instance(constants.RES_CPU, i+1, 100)
+
+            # Memory. Must divide by 1024 to obtain quantity in MB
+            capacity.set_quantity(constants.RES_MEM, host.max_mem / 1024.0)
+
+            # Disk
+            # OpenNebula doesn't report this correctly yet.
+            # We set it to an arbitrarily high value.
+            capacity.set_quantity(constants.RES_DISK, 80000)
+
+            node = ResourcePoolNode(nod_id, hostname, capacity)
+            node.enactment_info = enact_id
+            self.nodes[nod_id] = node
+            new_nodes.append(node)
+            self.logger.debug("Fetched node %i %s %s" % (node.id, node.hostname, node.capacity))
+        return new_nodes
+
+
 class OpenNebulaVMEnactment(VMEnactment):
     def __init__(self):
         VMEnactment.__init__(self)
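The CPU handling in __fetch_nodes deserves a worked example. OpenNebula reports aggregate CPU in hundredths (a 4-core host reports max_cpu == 400), while Haizea models CPU as a multi-instance resource with one instance of quantity 100 per core. A self-contained illustration with made-up values (no Haizea imports; `//` is used so it also runs under Python 3, whereas the Python 2 code above uses `/`):

    # Made-up host values, not taken from a real OpenNebula pool.
    max_cpu = 400                      # what hostpool_info() would report
    ncpu = max_cpu // 100              # 4 single-core CPU instances
    instances = dict((i + 1, 100) for i in range(ncpu))
    assert instances == {1: 100, 2: 100, 3: 100, 4: 100}

    # A host whose monitoring data has not arrived yet reports max_cpu == 0,
    # so ncpu would be 0 and the node would be registered with no CPU at all.
    # That is exactly the case __fetch_nodes now skips.
    assert 0 // 100 == 0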
trunk/src/haizea/core/enact/__init__.py

@@ -26,6 +26,9 @@
         """ Returns the nodes in the resource pool. """
         abstract()
 
+    def refresh(self):
+        abstract()
+
     def get_resource_types(self):
         abstract()
 
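The contract implied by the new abstract method: refresh() must return a list of newly discovered ResourcePoolNode objects (an empty list if nothing changed), so that ResourcePool.refresh_nodes() below can merge them in. A hypothetical backend illustrating the interface (the class shown here is not part of Haizea):

    class StaticResourcePoolInfo(ResourcePoolInfo):
        """Backend for a resource pool that never changes after startup."""
        def get_nodes(self):
            return self.nodes

        def refresh(self):
            # Nothing can appear at runtime, so report no new nodes;
            # the simulated enactment module below does exactly this.
            return []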
trunk/src/haizea/core/enact/simulated.py

@@ -53,6 +53,9 @@
 
     def get_nodes(self):
         return self.nodes
+
+    def refresh(self):
+        return []
 
     def get_resource_types(self):
         return self.resource_types
trunk/src/haizea/core/scheduler/vm_scheduler.py

@@ -330,8 +330,10 @@
                 use = r.vmrr.resources_in_pnode[node].get_by_type(constants.RES_CPU)
                 util[type(r)] = use + util.setdefault(type(r),0.0)
         util[None] = total - sum(util.values())
-        for k in util:
-            util[k] /= total
+
+        if total != 0:
+            for k in util:
+                util[k] /= total
 
         return util
 
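The guard matters because, with hosts now discovered at runtime, the slot table can legitimately be empty (for example, before any OpenNebula host has been fetched), in which case total is zero and the old unconditional normalization raised ZeroDivisionError. A standalone sketch of the guarded normalization (illustrative dictionary, not Haizea's API):

    def normalize_utilization(util, total):
        # Leave the raw numbers untouched when there is no capacity yet,
        # instead of dividing by zero.
        if total != 0:
            for k in util:
                util[k] /= total
        return util

    print(normalize_utilization({None: 0.0}, 0))               # {None: 0.0}
    print(normalize_utilization({None: 2.0, "vm": 2.0}, 4.0))  # fractions sum to 1.0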
trunk/src/haizea/core/scheduler/resourcepool.py

@@ -102,6 +102,12 @@
         verify_resume_action.from_rr(rr)
         self.vm.verify_resume(verify_resume_action)
 
+    def refresh_nodes(self):
+        new_nodes = self.info.refresh()
+        for node in new_nodes:
+            self.nodes[node.id] = node
+        return new_nodes
+
     def get_nodes(self):
         return self.nodes.values()
 
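refresh_nodes() is the glue between the enactment backends and the main loops shown earlier: it asks self.info (the backend) for newly discovered nodes, records them in the pool, and hands them back to the caller so they can also be added to the slot table. A self-contained sketch of that flow (all Fake* names are stand-ins, not Haizea classes):

    class FakeNode(object):
        def __init__(self, id, hostname):
            self.id, self.hostname = id, hostname

    class FakeInfo(object):
        def refresh(self):
            # Pretend one host was just added in OpenNebula.
            return [FakeNode(3, "cluster-node-3")]

    class FakePool(object):
        def __init__(self, info):
            self.info, self.nodes = info, {}

        def refresh_nodes(self):
            # Same shape as ResourcePool.refresh_nodes above.
            new_nodes = self.info.refresh()
            for node in new_nodes:
                self.nodes[node.id] = node
            return new_nodes

    pool = FakePool(FakeInfo())
    assert [n.id for n in pool.refresh_nodes()] == [3]
    assert 3 in pool.nodes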
Fixed: Haizea would crash when fetching nodes for which OpenNebula had not yet collected monitoring information. This fix also has the pleasant side effect of allowing Haizea to detect new hosts added in OpenNebula without having to restart Haizea.