1
|
|
2
|
|
3
|
|
4
|
|
5
|
|
6
|
|
7
|
|
8
|
|
9
|
|
10
|
|
11
|
|
12
|
|
13
|
|
14
|
|
15
|
|
16
|
|
17
|
|
18
|
|
19
|
import haizea.common.constants as constants
|
20
|
from haizea.core.scheduler.preparation_schedulers import PreparationScheduler
|
21
|
from haizea.core.scheduler.slottable import ResourceReservation
|
22
|
from haizea.core.scheduler import MigrationResourceReservation
|
23
|
from haizea.core.leases import Lease, Capacity, UnmanagedSoftwareEnvironment
|
24
|
from haizea.core.scheduler import ReservationEventHandler, NotSchedulableException, EarliestStartingTime
|
25
|
from haizea.common.utils import estimate_transfer_time, get_config
|
26
|
from haizea.core.scheduler.slottable import ResourceTuple
|
27
|
from mx.DateTime import TimeDelta
|
28
|
|
29
|
import copy
|
30
|
import bisect
|
31
|
|
32
|
|
33
|
class ImageTransferPreparationScheduler(PreparationScheduler):
|
34
|
def __init__(self, slottable, resourcepool, deployment_enact):
|
35
|
PreparationScheduler.__init__(self, slottable, resourcepool, deployment_enact)
|
36
|
|
37
|
self.imagenode = self.deployment_enact.get_imagenode()
|
38
|
|
39
|
self.transfers = []
|
40
|
self.completed_transfers = []
|
41
|
|
42
|
config = get_config()
|
43
|
self.reusealg = config.get("diskimage-reuse")
|
44
|
if self.reusealg == constants.REUSE_IMAGECACHES:
|
45
|
self.maxcachesize = config.get("diskimage-cache-size")
|
46
|
else:
|
47
|
self.maxcachesize = None
|
48
|
|
49
|
self.imagenode_bandwidth = self.deployment_enact.get_bandwidth()
|
50
|
|
51
|
self.handlers ={}
|
52
|
self.handlers[FileTransferResourceReservation] = ReservationEventHandler(
|
53
|
sched = self,
|
54
|
on_start = ImageTransferPreparationScheduler._handle_start_filetransfer,
|
55
|
on_end = ImageTransferPreparationScheduler._handle_end_filetransfer)
|
56
|
|
57
|
self.handlers[DiskImageMigrationResourceReservation] = ReservationEventHandler(
|
58
|
sched = self,
|
59
|
on_start = ImageTransferPreparationScheduler._handle_start_migrate,
|
60
|
on_end = ImageTransferPreparationScheduler._handle_end_migrate)
|
61
|
|
62
|
def schedule(self, lease, vmrr, earliest):
|
63
|
if type(lease.software) == UnmanagedSoftwareEnvironment:
|
64
|
return [], True
|
65
|
if lease.get_type() == Lease.ADVANCE_RESERVATION:
|
66
|
return self.__schedule_deadline(lease, vmrr, earliest)
|
67
|
elif lease.get_type() in (Lease.BEST_EFFORT, Lease.IMMEDIATE):
|
68
|
return self.__schedule_asap(lease, vmrr, earliest)
|
69
|
|
70
|
def schedule_migration(self, lease, vmrr, nexttime):
|
71
|
if type(lease.software) == UnmanagedSoftwareEnvironment:
|
72
|
return []
|
73
|
|
74
|
|
75
|
|
76
|
last_vmrr = lease.get_last_vmrr()
|
77
|
vnode_migrations = dict([(vnode, (last_vmrr.nodes[vnode], vmrr.nodes[vnode])) for vnode in vmrr.nodes])
|
78
|
|
79
|
mustmigrate = False
|
80
|
for vnode in vnode_migrations:
|
81
|
if vnode_migrations[vnode][0] != vnode_migrations[vnode][1]:
|
82
|
mustmigrate = True
|
83
|
break
|
84
|
|
85
|
if not mustmigrate:
|
86
|
return []
|
87
|
|
88
|
if get_config().get("migration") == constants.MIGRATE_YES_NOTRANSFER:
|
89
|
start = nexttime
|
90
|
end = nexttime
|
91
|
res = {}
|
92
|
migr_rr = DiskImageMigrationResourceReservation(lease, start, end, res, vmrr, vnode_migrations)
|
93
|
migr_rr.state = ResourceReservation.STATE_SCHEDULED
|
94
|
return [migr_rr]
|
95
|
|
96
|
|
97
|
migrations = []
|
98
|
while len(vnode_migrations) > 0:
|
99
|
pnodes = set()
|
100
|
migration = {}
|
101
|
for vnode in vnode_migrations:
|
102
|
origin = vnode_migrations[vnode][0]
|
103
|
dest = vnode_migrations[vnode][1]
|
104
|
if not origin in pnodes and not dest in pnodes:
|
105
|
migration[vnode] = vnode_migrations[vnode]
|
106
|
pnodes.add(origin)
|
107
|
pnodes.add(dest)
|
108
|
for vnode in migration:
|
109
|
del vnode_migrations[vnode]
|
110
|
migrations.append(migration)
|
111
|
|
112
|
|
113
|
start = max(last_vmrr.post_rrs[-1].end, nexttime)
|
114
|
bandwidth = self.resourcepool.info.get_migration_bandwidth()
|
115
|
migr_rrs = []
|
116
|
for m in migrations:
|
117
|
mb_to_migrate = lease.software.image_size * len(m.keys())
|
118
|
migr_time = estimate_transfer_time(mb_to_migrate, bandwidth)
|
119
|
end = start + migr_time
|
120
|
res = {}
|
121
|
for (origin,dest) in m.values():
|
122
|
resorigin = Capacity([constants.RES_NETOUT])
|
123
|
resorigin.set_quantity(constants.RES_NETOUT, bandwidth)
|
124
|
resdest = Capacity([constants.RES_NETIN])
|
125
|
resdest.set_quantity(constants.RES_NETIN, bandwidth)
|
126
|
res[origin] = self.slottable.create_resource_tuple_from_capacity(resorigin)
|
127
|
res[dest] = self.slottable.create_resource_tuple_from_capacity(resdest)
|
128
|
migr_rr = DiskImageMigrationResourceReservation(lease, start, start + migr_time, res, vmrr, m)
|
129
|
migr_rr.state = ResourceReservation.STATE_SCHEDULED
|
130
|
migr_rrs.append(migr_rr)
|
131
|
start = end
|
132
|
|
133
|
return migr_rrs
|
134
|
|
135
|
def estimate_migration_time(self, lease):
|
136
|
migration = get_config().get("migration")
|
137
|
if migration == constants.MIGRATE_YES:
|
138
|
vmrr = lease.get_last_vmrr()
|
139
|
images_in_pnode = dict([(pnode,0) for pnode in set(vmrr.nodes.values())])
|
140
|
for (vnode,pnode) in vmrr.nodes.items():
|
141
|
images_in_pnode[pnode] += lease.software.image_size
|
142
|
max_to_transfer = max(images_in_pnode.values())
|
143
|
bandwidth = self.resourcepool.info.get_migration_bandwidth()
|
144
|
return estimate_transfer_time(max_to_transfer, bandwidth)
|
145
|
elif migration == constants.MIGRATE_YES_NOTRANSFER:
|
146
|
return TimeDelta(seconds=0)
|
147
|
|
148
|
def find_earliest_starting_times(self, lease, nexttime):
|
149
|
node_ids = [node.id for node in self.resourcepool.get_nodes()]
|
150
|
config = get_config()
|
151
|
mechanism = config.get("transfer-mechanism")
|
152
|
reusealg = config.get("diskimage-reuse")
|
153
|
avoidredundant = config.get("avoid-redundant-transfers")
|
154
|
|
155
|
if type(lease.software) == UnmanagedSoftwareEnvironment:
|
156
|
earliest = {}
|
157
|
for node in node_ids:
|
158
|
earliest[node] = EarliestStartingTime(nexttime, EarliestStartingTime.EARLIEST_NOPREPARATION)
|
159
|
return earliest
|
160
|
|
161
|
|
162
|
transfer_duration = self.__estimate_image_transfer_time(lease, self.imagenode_bandwidth)
|
163
|
if mechanism == constants.TRANSFER_UNICAST:
|
164
|
transfer_duration *= lease.numnodes
|
165
|
start = self.__get_next_transfer_slot(nexttime, transfer_duration)
|
166
|
earliest = {}
|
167
|
for node in node_ids:
|
168
|
earliest[node] = ImageTransferEarliestStartingTime(start + transfer_duration, ImageTransferEarliestStartingTime.EARLIEST_IMAGETRANSFER)
|
169
|
earliest[node].transfer_start = start
|
170
|
|
171
|
|
172
|
if reusealg == constants.REUSE_IMAGECACHES:
|
173
|
nodeswithimg = self.resourcepool.get_nodes_with_reusable_image(lease.software.image_id)
|
174
|
for node in nodeswithimg:
|
175
|
earliest[node].time = nexttime
|
176
|
earliest[node].type = ImageTransferEarliestStartingTime.EARLIEST_REUSE
|
177
|
|
178
|
|
179
|
|
180
|
if avoidredundant:
|
181
|
if mechanism == constants.TRANSFER_UNICAST:
|
182
|
|
183
|
|
184
|
pass
|
185
|
if mechanism == constants.TRANSFER_MULTICAST:
|
186
|
|
187
|
transfers = [t for t in self.transfers if t.state == ResourceReservation.STATE_SCHEDULED]
|
188
|
for t in transfers:
|
189
|
if t.file == lease.software.image_id:
|
190
|
start = t.end
|
191
|
if start > nexttime:
|
192
|
for n in earliest:
|
193
|
if start < earliest[n].time:
|
194
|
earliest[n].time = start
|
195
|
earliest[n].type = ImageTransferEarliestStartingTime.EARLIEST_PIGGYBACK
|
196
|
earliest[n].piggybacking_on = t
|
197
|
|
198
|
return earliest
|
199
|
|
200
|
def cancel_preparation(self, lease):
|
201
|
toremove = self.__remove_transfers(lease)
|
202
|
for t in toremove:
|
203
|
t.lease.remove_preparationrr(t)
|
204
|
self.slottable.remove_reservation(t)
|
205
|
self.__remove_files(lease)
|
206
|
|
207
|
def cleanup(self, lease):
|
208
|
self.__remove_files(lease)
|
209
|
|
210
|
|
211
|
def __schedule_deadline(self, lease, vmrr, earliest):
|
212
|
config = get_config()
|
213
|
reusealg = config.get("diskimage-reuse")
|
214
|
avoidredundant = config.get("avoid-redundant-transfers")
|
215
|
is_ready = False
|
216
|
|
217
|
musttransfer = {}
|
218
|
mustpool = {}
|
219
|
nodeassignment = vmrr.nodes
|
220
|
start = lease.start.requested
|
221
|
end = lease.start.requested + lease.duration.requested
|
222
|
for (vnode, pnode) in nodeassignment.items():
|
223
|
lease_id = lease.id
|
224
|
self.logger.debug("Scheduling image transfer of '%s' for vnode %i to physnode %i" % (lease.software.image_id, vnode, pnode))
|
225
|
|
226
|
if reusealg == constants.REUSE_IMAGECACHES:
|
227
|
if self.resourcepool.exists_reusable_image(pnode, lease.software.image_id, start):
|
228
|
self.logger.debug("No need to schedule an image transfer (reusing an image in pool)")
|
229
|
mustpool[vnode] = pnode
|
230
|
else:
|
231
|
self.logger.debug("Need to schedule a transfer.")
|
232
|
musttransfer[vnode] = pnode
|
233
|
else:
|
234
|
self.logger.debug("Need to schedule a transfer.")
|
235
|
musttransfer[vnode] = pnode
|
236
|
|
237
|
if len(musttransfer) == 0:
|
238
|
is_ready = True
|
239
|
else:
|
240
|
try:
|
241
|
transfer_rrs = self.__schedule_imagetransfer_edf(lease, musttransfer, earliest)
|
242
|
except NotSchedulableException, exc:
|
243
|
raise
|
244
|
|
245
|
|
246
|
|
247
|
if reusealg == constants.REUSE_IMAGECACHES:
|
248
|
for (vnode, pnode) in mustpool.items():
|
249
|
self.resourcepool.add_mapping_to_existing_reusable_image(pnode, lease.diskimage_id, lease.id, vnode, start)
|
250
|
self.resourcepool.add_diskimage(pnode, lease.diskimage_id, lease.diskimage_size, lease.id, vnode)
|
251
|
|
252
|
return transfer_rrs, is_ready
|
253
|
|
254
|
|
255
|
def __schedule_asap(self, lease, vmrr, earliest):
|
256
|
config = get_config()
|
257
|
reusealg = config.get("diskimage-reuse")
|
258
|
avoidredundant = config.get("avoid-redundant-transfers")
|
259
|
|
260
|
is_ready = False
|
261
|
|
262
|
transfer_rrs = []
|
263
|
musttransfer = {}
|
264
|
piggybacking = []
|
265
|
for (vnode, pnode) in vmrr.nodes.items():
|
266
|
earliest_type = earliest[pnode].type
|
267
|
if earliest_type == ImageTransferEarliestStartingTime.EARLIEST_REUSE:
|
268
|
|
269
|
self.logger.debug("Reusing image for V%i->P%i." % (vnode, pnode))
|
270
|
self.resourcepool.add_mapping_to_existing_reusable_image(pnode, lease.software.image_id, lease.id, vnode, vmrr.end)
|
271
|
self.resourcepool.add_diskimage(pnode, lease.software.image_id, lease.software.image_size, lease.id, vnode)
|
272
|
elif earliest_type == ImageTransferEarliestStartingTime.EARLIEST_PIGGYBACK:
|
273
|
|
274
|
transfer_rr = earliest[pnode].piggybacking_on
|
275
|
transfer_rr.piggyback(lease.id, vnode, pnode)
|
276
|
self.logger.debug("Piggybacking transfer for V%i->P%i on existing transfer in lease %i." % (vnode, pnode, transfer_rr.lease.id))
|
277
|
piggybacking.append(transfer_rr)
|
278
|
else:
|
279
|
|
280
|
musttransfer[vnode] = pnode
|
281
|
self.logger.debug("Must transfer V%i->P%i." % (vnode, pnode))
|
282
|
|
283
|
if len(musttransfer)>0:
|
284
|
transfer_rrs = self.__schedule_imagetransfer_fifo(lease, musttransfer, earliest)
|
285
|
|
286
|
if len(musttransfer)==0 and len(piggybacking)==0:
|
287
|
is_ready = True
|
288
|
|
289
|
return transfer_rrs, is_ready
|
290
|
|
291
|
|
292
|
def __schedule_imagetransfer_edf(self, lease, musttransfer, earliest):
|
293
|
|
294
|
bandwidth = self.deployment_enact.get_bandwidth()
|
295
|
config = get_config()
|
296
|
mechanism = config.get("transfer-mechanism")
|
297
|
transfer_duration = self.__estimate_image_transfer_time(lease, bandwidth)
|
298
|
if mechanism == constants.TRANSFER_UNICAST:
|
299
|
transfer_duration *= len(musttransfer)
|
300
|
|
301
|
|
302
|
start = self.__get_last_transfer_slot(lease.start.requested, transfer_duration)
|
303
|
|
304
|
res = {}
|
305
|
resimgnode = Capacity([constants.RES_NETOUT])
|
306
|
resimgnode.set_quantity(constants.RES_NETOUT, bandwidth)
|
307
|
resnode = Capacity([constants.RES_NETIN])
|
308
|
resnode.set_quantity(constants.RES_NETIN, bandwidth)
|
309
|
res[self.imagenode.id] = self.slottable.create_resource_tuple_from_capacity(resimgnode)
|
310
|
for pnode in musttransfer.values():
|
311
|
res[pnode] = self.slottable.create_resource_tuple_from_capacity(resnode)
|
312
|
|
313
|
newtransfer = FileTransferResourceReservation(lease, res)
|
314
|
newtransfer.deadline = lease.start.requested
|
315
|
newtransfer.state = ResourceReservation.STATE_SCHEDULED
|
316
|
newtransfer.file = lease.software.image_id
|
317
|
newtransfer.start = start
|
318
|
newtransfer.end = start + transfer_duration
|
319
|
for vnode, pnode in musttransfer.items():
|
320
|
newtransfer.piggyback(lease.id, vnode, pnode)
|
321
|
|
322
|
bisect.insort(self.transfers, newtransfer)
|
323
|
|
324
|
return [newtransfer]
|
325
|
|
326
|
def __schedule_imagetransfer_fifo(self, lease, musttransfer, earliest):
|
327
|
|
328
|
bandwidth = self.imagenode_bandwidth
|
329
|
config = get_config()
|
330
|
mechanism = config.get("transfer-mechanism")
|
331
|
|
332
|
|
333
|
|
334
|
pnodes = musttransfer.values()
|
335
|
start = earliest[pnodes[0]].transfer_start
|
336
|
transfer_duration = self.__estimate_image_transfer_time(lease, bandwidth)
|
337
|
|
338
|
res = {}
|
339
|
resimgnode = Capacity([constants.RES_NETOUT])
|
340
|
resimgnode.set_quantity(constants.RES_NETOUT, bandwidth)
|
341
|
resnode = Capacity([constants.RES_NETIN])
|
342
|
resnode.set_quantity(constants.RES_NETIN, bandwidth)
|
343
|
res[self.imagenode.id] = self.slottable.create_resource_tuple_from_capacity(resimgnode)
|
344
|
for n in musttransfer.values():
|
345
|
res[n] = self.slottable.create_resource_tuple_from_capacity(resnode)
|
346
|
|
347
|
newtransfer = FileTransferResourceReservation(lease, res)
|
348
|
newtransfer.start = start
|
349
|
if mechanism == constants.TRANSFER_UNICAST:
|
350
|
newtransfer.end = start + (len(musttransfer) * transfer_duration)
|
351
|
if mechanism == constants.TRANSFER_MULTICAST:
|
352
|
newtransfer.end = start + transfer_duration
|
353
|
|
354
|
newtransfer.deadline = None
|
355
|
newtransfer.state = ResourceReservation.STATE_SCHEDULED
|
356
|
newtransfer.file = lease.software.image_id
|
357
|
for vnode, pnode in musttransfer.items():
|
358
|
newtransfer.piggyback(lease.id, vnode, pnode)
|
359
|
|
360
|
bisect.insort(self.transfers, newtransfer)
|
361
|
|
362
|
return [newtransfer]
|
363
|
|
364
|
|
365
|
def __estimate_image_transfer_time(self, lease, bandwidth):
|
366
|
config = get_config()
|
367
|
force_transfer_time = config.get("force-imagetransfer-time")
|
368
|
if force_transfer_time != None:
|
369
|
return force_transfer_time
|
370
|
else:
|
371
|
return estimate_transfer_time(lease.software.image_size, bandwidth)
|
372
|
|
373
|
|
374
|
def __get_next_transfer_slot(self, nexttime, required_duration):
|
375
|
|
376
|
|
377
|
if len(self.transfers) == 0:
|
378
|
return nexttime
|
379
|
elif nexttime + required_duration <= self.transfers[0].start:
|
380
|
return nexttime
|
381
|
else:
|
382
|
for i in xrange(len(self.transfers) - 1):
|
383
|
if self.transfers[i].end != self.transfers[i+1].start:
|
384
|
hole_duration = self.transfers[i+1].start - self.transfers[i].end
|
385
|
if hole_duration >= required_duration:
|
386
|
return self.transfers[i].end
|
387
|
return self.transfers[-1].end
|
388
|
|
389
|
|
390
|
def __get_last_transfer_slot(self, deadline, required_duration):
|
391
|
|
392
|
|
393
|
if len(self.transfers) == 0:
|
394
|
return deadline - required_duration
|
395
|
elif self.transfers[-1].end + required_duration <= deadline:
|
396
|
return deadline - required_duration
|
397
|
else:
|
398
|
for i in xrange(len(self.transfers) - 1, 0, -1):
|
399
|
if self.transfers[i].start != self.transfer[i-1].end:
|
400
|
hole_duration = self.transfer[i].start - self.transfers[i-1].end
|
401
|
if hole_duration >= required_duration:
|
402
|
return self.transfer[i].start - required_duration
|
403
|
return self.transfers[0].start - required_duration
|
404
|
|
405
|
def __remove_transfers(self, lease):
|
406
|
toremove = []
|
407
|
for t in self.transfers:
|
408
|
for pnode in t.transfers:
|
409
|
leases = [l for l, v in t.transfers[pnode]]
|
410
|
if lease in leases:
|
411
|
newtransfers = [(l, v) for l, v in t.transfers[pnode] if l!=lease]
|
412
|
t.transfers[pnode] = newtransfers
|
413
|
|
414
|
a = sum([len(l) for l in t.transfers.values()])
|
415
|
if a == 0:
|
416
|
toremove.append(t)
|
417
|
for t in toremove:
|
418
|
self.transfers.remove(t)
|
419
|
|
420
|
return toremove
|
421
|
|
422
|
def __remove_files(self, lease):
|
423
|
for vnode, pnode in lease.get_last_vmrr().nodes.items():
|
424
|
self.resourcepool.remove_diskimage(pnode, lease.id, vnode)
|
425
|
|
426
|
@staticmethod
|
427
|
def _handle_start_filetransfer(sched, lease, rr):
|
428
|
sched.logger.debug("LEASE-%i Start of handleStartFileTransfer" % lease.id)
|
429
|
lease.print_contents()
|
430
|
lease_state = lease.get_state()
|
431
|
if lease_state == Lease.STATE_SCHEDULED or lease_state == Lease.STATE_READY:
|
432
|
lease.set_state(Lease.STATE_PREPARING)
|
433
|
rr.state = ResourceReservation.STATE_ACTIVE
|
434
|
|
435
|
else:
|
436
|
raise InconsistentLeaseStateError(l, doing = "starting a file transfer")
|
437
|
|
438
|
|
439
|
|
440
|
lease.print_contents()
|
441
|
sched.logger.debug("LEASE-%i End of handleStartFileTransfer" % lease.id)
|
442
|
sched.logger.info("Starting image transfer for lease %i" % (lease.id))
|
443
|
|
444
|
@staticmethod
|
445
|
def _handle_end_filetransfer(sched, lease, rr):
|
446
|
sched.logger.debug("LEASE-%i Start of handleEndFileTransfer" % lease.id)
|
447
|
lease.print_contents()
|
448
|
lease_state = lease.get_state()
|
449
|
if lease_state == Lease.STATE_PREPARING:
|
450
|
lease.set_state(Lease.STATE_READY)
|
451
|
rr.state = ResourceReservation.STATE_DONE
|
452
|
for physnode in rr.transfers:
|
453
|
vnodes = rr.transfers[physnode]
|
454
|
|
455
|
|
456
|
|
457
|
|
458
|
|
459
|
|
460
|
|
461
|
|
462
|
|
463
|
|
464
|
maxend = None
|
465
|
|
466
|
sched._add_diskimages(physnode, rr.file, lease.software.image_size, vnodes, timeout=maxend)
|
467
|
else:
|
468
|
raise InconsistentLeaseStateError(l, doing = "ending a file transfer")
|
469
|
|
470
|
sched.transfers.remove(rr)
|
471
|
lease.print_contents()
|
472
|
sched.logger.debug("LEASE-%i End of handleEndFileTransfer" % lease.id)
|
473
|
sched.logger.info("Completed image transfer for lease %i" % (lease.id))
|
474
|
|
475
|
def _handle_start_migrate(self, l, rr):
|
476
|
self.logger.debug("LEASE-%i Start of handleStartMigrate" % l.id)
|
477
|
l.print_contents()
|
478
|
rr.state = ResourceReservation.STATE_ACTIVE
|
479
|
l.print_contents()
|
480
|
self.logger.debug("LEASE-%i End of handleStartMigrate" % l.id)
|
481
|
self.logger.info("Migrating lease %i..." % (l.id))
|
482
|
|
483
|
def _handle_end_migrate(self, l, rr):
|
484
|
self.logger.debug("LEASE-%i Start of handleEndMigrate" % l.id)
|
485
|
l.print_contents()
|
486
|
|
487
|
for vnode in rr.transfers:
|
488
|
origin = rr.transfers[vnode][0]
|
489
|
dest = rr.transfers[vnode][1]
|
490
|
|
491
|
self.resourcepool.remove_diskimage(origin, l.id, vnode)
|
492
|
self.resourcepool.add_diskimage(dest, l.software.image_id, l.software.image_size, l.id, vnode)
|
493
|
|
494
|
rr.state = ResourceReservation.STATE_DONE
|
495
|
l.print_contents()
|
496
|
self.logger.debug("LEASE-%i End of handleEndMigrate" % l.id)
|
497
|
self.logger.info("Migrated lease %i..." % (l.id))
|
498
|
|
499
|
def _add_diskimages(self, pnode_id, diskimage_id, diskimage_size, vnodes, timeout):
|
500
|
self.logger.debug("Adding image for leases=%s in nod_id=%i" % (vnodes, pnode_id))
|
501
|
|
502
|
pnode = self.resourcepool.get_node(pnode_id)
|
503
|
|
504
|
if self.reusealg == constants.REUSE_NONE:
|
505
|
for (lease_id, vnode) in vnodes:
|
506
|
self.resourcepool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease_id, vnode)
|
507
|
elif self.reusealg == constants.REUSE_IMAGECACHES:
|
508
|
|
509
|
|
510
|
|
511
|
if pnode.exists_reusable_image(diskimage_id):
|
512
|
for (lease_id, vnode) in vnodes:
|
513
|
pnode.add_mapping_to_existing_reusable_image(diskimage_id, lease_id, vnode, timeout)
|
514
|
else:
|
515
|
if self.maxcachesize == constants.CACHESIZE_UNLIMITED:
|
516
|
can_add_to_cache = True
|
517
|
else:
|
518
|
|
519
|
cachesize = pnode.get_reusable_images_size()
|
520
|
reqsize = cachesize + diskimage_size
|
521
|
if reqsize > self.maxcachesize:
|
522
|
|
523
|
desiredsize = self.maxcachesize - diskimage_size
|
524
|
self.logger.debug("Adding the image would make the size of pool in node %i = %iMB. Will try to bring it down to %i" % (pnode_id, reqsize, desiredsize))
|
525
|
pnode.print_files()
|
526
|
success = pnode.purge_downto(self.maxcachesize)
|
527
|
if not success:
|
528
|
can_add_to_cache = False
|
529
|
else:
|
530
|
can_add_to_cache = True
|
531
|
else:
|
532
|
can_add_to_cache = True
|
533
|
|
534
|
if can_add_to_cache:
|
535
|
self.resourcepool.add_reusable_image(pnode_id, diskimage_id, diskimage_size, vnodes, timeout)
|
536
|
else:
|
537
|
|
538
|
|
539
|
self.logger.debug("Unable to add to pool. Must create individual disk images directly instead.")
|
540
|
|
541
|
|
542
|
|
543
|
for (lease_id, vnode) in vnodes:
|
544
|
self.resourcepool.add_diskimage(pnode_id, diskimage_id, diskimage_size, lease_id, vnode)
|
545
|
|
546
|
pnode.print_files()
|
547
|
|
548
|
class FileTransferResourceReservation(ResourceReservation):
|
549
|
def __init__(self, lease, res, start=None, end=None):
|
550
|
ResourceReservation.__init__(self, lease, start, end, res)
|
551
|
self.deadline = None
|
552
|
self.file = None
|
553
|
|
554
|
self.transfers = {}
|
555
|
|
556
|
def print_contents(self, loglevel="VDEBUG"):
|
557
|
ResourceReservation.print_contents(self, loglevel)
|
558
|
self.logger.log(loglevel, "Type : FILE TRANSFER")
|
559
|
self.logger.log(loglevel, "Deadline : %s" % self.deadline)
|
560
|
self.logger.log(loglevel, "File : %s" % self.file)
|
561
|
self.logger.log(loglevel, "Transfers : %s" % self.transfers)
|
562
|
|
563
|
def piggyback(self, lease_id, vnode, physnode):
|
564
|
if self.transfers.has_key(physnode):
|
565
|
self.transfers[physnode].append((lease_id, vnode))
|
566
|
else:
|
567
|
self.transfers[physnode] = [(lease_id, vnode)]
|
568
|
|
569
|
def is_preemptible(self):
|
570
|
return False
|
571
|
|
572
|
def __cmp__(self, rr):
|
573
|
return cmp(self.start, rr.start)
|
574
|
|
575
|
class ImageTransferEarliestStartingTime(EarliestStartingTime):
|
576
|
EARLIEST_IMAGETRANSFER = 2
|
577
|
EARLIEST_REUSE = 3
|
578
|
EARLIEST_PIGGYBACK = 4
|
579
|
|
580
|
def __init__(self, time, type):
|
581
|
EarliestStartingTime.__init__(self, time, type)
|
582
|
self.transfer_start = None
|
583
|
self.piggybacking_on = None
|
584
|
|
585
|
class DiskImageMigrationResourceReservation(MigrationResourceReservation):
|
586
|
def __init__(self, lease, start, end, res, vmrr, transfers):
|
587
|
MigrationResourceReservation.__init__(self, lease, start, end, res, vmrr, transfers)
|
588
|
|
589
|
def print_contents(self, loglevel=constants.LOGLEVEL_VDEBUG):
|
590
|
self.logger.log(loglevel, "Type : DISK IMAGE MIGRATION")
|
591
|
self.logger.log(loglevel, "Transfers : %s" % self.transfers)
|
592
|
ResourceReservation.print_contents(self, loglevel)
|