]>
Commit | Line | Data |
---|---|---|
6278ae03 DB |
1 | #!/usr/bin/python |
2 | # | |
3 | # Copyright (C) 2016 Red Hat, Inc. | |
4 | # | |
5 | # This program is free software; you can redistribute it and/or modify | |
6 | # it under the terms of the GNU General Public License as published by | |
7 | # the Free Software Foundation; either version 2 of the License, or | |
8 | # (at your option) any later version. | |
9 | # | |
10 | # This program is distributed in the hope that it will be useful, | |
11 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
12 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
13 | # GNU General Public License for more details. | |
14 | # | |
15 | # You should have received a copy of the GNU General Public License | |
16 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
17 | # | |
18 | # Creator/Owner: Daniel P. Berrange <[email protected]> | |
19 | # | |
20 | # Exercise the QEMU 'luks' block driver to validate interoperability | |
21 | # with the Linux dm-crypt + cryptsetup implementation | |
22 | ||
23 | import subprocess | |
24 | import os | |
25 | import os.path | |
26 | ||
27 | import base64 | |
28 | ||
29 | import iotests | |
30 | ||
31 | ||
class LUKSConfig(object):
    """Represent configuration parameters for a single LUKS
       setup to be tested.

    The ``passwords`` attribute maps a key slot number, stored as a
    string ("0" to "7"), to the passphrase held in that slot."""

    def __init__(self, name, cipher, keylen, mode, ivgen,
                 ivgen_hash, hash, password=None, passwords=None):
        """Record the parameters of one LUKS configuration.

        name       -- identifier used for image / device names and logs
        cipher     -- cipher name, e.g. "aes"
        keylen     -- cipher key length in bits
        mode       -- cipher mode, e.g. "xts" or "cbc"
        ivgen      -- IV generator name, e.g. "plain64"
        ivgen_hash -- hash used by the IV generator, or None
        hash       -- hash algorithm name for the LUKS header
        password   -- passphrase for slot "0"; only consulted when
                      ``passwords`` is None (defaults to "123456")
        passwords  -- explicit slot -> passphrase dict; used as-is"""

        self.name = name
        self.cipher = cipher
        self.keylen = keylen
        self.mode = mode
        self.ivgen = ivgen
        self.ivgen_hash = ivgen_hash
        self.hash = hash

        if passwords is not None:
            # An explicit slot map wins; slot "0" may legitimately be absent
            self.passwords = passwords
        else:
            self.passwords = {}

            if password is None:
                self.passwords["0"] = "123456"
            else:
                self.passwords["0"] = password

    def __repr__(self):
        return self.name

    def image_name(self):
        """File name of the disk image for this configuration"""
        return "luks-%s.img" % self.name

    def image_path(self):
        """Path of the disk image inside the iotests scratch directory"""
        return os.path.join(iotests.test_dir, self.image_name())

    def device_name(self):
        """Name of the dm-crypt device used for this configuration"""
        return "qiotest-145-%s" % self.name

    def device_path(self):
        """Path of the device node under /dev/mapper"""
        return "/dev/mapper/" + self.device_name()

    def first_password(self):
        """Return (passphrase, slot) for the lowest populated key slot.

        Raises Exception if none of the slots "0".."7" has a passphrase."""
        for i in range(8):
            slot = str(i)
            if slot in self.passwords:
                return (self.passwords[slot], slot)
        raise Exception("No password found")

    def first_password_base64(self):
        """Return the first passphrase base64 encoded, as a native string
        that can be interpolated into command line arguments"""
        (pw, slot) = self.first_password()
        # b64encode() only accepts bytes on Python 3 and also returns
        # bytes there; convert on both sides so the result formats
        # cleanly with %s. On Python 2 both conversions are no-ops.
        raw = pw if isinstance(pw, bytes) else pw.encode("utf-8")
        encoded = base64.b64encode(raw)
        if isinstance(encoded, str):
            return encoded
        return encoded.decode("ascii")

    def active_slots(self):
        """Return the populated key slots in ascending order"""
        return [str(i) for i in range(8) if str(i) in self.passwords]
90 | ||
def verify_passwordless_sudo():
    """Check whether sudo is configured to allow
       password-less access to commands.

    The test is reported as not runnable via iotests.notrun() when
    'sudo -n /bin/true' fails, or when sudo cannot be launched at all."""

    args = ["sudo", "-n", "/bin/true"]

    try:
        proc = subprocess.Popen(args,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                universal_newlines=True)
    except OSError as err:
        # sudo is not installed / not executable: skip rather than crash
        iotests.notrun('requires password-less sudo access: %s' % err)
        return

    # Text mode keeps the message readable on Python 3 (no b'...' repr)
    msg = proc.communicate()[0]

    if proc.returncode != 0:
        iotests.notrun('requires password-less sudo access: %s' % msg)
106 | ||
107 | ||
def cryptsetup(args, password=None):
    """Run the cryptsetup command in batch mode.

    args     -- list of arguments appended to 'sudo cryptsetup -q -v'
    password -- optional string fed to the command on stdin

    The full command line is logged before it is run. Raises Exception
    carrying the combined stdout/stderr output if the command exits
    with a non-zero status."""

    fullargs = ["sudo", "cryptsetup", "-q", "-v"]
    fullargs.extend(args)

    iotests.log(" ".join(fullargs), filters=[iotests.filter_test_dir])
    # universal_newlines puts the pipes in text mode so that a str
    # password can be written to stdin on Python 3 as well as Python 2
    proc = subprocess.Popen(fullargs,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)

    msg = proc.communicate(password)[0]

    if proc.returncode != 0:
        raise Exception(msg)
124 | ||
125 | ||
def cryptsetup_add_password(config, slot):
    """Populate an additional LUKS key slot with its passphrase.

    The passphrase configured for 'slot' is written to a temporary
    file in the iotests scratch directory and handed to
    'cryptsetup luksAddKey'; the first configured passphrase is
    supplied on stdin. The temporary file is removed afterwards,
    whether or not cryptsetup succeeded."""

    existing_pw, _unused_slot = config.first_password()

    keyfile = os.path.join(iotests.test_dir, "passwd.txt")
    with open(keyfile, "w") as out:
        out.write(config.passwords[slot])

    try:
        cryptsetup(["luksAddKey", config.image_path(),
                    "--key-slot", slot,
                    "--key-file", "-",
                    keyfile],
                   existing_pw)
    finally:
        os.unlink(keyfile)
144 | ||
145 | ||
def cryptsetup_format(config):
    """Run 'cryptsetup luksFormat' on the image of 'config',
       populating only the first configured key slot.

    The passphrase for that slot is passed on stdin."""

    password, slot = config.first_password()

    # cipher spec is <cipher>-<mode>-<ivgen>[:<ivgen hash>]
    cipher_spec = "-".join([config.cipher, config.mode, config.ivgen])
    if config.ivgen_hash is not None:
        cipher_spec += ":" + config.ivgen_hash

    # for xts mode the key size given to cryptsetup is twice keylen
    key_bits = config.keylen * 2 if config.mode == "xts" else config.keylen

    args = ["luksFormat",
            "--cipher", cipher_spec,
            "--key-size", str(key_bits)]
    if config.hash is not None:
        args += ["--hash", config.hash]
    args += ["--key-slot", slot,
             "--key-file", "-",
             config.image_path()]

    cryptsetup(args, password)
168 | ||
169 | ||
def chown(config):
    """Set the ownership of a open LUKS device to this user.

    Runs 'sudo chown <uid>:<gid>' on the device path of 'config',
    using the uid / gid of the current process. Raises Exception,
    including the command output, if chown fails."""

    path = config.device_path()

    args = ["sudo", "chown", "%d:%d" % (os.getuid(), os.getgid()), path]
    iotests.log(" ".join(args), filters=[iotests.filter_chown])
    proc = subprocess.Popen(args,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)

    msg = proc.communicate()[0]

    if proc.returncode != 0:
        # Report what chown / sudo printed instead of discarding it
        raise Exception("Cannot change owner on %s: %s" % (path, msg))
186 | ||
187 | ||
def cryptsetup_open(config):
    """Map the image of 'config' as a dm-crypt device with
       'cryptsetup luksOpen', unlocking it with the first
       configured passphrase (sent on stdin)"""

    password = config.first_password()[0]

    cryptsetup(["luksOpen", config.image_path(), config.device_name()],
               password)
196 | ||
197 | ||
def cryptsetup_close(config):
    """Tear down the dm-crypt device of 'config' with
       'cryptsetup luksClose'"""

    cryptsetup(["luksClose", config.device_name()])
203 | ||
204 | ||
def delete_image(config):
    """Delete a disk image.

    Removal is best effort: if the file cannot be unlinked (for
    example because it does not exist) nothing is logged and no error
    is raised. The 'unlink' log line is only emitted when a file was
    actually removed."""

    path = config.image_path()

    try:
        os.unlink(path)
    except OSError:
        # Only filesystem failures are expected here; anything else
        # is a programming error and should not be hidden
        return

    iotests.log("unlink %s" % path,
                filters=[iotests.filter_test_dir])
214 | ||
215 | ||
def create_image(config, size_mb):
    """Create an empty (unformatted) disk image of 'size_mb'
       megabytes, replacing any previous image for 'config'"""

    delete_image(config)

    target = config.image_path()
    iotests.log("truncate %s --size %dMB" % (target, size_mb),
                filters=[iotests.filter_test_dir])

    # truncate() on a freshly created file just sets its length
    with open(target, "w") as image:
        image.truncate(size_mb * 1024 * 1024)
224 | ||
225 | ||
def qemu_img_create(config, size_mb):
    """Create and format a disk image with LUKS using qemu-img.

    config  -- LUKSConfig describing cipher, mode, ivgen and hash
    size_mb -- image size in megabytes

    The first configured passphrase is passed base64 encoded through
    a 'secret' object. Both the command line and the qemu-img output
    are logged."""

    opts = [
        "key-secret=sec0",
        "cipher-alg=%s-%d" % (config.cipher, config.keylen),
        "cipher-mode=%s" % config.mode,
        "ivgen-alg=%s" % config.ivgen,
    ]
    # Optional settings are left to qemu-img's defaults when unset,
    # matching how cryptsetup_format() treats them, rather than
    # passing the literal string "None"
    if config.hash is not None:
        opts.append("hash-alg=%s" % config.hash)
    if config.ivgen_hash is not None:
        opts.append("ivgen-hash-alg=%s" % config.ivgen_hash)

    args = ["create", "-f", "luks",
            "--object",
            ("secret,id=sec0,data=%s,format=base64" %
             config.first_password_base64()),
            "-o", ",".join(opts),
            config.image_path(),
            "%dM" % size_mb]

    iotests.log("qemu-img " + " ".join(args), filters=[iotests.filter_test_dir])
    iotests.log(iotests.qemu_img_pipe(*args), filters=[iotests.filter_test_dir])
249 | ||
def qemu_io_image_args(config, dev=False):
    """Build the qemu-io arguments that select what to open.

    With dev=True the dm-crypt device node is opened through the
    plain 'file' driver. Otherwise the image file is opened through
    the 'luks' driver, with the first passphrase supplied base64
    encoded via a 'secret' object."""

    if dev:
        device_opts = "driver=file,filename=%s" % config.device_path()
        return ["--image-opts", device_opts]

    secret = ("secret,id=sec0,data=%s,format=base64" %
              config.first_password_base64())
    image_opts = ("driver=luks,key-secret=sec0,file.filename=%s" %
                  config.image_path())
    return ["--object", secret, "--image-opts", image_opts]
265 | ||
def qemu_io_write_pattern(config, pattern, offset_mb, size_mb, dev=False):
    """Fill a region of a LUKS image or device with a byte pattern.

    Runs qemu-io 'write -P' for 'size_mb' megabytes starting at
    'offset_mb' megabytes; 'dev' selects the dm-crypt device instead
    of the image file. The command and its output are logged."""

    command = "write -P 0x%x %dM %dM" % (pattern, offset_mb, size_mb)
    argv = ["-c", command] + qemu_io_image_args(config, dev)

    iotests.log("qemu-io " + " ".join(argv),
                filters=[iotests.filter_test_dir])
    output = iotests.qemu_io(*argv)
    iotests.log(output, filters=[iotests.filter_test_dir,
                                 iotests.filter_qemu_io])
274 | ||
275 | ||
def qemu_io_read_pattern(config, pattern, offset_mb, size_mb, dev=False):
    """Read a region of a LUKS image or device, checking its pattern.

    Runs qemu-io 'read -P' for 'size_mb' megabytes starting at
    'offset_mb' megabytes; 'dev' selects the dm-crypt device instead
    of the image file. The command and its output are logged."""

    command = "read -P 0x%x %dM %dM" % (pattern, offset_mb, size_mb)
    argv = ["-c", command] + qemu_io_image_args(config, dev)

    iotests.log("qemu-io " + " ".join(argv),
                filters=[iotests.filter_test_dir])
    output = iotests.qemu_io(*argv)
    iotests.log(output, filters=[iotests.filter_test_dir,
                                 iotests.filter_qemu_io])
284 | ||
285 | ||
def test_once(config, qemu_img=False):
    """Run the test with a desired LUKS configuration. Can either
       use qemu-img for creating the initial volume, or cryptsetup,
       in order to test interoperability in both directions.

    The sequence is: create the image, write patterns through the
    dm-crypt device, read them back through QEMU's luks driver, write
    new patterns through QEMU, and read those back through dm-crypt.
    The image is deleted at the end even if a step fails."""

    iotests.log("# ================= %s %s =================" % (
        "qemu-img" if qemu_img else "dm-crypt", config))

    oneKB = 1024
    oneMB = oneKB * 1024
    oneGB = oneMB * 1024
    oneTB = oneGB * 1024

    # 4 TB, so that we pass the 32-bit sector number boundary.
    # Important for testing correctness of some IV generators
    # The files are sparse, so not actually using this much space
    image_size = 4 * oneTB

    # Floor division keeps the sizes integral on Python 3 as well;
    # a float would later be rejected by file.truncate()
    image_size_mb = image_size // oneMB

    iotests.log("# Create image")
    if qemu_img:
        qemu_img_create(config, image_size_mb)
    else:
        create_image(config, image_size_mb)

    lowOffsetMB = 100
    highOffsetMB = 3 * oneTB // oneMB

    try:
        if not qemu_img:
            iotests.log("# Format image")
            cryptsetup_format(config)

            # NOTE(review): the extra key slots are assumed to be added
            # only for cryptsetup-formatted images -- confirm against
            # the original indentation
            for slot in config.active_slots()[1:]:
                iotests.log("# Add password slot %s" % slot)
                cryptsetup_add_password(config, slot)

        # First we'll open the image using cryptsetup and write a
        # known pattern of data that we'll then verify with QEMU

        iotests.log("# Open dev")
        cryptsetup_open(config)

        try:
            iotests.log("# Set dev owner")
            chown(config)

            iotests.log("# Write test pattern 0xa7")
            qemu_io_write_pattern(config, 0xa7, lowOffsetMB, 10, dev=True)
            iotests.log("# Write test pattern 0x13")
            qemu_io_write_pattern(config, 0x13, highOffsetMB, 10, dev=True)
        finally:
            iotests.log("# Close dev")
            cryptsetup_close(config)

        # Ok, now we're using QEMU to verify the pattern just
        # written via dm-crypt

        iotests.log("# Read test pattern 0xa7")
        qemu_io_read_pattern(config, 0xa7, lowOffsetMB, 10, dev=False)
        iotests.log("# Read test pattern 0x13")
        qemu_io_read_pattern(config, 0x13, highOffsetMB, 10, dev=False)

        # Write a new pattern to the image, which we'll later
        # verify with dm-crypt
        iotests.log("# Write test pattern 0x91")
        qemu_io_write_pattern(config, 0x91, lowOffsetMB, 10, dev=False)
        iotests.log("# Write test pattern 0x5e")
        qemu_io_write_pattern(config, 0x5e, highOffsetMB, 10, dev=False)

        # Now we're opening the image with dm-crypt once more
        # and verifying what QEMU wrote, completing the circle
        iotests.log("# Open dev")
        cryptsetup_open(config)

        try:
            iotests.log("# Set dev owner")
            chown(config)

            iotests.log("# Read test pattern 0x91")
            qemu_io_read_pattern(config, 0x91, lowOffsetMB, 10, dev=True)
            iotests.log("# Read test pattern 0x5e")
            qemu_io_read_pattern(config, 0x5e, highOffsetMB, 10, dev=True)
        finally:
            iotests.log("# Close dev")
            cryptsetup_close(config)
    finally:
        iotests.log("# Delete image")
        delete_image(config)
376 | ||
377 | ||
378 | ||
# Pre-flight checks, run before any test configuration.
#
# Obviously we only work with the luks image format
iotests.verify_image_format(supported_fmts=['luks'])
iotests.verify_platform()

# We need sudo in order to run cryptsetup to create
# dm-crypt devices. This is safe to use on any
# machine, since all dm-crypt devices are backed
# by newly created plain files, and have a dm-crypt
# name prefix of 'qiotest' to avoid clashing with
# user LUKS volumes. Without password-less sudo the
# test is reported as not runnable.
verify_passwordless_sudo()
390 | ||
391 | ||
# If we look at all permutations of cipher, key size,
# mode, ivgen, hash, there are ~1000 possible configs.
#
# We certainly don't want/need to test every permutation
# to get good validation of interoperability between QEMU
# and dm-crypt/cryptsetup.
#
# The configs below are a representative set that aim to
# exercise each axis of configurability.
#
# NOTE: the config names are matched against the blacklist and
# the LUKS_CONFIG environment variable and appear in the log
# output, so they are kept unchanged even where they are not a
# literal rendering of the parameters.
#
configs = [
    # A common LUKS default
    LUKSConfig("aes-256-xts-plain64-sha1",
               "aes", 256, "xts", "plain64", None, "sha1"),


    # LUKS default but diff ciphers
    LUKSConfig("twofish-256-xts-plain64-sha1",
               "twofish", 256, "xts", "plain64", None, "sha1"),
    LUKSConfig("serpent-256-xts-plain64-sha1",
               "serpent", 256, "xts", "plain64", None, "sha1"),
    # Should really be xts, but kernel doesn't support xts+cast5
    # nor does it do essiv+cast5
    LUKSConfig("cast5-128-cbc-plain64-sha1",
               "cast5", 128, "cbc", "plain64", None, "sha1"),
    LUKSConfig("cast6-256-xts-plain64-sha1",
               "cast6", 256, "xts", "plain64", None, "sha1"),


    # LUKS default but diff modes / ivgens
    LUKSConfig("aes-256-cbc-plain-sha1",
               "aes", 256, "cbc", "plain", None, "sha1"),
    LUKSConfig("aes-256-cbc-plain64-sha1",
               "aes", 256, "cbc", "plain64", None, "sha1"),
    LUKSConfig("aes-256-cbc-essiv-sha256-sha1",
               "aes", 256, "cbc", "essiv", "sha256", "sha1"),
    LUKSConfig("aes-256-xts-essiv-sha256-sha1",
               "aes", 256, "xts", "essiv", "sha256", "sha1"),


    # LUKS default but smaller key sizes
    LUKSConfig("aes-128-xts-plain64-sha256-sha1",
               "aes", 128, "xts", "plain64", None, "sha1"),
    LUKSConfig("aes-192-xts-plain64-sha256-sha1",
               "aes", 192, "xts", "plain64", None, "sha1"),

    LUKSConfig("twofish-128-xts-plain64-sha1",
               "twofish", 128, "xts", "plain64", None, "sha1"),
    LUKSConfig("twofish-192-xts-plain64-sha1",
               "twofish", 192, "xts", "plain64", None, "sha1"),

    LUKSConfig("serpent-128-xts-plain64-sha1",
               "serpent", 128, "xts", "plain64", None, "sha1"),
    LUKSConfig("serpent-192-xts-plain64-sha1",
               "serpent", 192, "xts", "plain64", None, "sha1"),

    # ivgen is plain64 here, in line with the config name and the
    # other entries of this group
    LUKSConfig("cast6-128-xts-plain64-sha1",
               "cast6", 128, "xts", "plain64", None, "sha1"),
    LUKSConfig("cast6-192-xts-plain64-sha1",
               "cast6", 192, "xts", "plain64", None, "sha1"),


    # LUKS default but diff hash
    LUKSConfig("aes-256-xts-plain64-sha256",
               "aes", 256, "xts", "plain64", None, "sha256"),
    LUKSConfig("aes-256-xts-plain64-sha512",
               "aes", 256, "xts", "plain64", None, "sha512"),
    LUKSConfig("aes-256-xts-plain64-ripemd160",
               "aes", 256, "xts", "plain64", None, "ripemd160"),

    # Password in slot 3
    LUKSConfig("aes-256-xts-plain-sha1-pwslot3",
               "aes", 256, "xts", "plain", None, "sha1",
               passwords={
                   "3": "slot3",
               }),

    # Passwords in every slot
    LUKSConfig("aes-256-xts-plain-sha1-pwallslots",
               "aes", 256, "xts", "plain", None, "sha1",
               passwords={
                   "0": "slot1",
                   "1": "slot1",
                   "2": "slot2",
                   "3": "slot3",
                   "4": "slot4",
                   "5": "slot5",
                   "6": "slot6",
                   "7": "slot7",
               }),
]
483 | ||
# Names of configs (LUKSConfig.name) that are skipped
# unconditionally, with the reason for each group
blacklist = [
    # We don't have a cast-6 cipher impl for QEMU yet
    "cast6-256-xts-plain64-sha1",
    "cast6-128-xts-plain64-sha1",
    "cast6-192-xts-plain64-sha1",

    # GCrypt doesn't support Twofish with 192 bit key
    "twofish-192-xts-plain64-sha1",

    # We don't have sha512 hash wired up yet
    "aes-256-xts-plain64-sha512",

    # We don't have ripemd160 hash wired up yet
    "aes-256-xts-plain64-ripemd160",
]
499 | ||
# Optional comma separated list of config names to restrict the run
# to, taken from the LUKS_CONFIG environment variable. Empty entries
# (unset or empty variable, stray commas) are dropped so that they
# cannot turn into a filter that matches nothing.
whitelist = [name for name in os.environ.get("LUKS_CONFIG", "").split(",")
             if name]

for config in configs:
    if config.name in blacklist:
        iotests.log("Skipping %s in blacklist" % config.name)
        continue

    if whitelist and config.name not in whitelist:
        iotests.log("Skipping %s not in whitelist" % config.name)
        continue

    # Volume created by cryptsetup, then exercised by QEMU
    test_once(config, qemu_img=False)

    # XXX we should support setting passwords in a non-0
    # key slot with 'qemu-img create' in future
    (_pw, slot) = config.first_password()
    if slot == "0":
        # Volume created by qemu-img, then exercised by dm-crypt
        test_once(config, qemu_img=True)