Skip to content

bpo-38334: Fix seeking bug for encrypted zipfiles #16529

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions Lib/test/test_zipfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -1934,6 +1934,44 @@ def test_unicode_password(self):
self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python")
self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python")

def test_seek_tell(self):
self.zip.setpassword(b"python")
txt = self.plain
test_word = b'encryption'
bloc = txt.find(test_word)
bloc_len = len(test_word)
with self.zip.open("test.txt", "r") as fp:
fp.seek(bloc, os.SEEK_SET)
self.assertEqual(fp.tell(), bloc)
fp.seek(-bloc, os.SEEK_CUR)
self.assertEqual(fp.tell(), 0)
fp.seek(bloc, os.SEEK_CUR)
self.assertEqual(fp.tell(), bloc)
self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])

# Make sure that the second read after seeking back beyond
# _readbuffer returns the same content (ie. rewind to the start of
# the file to read forward to the required position).
old_read_size = fp.MIN_READ_SIZE
fp.MIN_READ_SIZE = 1
fp._readbuffer = b''
fp._offset = 0
fp.seek(0, os.SEEK_SET)
self.assertEqual(fp.tell(), 0)
fp.seek(bloc, os.SEEK_CUR)
self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
fp.MIN_READ_SIZE = old_read_size

fp.seek(0, os.SEEK_END)
self.assertEqual(fp.tell(), len(txt))
fp.seek(0, os.SEEK_SET)
self.assertEqual(fp.tell(), 0)

# Read the file completely to definitely call any eof integrity
# checks (crc) and make sure they still pass.
fp.read()


class AbstractTestsWithRandomBinaryFiles:
@classmethod
def setUpClass(cls):
Expand Down
221 changes: 141 additions & 80 deletions Lib/zipfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,50 +550,117 @@ def _gen_crc(crc):
crc >>= 1
return crc

# ZIP supports a password-based form of encryption. Even though known
# plaintext attacks have been found against it, it is still useful
# to be able to get data out of such a file.
#
# Usage:
# zd = _ZipDecrypter(mypwd)
# plain_bytes = zd(cypher_bytes)

def _ZipDecrypter(pwd):
key0 = 305419896
key1 = 591751049
key2 = 878082192

global _crctable
if _crctable is None:
_crctable = list(map(_gen_crc, range(256)))
crctable = _crctable

def crc32(ch, crc):
"""Compute the CRC32 primitive on one byte."""
return (crc >> 8) ^ crctable[(crc ^ ch) & 0xFF]

def update_keys(c):
nonlocal key0, key1, key2
key0 = crc32(c, key0)
key1 = (key1 + (key0 & 0xFF)) & 0xFFFFFFFF
key1 = (key1 * 134775813 + 1) & 0xFFFFFFFF
key2 = crc32(key1 >> 24, key2)
class CRCZipDecrypter:
"""PKWARE Encryption Decrypter

ZIP supports a password-based form of encryption. Even though known
plaintext attacks have been found against it, it is still useful
to be able to get data out of such a file.
Usage:
zd = CRCZipDecrypter(zinfo, mypwd)
zd.start_decrypt(fileobj)
plain_bytes = zd.decrypt(cypher_bytes)
"""

encryption_header_length = 12

def __init__(self, zinfo, pwd):
self.zinfo = zinfo
self.name = zinfo.filename

if not pwd:
raise RuntimeError("File %r is encrypted, a password is "
"required for extraction" % self.name)
self.pwd = pwd

def start_decrypt(self, fileobj):
"""Initialise or reset the decrypter.

This method reads from the `SharedFile` `fileobj` zip stream and
therefore should be executed within the context of the
ZipExtFile (so that all read operations occur in that object).

By the end of this method, `fileobj` must be at the start of the
"file data" section of the zipstream.
"""

self.key0 = 305419896
self.key1 = 591751049
self.key2 = 878082192

global _crctable
if _crctable is None:
_crctable = list(map(_gen_crc, range(256)))

for p in pwd:
update_keys(p)
for p in self.pwd:
self.update_keys(p)

def decrypter(data):
# The first 12 bytes in the cypher stream is an encryption
# header used to strengthen the algorithm. The first 11 bytes
# are completely random, while the 12th contains the MSB of the
# CRC, or the MSB of the file time depending on the header type
# and is used to check the correctness of the password.
header = fileobj.read(self.encryption_header_length)
h = self.decrypt(header[0:12])

if self.zinfo.flag_bits & 0x8:
# compare against the file type from extended local headers
check_byte = (self.zinfo._raw_time >> 8) & 0xff
else:
# compare against the CRC otherwise
check_byte = (self.zinfo.CRC >> 24) & 0xff

if h[11] != check_byte:
raise RuntimeError("Bad password for file %r" % self.name)

def encrypt_overhead_bytes_cnt(self):
"""Returns the number of bytes in the zipstream used to initialise the decrypter.

The returned values represents the number of bytes used for
encryption that should be excluded from the _compress_size
counter (eg. the "encryption header" section and any bytes after
the "file data" used for encryption, such as the HMAC value for
winzip's AES encryption).
"""
return self.encryption_header_length

def crc32(self, ch, crc):
"""Compute the CRC32 primitive on one byte."""
return (crc >> 8) ^ _crctable[(crc ^ ch) & 0xFF]

def update_keys(self, c):
# Note: this is also inlined in the `decrypt` method.
self.key0 = self.crc32(c, self.key0)
self.key1 = (self.key1 + (self.key0 & 0xFF)) & 0xFFFFFFFF
self.key1 = (self.key1 * 134775813 + 1) & 0xFFFFFFFF
self.key2 = self.crc32(self.key1 >> 24, self.key2)

def decrypt(self, data):
"""Decrypt a bytes object."""
result = bytearray()
key0 = self.key0
key1 = self.key1
key2 = self.key2
append = result.append
for c in data:
k = key2 | 2
c ^= ((k * (k^1)) >> 8) & 0xFF
update_keys(c)
c ^= ((k * (k ^ 1)) >> 8) & 0xFF
# update_keys inlined as this is a tight loop.
# crc32 inlined
key0 = (key0 >> 8) ^ _crctable[(key0 ^ c) & 0xFF]
key1 = (key1 + (key0 & 0xFF)) & 0xFFFFFFFF
key1 = (key1 * 134775813 + 1) & 0xFFFFFFFF
# crc32 inlined
key2 = (key2 >> 8) ^ _crctable[(key2 ^ key1 >> 24) & 0xFF]
# End inlining
append(c)
return bytes(result)

return decrypter
self.key0 = key0
self.key1 = key1
self.key2 = key2

return bytes(result)


class LZMACompressor:
Expand Down Expand Up @@ -794,46 +861,64 @@ class ZipExtFile(io.BufferedIOBase):

def __init__(self, fileobj, mode, zipinfo, decrypter=None,
close_fileobj=False):
self._zinfo = zipinfo
self._fileobj = fileobj
self._decrypter = decrypter
self._close_fileobj = close_fileobj

self._compress_type = zipinfo.compress_type
self._compress_left = zipinfo.compress_size
self._left = zipinfo.file_size

self._decompressor = _get_decompressor(self._compress_type)

self._eof = False
self._readbuffer = b''
self._offset = 0

self.newlines = None

# Adjust read size for encrypted files since the first 12 bytes
# are for the encryption/password information.
if self._decrypter is not None:
self._compress_left -= 12

self.mode = mode
self.name = zipinfo.filename

if hasattr(zipinfo, 'CRC'):
self._expected_crc = zipinfo.CRC
self._running_crc = crc32(b'')
else:
self._expected_crc = None

self._seekable = False
try:
if fileobj.seekable():
self._orig_compress_start = fileobj.tell()
self._orig_compress_size = zipinfo.compress_size
self._orig_file_size = zipinfo.file_size
self._orig_start_crc = self._running_crc
self._seekable = True
except AttributeError:
pass
# Compress start is the byte after the 'local file header' ie. the
# start of 'encryption header' section, if present, or
# 'file data' otherwise.
self._compress_start = fileobj.tell()
self.read_init()

def read_init(self):
self._running_crc = crc32(b'')
self._compress_left = self._zinfo.compress_size
self._left = self._zinfo.file_size
self._eof = False
self._readbuffer = b''
self._offset = 0

self.start_decrypter()
self._decompressor = _get_decompressor(self._compress_type)

def start_decrypter(self):
# check for encrypted flag & handle password
if self._zinfo.flag_bits & 0x1:
if not self._decrypter:
raise RuntimeError("File %r is encrypted, a decrypter is "
"required for extraction" % self.name)

# self._decrypter is responsible for reading the
# "encryption header" section if present.
self._decrypter.start_decrypt(self._fileobj)
# By here, self._fileobj should be at the start of the "file data"
# section.

# Adjust read size for encrypted files by the length of the
# "encryption header" section and any bytes after the encrypted
# data.
self._compress_left -= self._decrypter.encrypt_overhead_bytes_cnt()

def __repr__(self):
result = ['<%s.%s' % (self.__class__.__module__,
Expand Down Expand Up @@ -1010,7 +1095,7 @@ def _read2(self, n):
raise EOFError

if self._decrypter is not None:
data = self._decrypter(data)
data = self._decrypter.decrypt(data)
return data

def close(self):
Expand Down Expand Up @@ -1052,14 +1137,8 @@ def seek(self, offset, whence=0):
read_offset = 0
elif read_offset < 0:
# Position is before the current position. Reset the ZipExtFile
self._fileobj.seek(self._orig_compress_start)
self._running_crc = self._orig_start_crc
self._compress_left = self._orig_compress_size
self._left = self._orig_file_size
self._readbuffer = b''
self._offset = 0
self._decompressor = _get_decompressor(self._compress_type)
self._eof = False
self._fileobj.seek(self._compress_start)
self.read_init()
read_offset = new_pos

while read_offset > 0:
Expand Down Expand Up @@ -1528,26 +1607,8 @@ def open(self, name, mode="r", pwd=None, *, force_zip64=False):
if is_encrypted:
if not pwd:
pwd = self.pwd
if not pwd:
raise RuntimeError("File %r is encrypted, password "
"required for extraction" % name)

zd = _ZipDecrypter(pwd)
# The first 12 bytes in the cypher stream is an encryption header
# used to strengthen the algorithm. The first 11 bytes are
# completely random, while the 12th contains the MSB of the CRC,
# or the MSB of the file time depending on the header type
# and is used to check the correctness of the password.
header = zef_file.read(12)
h = zd(header[0:12])
if zinfo.flag_bits & 0x8:
# compare against the file type from extended local headers
check_byte = (zinfo._raw_time >> 8) & 0xff
else:
# compare against the CRC otherwise
check_byte = (zinfo.CRC >> 24) & 0xff
if h[11] != check_byte:
raise RuntimeError("Bad password for file %r" % name)

zd = CRCZipDecrypter(zinfo, pwd)

return ZipExtFile(zef_file, mode, zinfo, zd, True)
except:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix seeking on an encrypted :class:`zipfile.ZipExtFile` where seeking backwards would cause the crc keys to go out of sync resulting in bad data to be read after the seek. This refactors the :func:`zipfile._ZipDecrypter` function into a class :class:`zipfile.CRCZipDecrypter`.