Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
C
cc-node
Manage
Activity
Members
Code
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Deploy
Releases
Model registry
Analyze
Contributor analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Mirror
cc-node
Commits
db7367b7
Commit
db7367b7
authored
12 years ago
by
Anael Beutot
Browse files
Options
Downloads
Patches
Plain Diff
Added hypervisor jobs for volume export/import.
These jobs are needed for VM migration/clone.
parent
95380942
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
ccnode/hypervisor/jobs.py
+238
-0
238 additions, 0 deletions
ccnode/hypervisor/jobs.py
with
238 additions
and
0 deletions
ccnode/hypervisor/jobs.py
0 → 100644
+
238
−
0
View file @
db7367b7
import
io
import
os
import
socket
import
logging
from
hashlib
import
md5
from
ccnode.jobs
import
BaseJob
logger
=
logging
.
getLogger
(
__name__
)
class
ImportVolume
(
BaseJob
):
"""
Import volume job.
"""
BUFFER_LEN
=
8192
*
16
HASH
=
md5
def
__init__
(
self
,
job_manager
,
ev_loop
,
volume
):
BaseJob
.
__init__
(
self
,
job_manager
,
ev_loop
)
self
.
checksum
=
None
self
.
volume
=
volume
# where the other node will connect
self
.
port
=
None
# fds
self
.
sock
=
None
self
.
client_sock
=
None
self
.
disk
=
None
def
clean_fds
(
self
):
if
self
.
sock
is
not
None
:
self
.
sock
.
close
()
self
.
sock
=
None
if
self
.
client_sock
is
not
None
:
self
.
client_sock
.
close
()
self
.
client_sock
=
None
if
self
.
disk
is
not
None
:
self
.
disk
.
close
()
self
.
disk
=
None
def
pre_job
(
self
):
"""
:returns: port number the socket is listening on
"""
# create socket
try
:
self
.
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
except
socket
.
error
:
logger
.
exception
(
'
Error while creating socket for volume export
'
)
self
.
clean_fds
()
raise
try
:
self
.
sock
.
settimeout
(
10.
)
except
socket
.
error
:
logger
.
exception
(
'
Cannot set timeout on socket for volume export
'
)
self
.
clean_fds
()
raise
try
:
self
.
sock
.
bind
((
'
0.0.0.0
'
,
0
))
except
socket
.
error
:
logger
.
exception
(
'
Error while binding socket for volume export
'
)
self
.
clean_fds
()
raise
try
:
self
.
sock
.
listen
(
1
)
except
socket
.
error
:
logger
.
exception
(
'
Error while listening on socket
'
)
self
.
clean_fds
()
raise
# open local disk
try
:
self
.
disk
=
io
.
open
(
self
.
volume
.
path
,
'
wb
'
,
0
)
except
IOError
:
logger
.
exception
(
'
Error while trying to open local disk
'
)
self
.
clean_fds
()
raise
self
.
port
=
self
.
sock
.
getsockname
()[
1
]
return
self
.
port
def
run_job
(
self
):
# FIXME raised exceptions in this functions will be in the context of a
# thread that is not running in the sjRPC, therefore these won't be
# caught
try
:
self
.
client_sock
,
_
=
self
.
sock
.
accept
()
except
socket
.
timeout
:
logger
.
exception
(
'
Error for importing job: client did not connect
'
)
self
.
clean_fds
()
raise
except
socket
.
error
:
logger
.
exception
(
'
Error while accepting socket
'
)
self
.
clean_fds
()
raise
# close the listening socket
self
.
sock
.
close
()
self
.
sock
=
None
checksum
=
self
.
HASH
()
# start downloading disk image
while
self
.
running
:
try
:
received
=
[]
# keep a list of received buffers in order to do
# only one concatenation in the end
total_received
=
0
while
True
:
recv_buf
=
self
.
client_sock
.
recv
(
self
.
BUFFER_LEN
-
total_received
)
# logger.debug('Received %d', len(recv_buf))
if
not
recv_buf
:
# EOF
# in case received in not empty, we will come back here
# once again and it returns EOF one more time
break
total_received
+=
len
(
recv_buf
)
received
.
append
(
recv_buf
)
if
total_received
==
self
.
BUFFER_LEN
:
break
except
socket
.
error
:
logger
.
exception
(
'
Error while receiving disk image
'
)
self
.
clean_fds
()
raise
buffer_
=
b
''
.
join
(
received
)
if
not
buffer_
:
logger
.
debug
(
'
Received EOF import job
'
)
break
checksum
.
update
(
buffer_
)
try
:
written
=
0
# FIXME never write small chuncks
# in which case does disk.write would not write all the buffer ?
to_send
=
buffer_
while
True
:
written
+=
self
.
disk
.
write
(
to_send
)
# logger.debug('Written %s to disk', written)
to_send
=
buffer
(
buffer_
,
written
)
if
not
to_send
:
break
except
IOError
:
logger
.
exception
(
'
Error while writing image to disk
'
)
self
.
clean_fds
()
raise
# here we could not have received the full disk but we don't consider
# this as an error in the import part
self
.
checksum
=
checksum
.
hexdigest
()
# clean the fds
self
.
clean_fds
()
logger
.
debug
(
'
Volume import done
'
)
class
ExportVolume
(
BaseJob
):
"""
Export volume job.
"""
BUFFER_LEN
=
8192
*
16
HASH
=
md5
def
__init__
(
self
,
job_manager
,
ev_loop
,
volume
,
raddr
,
rport
):
"""
:param volume: :class:`Volume` instance
:param raddr: remote IP address
:param rport: remote TCP port
"""
BaseJob
.
__init__
(
self
,
job_manager
,
ev_loop
)
# where to connect to send the volume
self
.
raddr
=
raddr
self
.
rport
=
rport
self
.
volume
=
volume
self
.
checksum
=
None
# fds
self
.
sock
=
None
self
.
disk
=
None
def
clean_fds
(
self
):
if
self
.
sock
is
not
None
:
self
.
sock
.
close
()
self
.
sock
=
None
if
self
.
disk
is
not
None
:
self
.
disk
.
close
()
self
.
disk
=
None
def
pre_job
(
self
):
self
.
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
# connect to the remote host
try
:
self
.
sock
.
connect
((
self
.
raddr
,
self
.
rport
))
except
socket
.
error
as
exc
:
logger
.
exception
(
'
Error while trying to connect to remote host %s
'
,
os
.
strerror
(
exc
.
errno
))
self
.
clean_fds
()
raise
# open local volume
try
:
self
.
disk
=
io
.
open
(
self
.
volume
.
path
,
'
rb
'
,
0
)
except
IOError
:
logger
.
exception
(
'
Error while opening disk for export job
'
)
self
.
clean_fds
()
raise
def
run_job
(
self
):
checksum
=
self
.
HASH
()
# sent_count = 0
# do copy
while
self
.
running
:
try
:
read
=
self
.
disk
.
read
(
self
.
BUFFER_LEN
)
except
IOError
:
logger
.
exception
(
'
Error while reading from disk
'
)
self
.
clean_fds
()
break
# read length may be less than BUFFER_LEN but we don't care as it
# will go over TCP
if
not
read
:
# end of file
# logger.debug('EOF, exported %d bytes', sent_count)
break
# sent_count += len(read)
# logger.debug('Read %d from disk', len(read))
checksum
.
update
(
read
)
try
:
self
.
sock
.
sendall
(
read
)
except
socket
.
error
:
logger
.
exception
(
'
Error while sending through socket
'
)
self
.
clean_fds
()
break
self
.
checksum
=
checksum
.
hexdigest
()
self
.
clean_fds
()
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment