| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069 |
- import itertools
- import logging
- import re
- import struct
- from hashlib import md5, sha256, sha384, sha512
- from typing import (
- Any,
- Callable,
- Dict,
- Iterable,
- Iterator,
- KeysView,
- List,
- Optional,
- Sequence,
- Tuple,
- Type,
- Union,
- cast,
- )
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
- from pdf2zh import settings
- from pdf2zh.arcfour import Arcfour
- from pdf2zh.data_structures import NumberTree
- from pdf2zh.pdfexceptions import (
- PDFException,
- PDFKeyError,
- PDFObjectNotFound,
- PDFTypeError,
- )
- from pdf2zh.pdfparser import PDFParser, PDFStreamParser, PDFSyntaxError
- from pdf2zh.pdftypes import (
- DecipherCallable,
- PDFStream,
- decipher_all,
- dict_value,
- int_value,
- list_value,
- str_value,
- stream_value,
- uint_value,
- )
- from pdf2zh.psexceptions import PSEOF
- from pdf2zh.psparser import KWD, LIT, literal_name
- from pdf2zh.utils import (
- choplist,
- decode_text,
- format_int_alpha,
- format_int_roman,
- nunpack,
- )
- log = logging.getLogger(__name__)
class PDFNoValidXRef(PDFSyntaxError):
    """Raised when a cross-reference table or stream cannot be found or parsed."""
class PDFNoValidXRefWarning(SyntaxWarning):
    """Legacy warning category for a missing xref.

    Kept only for compatibility: warnings.warn has been replaced by
    logger.Logger.warn, so this class is no longer used.
    """
class PDFNoOutlines(PDFException):
    """Raised when the document catalog has no /Outlines entry."""
class PDFNoPageLabels(PDFException):
    """Raised when the document catalog has no usable /PageLabels entry."""
class PDFDestinationNotFound(PDFException):
    """Raised when a named destination cannot be resolved."""
class PDFEncryptionError(PDFException):
    """Raised for unsupported or malformed encryption parameters."""
class PDFPasswordIncorrect(PDFEncryptionError):
    """Raised when the supplied password fails authentication."""
class PDFEncryptionWarning(UserWarning):
    """Legacy warning category for failed decryption.

    Kept only for compatibility: warnings.warn has been replaced by
    logger.Logger.warn, so this class is no longer used.
    """
class PDFTextExtractionNotAllowedWarning(UserWarning):
    """Legacy warning category for a PDF that does not allow extraction.

    Kept only for compatibility: warnings.warn has been replaced by
    logger.Logger.warn, so this class is no longer used.
    """
class PDFTextExtractionNotAllowed(PDFEncryptionError):
    """Error type for documents whose permissions forbid text extraction.

    NOTE(review): nothing in this module raises it; presumably callers that
    check ``PDFDocument.is_extractable`` do - confirm.
    """
# Predefined name literals.  The rest of this module compares /Type entries
# against these objects by identity (``is`` / ``is not``).
LITERAL_OBJSTM = LIT("ObjStm")  # /Type of an object stream
LITERAL_XREF = LIT("XRef")  # /Type of a cross-reference stream
LITERAL_CATALOG = LIT("Catalog")  # /Type of the document catalog
class PDFBaseXRef:
    """Common interface of every cross-reference source used by PDFDocument."""

    def get_trailer(self) -> Dict[str, Any]:
        """Return the trailer dictionary; concrete subclasses must override."""
        raise NotImplementedError

    def get_objids(self) -> Iterable[int]:
        """Return the object ids known to this xref (none for the base class)."""
        return []

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Locate ``objid``.

        Implementations must return either ``(strmid, index, genno)`` for an
        object stored inside an object stream, or ``(None, pos, genno)`` for
        an object stored directly in the file.  The base class knows no
        objects and therefore always raises PDFKeyError.
        """
        raise PDFKeyError(objid)

    def load(self, parser: PDFParser) -> None:
        """Populate the xref from ``parser``; concrete subclasses must override."""
        raise NotImplementedError
class PDFXRef(PDFBaseXRef):
    """Classic ``xref`` table together with its ``trailer`` dictionary.

    ``offsets`` maps an object id to ``(None, position, generation)``; the
    leading ``None`` is the slot that other xref kinds use for the id of a
    containing object stream.
    """

    def __init__(self) -> None:
        self.offsets: Dict[int, Tuple[Optional[int], int, int]] = {}
        self.trailer: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return "<PDFXRef: offsets=%r>" % (self.offsets.keys())

    def load(self, parser: PDFParser) -> None:
        """Read xref subsections from ``parser`` up to and including the trailer.

        Each subsection starts with a ``start count`` header line followed by
        ``count`` entry lines of the form ``offset generation flag``.  Only
        entries flagged ``n`` (in use) are recorded.

        :raises PDFNoValidXRef: on premature EOF or on any malformed header
            or entry line, so that callers can fall back to another strategy.
        """
        while True:
            try:
                (pos, line) = parser.nextline()
            except PSEOF:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted?")
            line = line.strip()
            if not line:
                continue
            if line.startswith(b"trailer"):
                # Rewind so load_trailer() sees the keyword itself.
                parser.seek(pos)
                break
            f = line.split(b" ")
            if len(f) != 2:
                error_msg = f"Trailer not found: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            try:
                (start, nobjs) = map(int, f)
            except ValueError:
                error_msg = f"Invalid line: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            for objid in range(start, start + nobjs):
                try:
                    (_, line) = parser.nextline()
                except PSEOF:
                    raise PDFNoValidXRef("Unexpected EOF - file corrupted?")
                line = line.strip()
                f = line.split(b" ")
                if len(f) != 3:
                    error_msg = f"Invalid XRef format: {parser!r}, line={line!r}"
                    raise PDFNoValidXRef(error_msg)
                (pos_b, genno_b, use_b) = f
                if use_b != b"n":
                    continue
                try:
                    entry = (None, int(pos_b), int(genno_b))
                except ValueError:
                    # A corrupt entry must surface as PDFNoValidXRef (not a
                    # bare ValueError) so the caller's fallback can kick in.
                    error_msg = f"Invalid XRef entry: {parser!r}, line={line!r}"
                    raise PDFNoValidXRef(error_msg)
                self.offsets[objid] = entry
        self.load_trailer(parser)

    def load_trailer(self, parser: PDFParser) -> None:
        """Parse the ``trailer`` keyword and merge its dictionary into ``self.trailer``.

        :raises PDFNoValidXRef: if the next token is not ``trailer`` or if EOF
            is hit and the parser has no pending object to use as the
            dictionary.
        """
        try:
            (_, kwd) = parser.nexttoken()
            # Explicit check instead of ``assert``: input validation must not
            # disappear under ``python -O``.
            if kwd is not KWD(b"trailer"):
                raise PDFNoValidXRef(f"Trailer keyword expected: {kwd!r}")
            _, (_, dic) = parser.nextobject()
        except PSEOF:
            # EOF right after the dictionary: take it from the parser stack.
            x = parser.pop(1)
            if not x:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted")
            (_, dic) = x[0]
        self.trailer.update(dict_value(dic))

    def get_trailer(self) -> Dict[str, Any]:
        """Return the trailer dictionary collected by ``load_trailer``."""
        return self.trailer

    def get_objids(self) -> KeysView[int]:
        """Return a view of every in-use object id in this table."""
        return self.offsets.keys()

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Return ``(None, pos, genno)`` for ``objid``; KeyError if unknown."""
        return self.offsets[objid]
class PDFXRefFallback(PDFXRef):
    """Xref rebuilt by scanning the file for ``<id> <gen> obj`` lines."""

    def __repr__(self) -> str:
        return "<PDFXRefFallback: offsets=%r>" % (self.offsets.keys())

    # Matches the header line that opens an indirect object definition.
    PDFOBJ_CUE = re.compile(r"^(\d+)\s+(\d+)\s+obj\b")

    def load(self, parser: PDFParser) -> None:
        """Scan from the start of the file and record every object found.

        Scanning stops at EOF or at the first line starting with ``trailer``
        (which is then loaded).  Objects that turn out to be object streams
        are expanded so that the objects stored inside them are registered
        as well.
        """
        parser.seek(0)
        while True:
            try:
                pos, raw_line = parser.nextline()
            except PSEOF:
                return
            if raw_line.startswith(b"trailer"):
                parser.seek(pos)
                self.load_trailer(parser)
                return
            # latin-1 is the default pdf encoding
            cue = self.PDFOBJ_CUE.match(raw_line.decode("latin-1"))
            if cue is None:
                continue
            objid, genno = (int(group) for group in cue.groups())
            self.offsets[objid] = (None, pos, genno)

            # Re-read the object itself to see whether it is an ObjStm.
            parser.seek(pos)
            _, (_, obj) = parser.nextobject()
            if not (isinstance(obj, PDFStream) and obj.get("Type") is LITERAL_OBJSTM):
                continue
            stream = stream_value(obj)
            try:
                declared = stream["N"]
            except KeyError:
                if settings.STRICT:
                    raise PDFSyntaxError("N is not defined: %r" % stream)
                declared = 0
            stream_parser = PDFStreamParser(stream.get_data())
            tokens: List[int] = []
            try:
                while True:
                    _, (_, item) = stream_parser.nextobject()
                    tokens.append(cast(int, item))
            except PSEOF:
                pass
            # The parsed items are read as (objid, offset) pairs; never trust
            # the declared count beyond what was actually parsed.
            count = min(declared, len(tokens) // 2)
            for index in range(count):
                self.offsets[tokens[index * 2]] = (objid, index, 0)
class PDFXRefStream(PDFBaseXRef):
    """Cross-reference stream (PDF 1.5+).

    The decoded stream data is a flat sequence of fixed-width entries.  Each
    entry has three fields whose byte widths come from the stream's /W
    array (``fl1``, ``fl2``, ``fl3``).  ``ranges`` holds the
    ``(first_objid, count)`` subsections from /Index; the entries of all
    subsections are stored back to back in ``data``.
    """

    def __init__(self) -> None:
        self.data: Optional[bytes] = None
        self.entlen: Optional[int] = None
        self.fl1: Optional[int] = None
        self.fl2: Optional[int] = None
        self.fl3: Optional[int] = None
        self.ranges: List[Tuple[int, int]] = []
        # Defined up front so get_trailer() works before load() is called.
        self.trailer: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return "<PDFXRefStream: ranges=%r>" % (self.ranges)

    def load(self, parser: PDFParser) -> None:
        """Read the xref stream object at the parser's current position.

        :raises PDFNoValidXRef: if the object is not a stream of /Type /XRef.
        :raises PDFSyntaxError: if /Index has an odd number of elements.
        """
        (_, objid) = parser.nexttoken()  # ignored
        (_, genno) = parser.nexttoken()  # ignored
        (_, kwd) = parser.nexttoken()
        _, (_, stream) = parser.nextobject()
        if not isinstance(stream, PDFStream) or stream.get("Type") is not LITERAL_XREF:
            raise PDFNoValidXRef("Invalid PDF stream spec.")
        size = stream["Size"]
        # Without /Index the stream describes one subsection: objects 0..Size-1.
        index_array = stream.get("Index", (0, size))
        if len(index_array) % 2 != 0:
            raise PDFSyntaxError("Invalid index number")
        self.ranges.extend(cast(Iterator[Tuple[int, int]], choplist(2, index_array)))
        (self.fl1, self.fl2, self.fl3) = stream["W"]
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        self.data = stream.get_data()
        self.entlen = self.fl1 + self.fl2 + self.fl3
        self.trailer = stream.attrs

    def get_trailer(self) -> Dict[str, Any]:
        """Return the stream dictionary, which doubles as the trailer."""
        return self.trailer

    def get_objids(self) -> Iterator[int]:
        """Yield the id of every object whose entry type is 1 or 2.

        Entries of consecutive subsections are contiguous in ``data``, so the
        entry index keeps counting across subsections (the same addressing
        that ``get_pos`` uses) instead of restarting at 0 for each one.
        """
        index = 0
        for start, nobjs in self.ranges:
            for i in range(nobjs):
                assert self.entlen is not None
                assert self.data is not None
                offset = self.entlen * index
                index += 1
                ent = self.data[offset : offset + self.entlen]
                # The type field defaults to 1 when its width is zero.
                f1 = nunpack(ent[: self.fl1], 1)
                if f1 == 1 or f1 == 2:
                    yield start + i

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Return the location of ``objid``.

        :returns: ``(None, pos, genno)`` for a type-1 entry or
            ``(strmid, index, 0)`` for a type-2 entry.
        :raises PDFKeyError: if ``objid`` is outside every subsection or its
            entry has any other type (a free object).
        """
        index = 0
        for start, nobjs in self.ranges:
            if start <= objid and objid < start + nobjs:
                index += objid - start
                break
            else:
                # Skip over all entries of this subsection.
                index += nobjs
        else:
            raise PDFKeyError(objid)
        assert self.entlen is not None
        assert self.data is not None
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        offset = self.entlen * index
        ent = self.data[offset : offset + self.entlen]
        f1 = nunpack(ent[: self.fl1], 1)
        f2 = nunpack(ent[self.fl1 : self.fl1 + self.fl2])
        f3 = nunpack(ent[self.fl1 + self.fl2 :])
        if f1 == 1:
            return (None, f2, f3)
        elif f1 == 2:
            return (f2, f3, 0)
        else:
            # this is a free object
            raise PDFKeyError(objid)
class PDFStandardSecurityHandler:
    """Standard security handler for revisions 2 and 3 (MD5 + RC4)."""

    PASSWORD_PADDING = (
        b"(\xbfN^Nu\x8aAd\x00NV\xff\xfa\x01\x08"
        b"..\x00\xb6\xd0h>\x80/\x0c\xa9\xfedSiz"
    )
    supported_revisions: Tuple[int, ...] = (2, 3)

    def __init__(
        self,
        docid: Sequence[bytes],
        param: Dict[str, Any],
        password: str = "",
    ) -> None:
        """Store the inputs and immediately authenticate via ``init()``.

        :param docid: the trailer /ID entries; only ``docid[0]`` is used.
        :param param: the /Encrypt dictionary.
        :param password: user or owner password.
        """
        self.docid = docid
        self.param = param
        self.password = password
        self.init()

    def init(self) -> None:
        """Read parameters, check the revision and derive the key.

        :raises PDFEncryptionError: if /R is not in ``supported_revisions``.
        :raises PDFPasswordIncorrect: if the password does not authenticate.
        """
        self.init_params()
        if self.r not in self.supported_revisions:
            raise PDFEncryptionError("Unsupported revision: param=%r" % self.param)
        self.init_key()

    def init_params(self) -> None:
        """Copy the /V, /R, /P, /O, /U and /Length entries onto ``self``."""
        param = self.param
        self.v = int_value(param.get("V", 0))
        self.r = int_value(param["R"])
        self.p = uint_value(param["P"], 32)
        self.o = str_value(param["O"])
        self.u = str_value(param["U"])
        self.length = int_value(param.get("Length", 40))

    def init_key(self) -> None:
        """Authenticate ``self.password`` and keep the resulting key."""
        self.key = self.authenticate(self.password)
        if self.key is None:
            raise PDFPasswordIncorrect

    def is_printable(self) -> bool:
        """Return whether the value-4 bit of the /P flags is set."""
        return (self.p & 4) != 0

    def is_modifiable(self) -> bool:
        """Return whether the value-8 bit of the /P flags is set."""
        return (self.p & 8) != 0

    def is_extractable(self) -> bool:
        """Return whether the value-16 bit of the /P flags is set."""
        return (self.p & 16) != 0

    def compute_u(self, key: bytes) -> bytes:
        """Compute the expected /U value for ``key`` (Algorithms 3.4 / 3.5)."""
        if self.r == 2:
            # Algorithm 3.4
            return Arcfour(key).encrypt(self.PASSWORD_PADDING)
        # Algorithm 3.5
        digest = md5(self.PASSWORD_PADDING)
        digest.update(self.docid[0])
        result = Arcfour(key).encrypt(digest.digest())
        for i in range(1, 20):
            round_key = bytes(c ^ i for c in key)
            result = Arcfour(round_key).encrypt(result)
        # Only the first 16 bytes are ever compared; the rest is filler.
        return result + result

    def compute_encryption_key(self, password: bytes) -> bytes:
        """Derive the file encryption key from ``password`` (Algorithm 3.2)."""
        padded = (password + self.PASSWORD_PADDING)[:32]
        digest = md5(padded)
        digest.update(self.o)
        # See https://github.com/pdf2zh/pdf2zh.six/issues/186
        digest.update(struct.pack("<L", self.p))
        digest.update(self.docid[0])
        if self.r >= 4 and not cast(PDFStandardSecurityHandlerV4, self).encrypt_metadata:
            digest.update(b"\xff\xff\xff\xff")
        result = digest.digest()
        if self.r < 3:
            return result[:5]
        n = self.length // 8
        for _ in range(50):
            result = md5(result[:n]).digest()
        return result[:n]

    def authenticate(self, password: str) -> Optional[bytes]:
        """Try ``password`` as user, then as owner password.

        :returns: the encryption key, or None if neither attempt succeeds.
        """
        raw = password.encode("latin1")
        key = self.authenticate_user_password(raw)
        return key if key is not None else self.authenticate_owner_password(raw)

    def authenticate_user_password(self, password: bytes) -> Optional[bytes]:
        """Return the key derived from ``password`` if it verifies, else None."""
        candidate = self.compute_encryption_key(password)
        return candidate if self.verify_encryption_key(candidate) else None

    def verify_encryption_key(self, key: bytes) -> bool:
        """Check ``key`` against the stored /U value (Algorithm 3.6)."""
        expected = self.compute_u(key)
        if self.r == 2:
            return expected == self.u
        return expected[:16] == self.u[:16]

    def authenticate_owner_password(self, password: bytes) -> Optional[bytes]:
        """Recover the user password from /O and authenticate it (Algorithm 3.7)."""
        digest = md5((password + self.PASSWORD_PADDING)[:32])
        n = 5
        if self.r >= 3:
            for _ in range(50):
                digest = md5(digest.digest())
            n = self.length // 8
        key = digest.digest()[:n]
        if self.r == 2:
            user_password = Arcfour(key).decrypt(self.o)
        else:
            user_password = self.o
            for i in reversed(range(20)):
                round_key = bytes(c ^ i for c in key)
                user_password = Arcfour(round_key).decrypt(user_password)
        return self.authenticate_user_password(user_password)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: Optional[Dict[str, Any]] = None,
    ) -> bytes:
        """Decrypt ``data`` belonging to object ``objid``/``genno`` with RC4.

        ``attrs`` is accepted for signature compatibility and is not used.
        """
        return self.decrypt_rc4(objid, genno, data)

    def decrypt_rc4(self, objid: int, genno: int, data: bytes) -> bytes:
        """RC4-decrypt ``data`` with the per-object key."""
        assert self.key is not None
        # Per-object key: file key + low 3 bytes of objid + low 2 of genno.
        seed = self.key + struct.pack("<L", objid)[:3] + struct.pack("<L", genno)[:2]
        object_key = md5(seed).digest()[: min(len(seed), 16)]
        return Arcfour(object_key).decrypt(data)
class PDFStandardSecurityHandlerV4(PDFStandardSecurityHandler):
    """Security handler for revision 4: crypt filters with RC4 or AES-128."""

    supported_revisions: Tuple[int, ...] = (4,)

    def init_params(self) -> None:
        """Read the crypt-filter configuration (/CF, /StmF, /StrF).

        :raises PDFEncryptionError: if stream and string filters differ, if a
            filter names an unknown method, or if the selected filter is not
            defined.
        """
        super().init_params()
        self.length = 128
        self.cf = dict_value(self.param.get("CF"))
        self.stmf = literal_name(self.param["StmF"])
        self.strf = literal_name(self.param["StrF"])
        self.encrypt_metadata = bool(self.param.get("EncryptMetadata", True))
        if self.stmf != self.strf:
            error_msg = "Unsupported crypt filter: param=%r" % self.param
            raise PDFEncryptionError(error_msg)
        # Maps crypt filter name -> decryption callable.
        self.cfm: Dict[str, Callable[[int, int, bytes], bytes]] = {}
        for k, v in self.cf.items():
            f = self.get_cfm(literal_name(v["CFM"]))
            if f is None:
                error_msg = "Unknown crypt filter method: param=%r" % self.param
                raise PDFEncryptionError(error_msg)
            self.cfm[k] = f
        self.cfm["Identity"] = self.decrypt_identity
        if self.strf not in self.cfm:
            error_msg = "Undefined crypt filter: param=%r" % self.param
            raise PDFEncryptionError(error_msg)

    def get_cfm(self, name: str) -> Optional[Callable[[int, int, bytes], bytes]]:
        """Return the decryption method for crypt filter method ``name``, or None."""
        if name == "V2":
            return self.decrypt_rc4
        elif name == "AESV2":
            return self.decrypt_aes128
        else:
            return None

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: Optional[Dict[str, Any]] = None,
        name: Optional[str] = None,
    ) -> bytes:
        """Decrypt ``data`` with crypt filter ``name`` (default: /StrF).

        When /EncryptMetadata is false, data whose ``attrs`` declare
        /Type /Metadata is returned unchanged.
        """
        if not self.encrypt_metadata and attrs is not None:
            t = attrs.get("Type")
            if t is not None and literal_name(t) == "Metadata":
                return data
        if name is None:
            name = self.strf
        return self.cfm[name](objid, genno, data)

    def decrypt_identity(self, objid: int, genno: int, data: bytes) -> bytes:
        """Identity crypt filter: return ``data`` untouched."""
        return data

    @staticmethod
    def _unpad_aes(padded: bytes) -> bytes:
        """Strip the AES block padding from decrypted data.

        The pad consists of ``k`` bytes of value ``k`` with ``1 <= k <= 16``.
        Data whose tail does not look like a valid pad is returned unchanged
        so that malformed files still yield their bytes.
        """
        if not padded:
            return padded
        pad_len = padded[-1]
        if not 1 <= pad_len <= 16 or pad_len > len(padded):
            return padded
        if padded[-pad_len:] != bytes([pad_len]) * pad_len:
            return padded
        return padded[:-pad_len]

    def decrypt_aes128(self, objid: int, genno: int, data: bytes) -> bytes:
        """AES-128-CBC-decrypt ``data`` with the per-object key.

        The first 16 bytes of ``data`` are the IV.  The block padding is
        removed from the returned plaintext.
        """
        assert self.key is not None
        key = (
            self.key
            + struct.pack("<L", objid)[:3]
            + struct.pack("<L", genno)[:2]
            + b"sAlT"
        )
        hash = md5(key)
        key = hash.digest()[: min(len(key), 16)]
        initialization_vector = data[:16]
        ciphertext = data[16:]
        cipher = Cipher(
            algorithms.AES(key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return self._unpad_aes(plaintext)
class PDFStandardSecurityHandlerV5(PDFStandardSecurityHandlerV4):
    """Security handler for revisions 5 and 6 (AES-256, SHA-2 password hashes)."""

    supported_revisions = (5, 6)

    def init_params(self) -> None:
        """Read /OE and /UE and split /O and /U into hash and salts.

        Both /O and /U are laid out as 32 bytes of hash, 8 bytes of
        validation salt, then the key salt.
        """
        super().init_params()
        self.length = 256
        self.oe = str_value(self.param["OE"])
        self.ue = str_value(self.param["UE"])
        self.o_hash = self.o[:32]
        self.o_validation_salt = self.o[32:40]
        self.o_key_salt = self.o[40:]
        self.u_hash = self.u[:32]
        self.u_validation_salt = self.u[32:40]
        self.u_key_salt = self.u[40:]

    def get_cfm(self, name: str) -> Optional[Callable[[int, int, bytes], bytes]]:
        """Return the AES-256 method for ``AESV3``; any other name yields None."""
        if name == "AESV3":
            return self.decrypt_aes256
        else:
            return None

    def authenticate(self, password: str) -> Optional[bytes]:
        """Try ``password`` as owner, then as user password.

        :returns: the file encryption key decrypted from /OE or /UE, or None
            if the password matches neither hash.
        """
        password_b = self._normalize_password(password)
        hash = self._password_hash(password_b, self.o_validation_salt, self.u)
        if hash == self.o_hash:
            hash = self._password_hash(password_b, self.o_key_salt, self.u)
            cipher = Cipher(
                algorithms.AES(hash),
                modes.CBC(b"\0" * 16),
                backend=default_backend(),
            )  # type: ignore
            return cipher.decryptor().update(self.oe)  # type: ignore
        hash = self._password_hash(password_b, self.u_validation_salt)
        if hash == self.u_hash:
            hash = self._password_hash(password_b, self.u_key_salt)
            cipher = Cipher(
                algorithms.AES(hash),
                modes.CBC(b"\0" * 16),
                backend=default_backend(),
            )  # type: ignore
            return cipher.decryptor().update(self.ue)  # type: ignore
        return None

    def _normalize_password(self, password: str) -> bytes:
        """Return ``password`` as UTF-8, truncated to 127 bytes.

        For revision 6 a non-empty password is first run through SASLprep.
        """
        if self.r == 6:
            # saslprep expects non-empty strings, apparently
            if not password:
                return b""
            from pdf2zh._saslprep import saslprep

            password = saslprep(password)
        return password.encode("utf-8")[:127]

    def _password_hash(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute password hash depending on revision number"""
        if self.r == 5:
            return self._r5_password(password, salt, vector)
        return self._r6_password(password, salt[0:8], vector)

    def _r5_password(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute the password for revision 5"""
        hash = sha256(password)
        hash.update(salt)
        if vector is not None:
            hash.update(vector)
        return hash.digest()

    def _r6_password(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute the password for revision 6"""
        initial_hash = sha256(password)
        initial_hash.update(salt)
        if vector is not None:
            initial_hash.update(vector)
        k = initial_hash.digest()
        hashes = (sha256, sha384, sha512)
        round_no = last_byte_val = 0
        # At least 64 rounds, then continue until the last byte of the
        # previous round's ciphertext is <= round_no - 32.
        while round_no < 64 or last_byte_val > round_no - 32:
            k1 = (password + k + (vector or b"")) * 64
            e = self._aes_cbc_encrypt(key=k[:16], iv=k[16:32], data=k1)
            # compute the first 16 bytes of e,
            # interpreted as an unsigned integer mod 3
            next_hash = hashes[self._bytes_mod_3(e[:16])]
            k = next_hash(e).digest()
            last_byte_val = e[len(e) - 1]
            round_no += 1
        return k[:32]

    @staticmethod
    def _bytes_mod_3(input_bytes: bytes) -> int:
        """Return ``input_bytes`` read as a big unsigned integer, modulo 3."""
        # 256 is 1 mod 3, so we can just sum 'em
        return sum(b % 3 for b in input_bytes) % 3

    def _aes_cbc_encrypt(self, key: bytes, iv: bytes, data: bytes) -> bytes:
        """AES-CBC-encrypt ``data`` (no padding is applied here)."""
        cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
        encryptor = cipher.encryptor()  # type: ignore
        return encryptor.update(data) + encryptor.finalize()  # type: ignore

    @staticmethod
    def _unpad_aes(padded: bytes) -> bytes:
        """Strip the AES block padding from decrypted data.

        The pad consists of ``k`` bytes of value ``k`` with ``1 <= k <= 16``.
        Data whose tail does not look like a valid pad is returned unchanged
        so that malformed files still yield their bytes.
        """
        if not padded:
            return padded
        pad_len = padded[-1]
        if not 1 <= pad_len <= 16 or pad_len > len(padded):
            return padded
        if padded[-pad_len:] != bytes([pad_len]) * pad_len:
            return padded
        return padded[:-pad_len]

    def decrypt_aes256(self, objid: int, genno: int, data: bytes) -> bytes:
        """AES-256-CBC-decrypt ``data`` with the file key.

        The first 16 bytes of ``data`` are the IV; ``objid`` and ``genno``
        are not used.  The block padding is removed from the returned
        plaintext.
        """
        initialization_vector = data[:16]
        ciphertext = data[16:]
        assert self.key is not None
        cipher = Cipher(
            algorithms.AES(self.key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return self._unpad_aes(plaintext)
class PDFDocument:
    """PDFDocument object represents a PDF document.

    Since a PDF file can be very big, normally it is not loaded at
    once. So PDF document has to cooperate with a PDF parser in order to
    dynamically import the data as processing goes.

    Typical usage:
      doc = PDFDocument(parser, password)
      obj = doc.getobj(objid)

    """

    # Maps the /V entry of the /Encrypt dictionary to its handler class.
    security_handler_registry: Dict[int, Type[PDFStandardSecurityHandler]] = {
        1: PDFStandardSecurityHandler,
        2: PDFStandardSecurityHandler,
        4: PDFStandardSecurityHandlerV4,
        5: PDFStandardSecurityHandlerV5,
    }

    def __init__(
        self,
        parser: PDFParser,
        password: str = "",
        caching: bool = True,
        fallback: bool = True,
    ) -> None:
        """Set the document to use a given PDFParser object.

        :param parser: parser positioned on the PDF data.
        :param password: password used if the document is encrypted.
        :param caching: keep resolved objects and parsed object streams.
        :param fallback: when no valid xref is found, rebuild one by scanning
            the whole file.
        :raises PDFSyntaxError: if no trailer provides a /Root entry, or (in
            strict mode) if the root is not of /Type /Catalog.
        """
        self.caching = caching
        self.xrefs: List[PDFBaseXRef] = []
        self.info: List[Dict[str, Any]] = []
        self.catalog: Dict[str, Any] = {}
        self.encryption: Optional[Tuple[Any, Any]] = None
        self.decipher: Optional[DecipherCallable] = None
        # objid -> (object, genno)
        self._cached_objs: Dict[int, Tuple[object, int]] = {}
        # object-stream objid -> (objects parsed from the stream, its /N)
        self._parsed_objs: Dict[int, Tuple[List[object], int]] = {}
        self._parser = parser
        self._parser.set_document(self)
        self.is_printable = self.is_modifiable = self.is_extractable = True
        # Offset announced by ``startxref``; stays None when it cannot be
        # found, so the attribute exists on every code path.
        self.pos: Optional[int] = None
        # Retrieve the information of each header that was appended
        # (maybe multiple times) at the end of the document.
        try:
            pos = self.find_xref(parser)
            self.pos = pos
            self.read_xref_from(parser, pos, self.xrefs)
        except PDFNoValidXRef:
            if fallback:
                parser.fallback = True
                newxref = PDFXRefFallback()
                newxref.load(parser)
                self.xrefs.append(newxref)

        for xref in self.xrefs:
            trailer = xref.get_trailer()
            if not trailer:
                continue
            # If there's an encryption info, remember it.
            if "Encrypt" in trailer:
                if "ID" in trailer:
                    id_value = list_value(trailer["ID"])
                else:
                    # Some documents may not have a /ID, use two empty
                    # byte strings instead. Solves
                    # https://github.com/pdf2zh/pdf2zh.six/issues/594
                    id_value = (b"", b"")
                self.encryption = (id_value, dict_value(trailer["Encrypt"]))
                self._initialize_password(password)
            if "Info" in trailer:
                self.info.append(dict_value(trailer["Info"]))
            if "Root" in trailer:
                # Every PDF file must have exactly one /Root dictionary.
                self.catalog = dict_value(trailer["Root"])
                break
        else:
            raise PDFSyntaxError("No /Root object! - Is this really a PDF?")
        if self.catalog.get("Type") is not LITERAL_CATALOG:
            if settings.STRICT:
                raise PDFSyntaxError("Catalog not found!")

    KEYWORD_OBJ = KWD(b"obj")

    # _initialize_password(password=b'')
    #   Perform the initialization with a given password.
    def _initialize_password(self, password: str = "") -> None:
        """Create the security handler for ``self.encryption`` and install it.

        :raises PDFEncryptionError: if /Filter is not ``Standard`` or /V has
            no registered handler.
        """
        assert self.encryption is not None
        (docid, param) = self.encryption
        if literal_name(param.get("Filter")) != "Standard":
            raise PDFEncryptionError("Unknown filter: param=%r" % param)
        v = int_value(param.get("V", 0))
        factory = self.security_handler_registry.get(v)
        if factory is None:
            raise PDFEncryptionError("Unknown algorithm: param=%r" % param)
        handler = factory(docid, param, password)
        self.decipher = handler.decrypt
        self.is_printable = handler.is_printable()
        self.is_modifiable = handler.is_modifiable()
        self.is_extractable = handler.is_extractable()
        assert self._parser is not None
        self._parser.fallback = False  # need to read streams with exact length

    def _getobj_objstm(self, stream: PDFStream, index: int, objid: int) -> object:
        """Return the ``index``-th object stored in object stream ``stream``.

        :raises PDFSyntaxError: if ``index`` is beyond the parsed objects.
        """
        if stream.objid in self._parsed_objs:
            (objs, n) = self._parsed_objs[stream.objid]
        else:
            (objs, n) = self._get_objects(stream)
            if self.caching:
                assert stream.objid is not None
                self._parsed_objs[stream.objid] = (objs, n)
        # The first n*2 parsed items are the (objid, offset) header pairs;
        # the objects themselves follow.
        i = n * 2 + index
        try:
            obj = objs[i]
        except IndexError:
            raise PDFSyntaxError("index too big: %r" % index)
        return obj

    def _get_objects(self, stream: PDFStream) -> Tuple[List[object], int]:
        """Parse everything in object stream ``stream``.

        :returns: ``(all parsed items, N)`` where ``N`` is the stream's /N
            entry (0 when missing and not in strict mode).
        """
        if stream.get("Type") is not LITERAL_OBJSTM:
            if settings.STRICT:
                raise PDFSyntaxError("Not a stream object: %r" % stream)
        try:
            n = cast(int, stream["N"])
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError("N is not defined: %r" % stream)
            n = 0
        parser = PDFStreamParser(stream.get_data())
        parser.set_document(self)
        objs: List[object] = []
        try:
            while True:
                _, (_, obj) = parser.nextobject()
                objs.append(obj)
        except PSEOF:
            pass
        return (objs, n)

    def _getobj_parse(self, pos: int, objid: int) -> Tuple[Any, object]:
        """Parse the indirect object expected at file offset ``pos``.

        :returns: ``(end, obj)`` exactly as produced by the parser's
            ``nextobject()``; ``end`` is presumably the position after the
            object - confirm against PDFParser.
        :raises PDFSyntaxError: if the object id found does not match
            ``objid`` or the ``obj`` keyword is missing.
        """
        assert self._parser is not None
        self._parser.seek(pos)
        (_, objid1) = self._parser.nexttoken()  # objid
        (_, genno) = self._parser.nexttoken()  # genno
        (_, kwd) = self._parser.nexttoken()
        # hack around malformed pdf files
        # copied from https://github.com/jaepil/pdf2zh3k/blob/master/
        # pdf2zh/pdfparser.py#L399
        # to solve https://github.com/pdf2zh/pdf2zh.six/issues/56
        # assert objid1 == objid, str((objid1, objid))
        if objid1 != objid:
            x = []
            while kwd is not self.KEYWORD_OBJ:
                (_, kwd) = self._parser.nexttoken()
                x.append(kwd)
            if len(x) >= 2:
                objid1 = x[-2]
        # #### end hack around malformed pdf files
        if objid1 != objid:
            raise PDFSyntaxError(f"objid mismatch: {objid1!r}={objid!r}")
        if kwd != self.KEYWORD_OBJ:
            raise PDFSyntaxError("Invalid object spec: offset=%r" % pos)
        end, (_, obj) = self._parser.nextobject()
        return end, obj

    # can raise PDFObjectNotFound
    def getobj(self, objid: int) -> object:
        """Get object from PDF

        :raises PDFException if PDFDocument is not initialized
        :raises PDFObjectNotFound if objid does not exist in PDF
        """
        if not self.xrefs:
            raise PDFException("PDFDocument is not initialized")
        if objid in self._cached_objs:
            (obj, genno) = self._cached_objs[objid]
        else:
            # Try each xref in order; the first that yields a parsable
            # object wins.
            for xref in self.xrefs:
                try:
                    (strmid, index, genno) = xref.get_pos(objid)
                except KeyError:
                    continue
                try:
                    if strmid is not None:
                        stream = stream_value(self.getobj(strmid))
                        obj = self._getobj_objstm(stream, index, objid)
                    else:
                        _end, obj = self._getobj_parse(index, objid)
                        if self.decipher:
                            obj = decipher_all(self.decipher, objid, genno, obj)
                    if isinstance(obj, PDFStream):
                        obj.set_objid(objid, genno)
                    break
                except (PSEOF, PDFSyntaxError):
                    continue
            else:
                raise PDFObjectNotFound(objid)
            if self.caching:
                self._cached_objs[objid] = (obj, genno)
        return obj

    OutlineType = Tuple[Any, Any, Any, Any, Any]

    def get_outlines(self) -> Iterator[OutlineType]:
        """Iterate over the outline tree in document order.

        Yields ``(level, title, dest, action, se)`` for every entry that has
        a /Title and either /A or /Dest.

        :raises PDFNoOutlines: if the catalog has no /Outlines entry.
        """
        if "Outlines" not in self.catalog:
            raise PDFNoOutlines

        def search(entry: object, level: int) -> Iterator[PDFDocument.OutlineType]:
            entry = dict_value(entry)
            if "Title" in entry:
                if "A" in entry or "Dest" in entry:
                    title = decode_text(str_value(entry["Title"]))
                    dest = entry.get("Dest")
                    action = entry.get("A")
                    se = entry.get("SE")
                    yield (level, title, dest, action, se)
            if "First" in entry and "Last" in entry:
                yield from search(entry["First"], level + 1)
            if "Next" in entry:
                yield from search(entry["Next"], level)

        return search(self.catalog["Outlines"], 0)

    def get_page_labels(self) -> Iterator[str]:
        """Generate page label strings for the PDF document.

        If the document includes page labels, generates strings, one per page.
        If not, raises PDFNoPageLabels.

        The resulting iteration is unbounded.
        """
        assert self.catalog is not None
        try:
            page_labels = PageLabels(self.catalog["PageLabels"])
        except (PDFTypeError, KeyError):
            raise PDFNoPageLabels
        return page_labels.labels

    def lookup_name(self, cat: str, key: Union[str, bytes]) -> Any:
        """Look up ``key`` in the name tree ``cat`` of the catalog's /Names.

        :returns: the value stored under ``key``; None when the root node's
            /Limits exclude ``key``.
        :raises PDFKeyError: if /Names is missing or no node holds ``key``.
        :raises KeyError: if ``cat`` is missing from /Names or a leaf within
            limits lacks ``key``.
        """
        try:
            names = dict_value(self.catalog["Names"])
        except (PDFTypeError, KeyError):
            raise PDFKeyError((cat, key))
        # may raise KeyError
        d0 = dict_value(names[cat])

        def lookup(d: Dict[str, Any]) -> Any:
            if "Limits" in d:
                (k1, k2) = list_value(d["Limits"])
                if key < k1 or k2 < key:
                    return None
            if "Names" in d:
                objs = list_value(d["Names"])
                names = dict(
                    cast(Iterator[Tuple[Union[str, bytes], Any]], choplist(2, objs)),
                )
                return names[key]
            if "Kids" in d:
                for c in list_value(d["Kids"]):
                    v = lookup(dict_value(c))
                    if v:
                        return v
            raise PDFKeyError((cat, key))

        return lookup(d0)

    def get_dest(self, name: Union[str, bytes]) -> Any:
        """Resolve the named destination ``name``.

        :raises PDFDestinationNotFound: if neither the /Names tree nor the
            catalog's /Dests dictionary contains ``name``.
        """
        try:
            # PDF-1.2 or later
            obj = self.lookup_name("Dests", name)
        except KeyError:
            # PDF-1.1 or prior
            if "Dests" not in self.catalog:
                raise PDFDestinationNotFound(name)
            d0 = dict_value(self.catalog["Dests"])
            if name not in d0:
                raise PDFDestinationNotFound(name)
            obj = d0[name]
        return obj

    # find_xref
    def find_xref(self, parser: PDFParser) -> int:
        """Internal function used to locate the first XRef."""
        # search the last xref table by scanning the file backwards.
        prev = b""
        for line in parser.revreadlines():
            line = line.strip()
            if line == b"startxref":
                # Scanning backwards, ``prev`` is the non-empty line that
                # follows ``startxref`` in the file: the xref offset.
                if not prev.isdigit():
                    raise PDFNoValidXRef(f"Invalid xref position: {prev!r}")
                start = int(prev)
                if not start >= 0:
                    raise PDFNoValidXRef(f"Invalid negative xref position: {start}")
                return start
            if line:
                prev = line
        raise PDFNoValidXRef("Unexpected EOF")

    # read xref table
    def read_xref_from(
        self,
        parser: PDFParser,
        start: int,
        xrefs: List[PDFBaseXRef],
        visited: Optional[set] = None,
    ) -> None:
        """Reads XRefs from the given location.

        Follows /XRefStm and /Prev links recursively and appends every xref
        found to ``xrefs``.

        :param visited: offsets already read during this traversal; used
            internally to stop on circular /Prev or /XRefStm chains, which
            would otherwise recurse without bound.  Callers normally omit it.
        :raises PDFNoValidXRef: on EOF at ``start`` (or from the xref loaders).
        """
        if visited is None:
            visited = set()
        if start in visited:
            log.warning("Circular XRef reference at offset %r; ignored", start)
            return
        visited.add(start)
        parser.seek(start)
        parser.reset()
        try:
            (pos, token) = parser.nexttoken()
        except PSEOF:
            raise PDFNoValidXRef("Unexpected EOF")
        if isinstance(token, int):
            # XRefStream: PDF-1.5
            parser.seek(pos)
            parser.reset()
            xref: PDFBaseXRef = PDFXRefStream()
            xref.load(parser)
        else:
            if token is parser.KEYWORD_XREF:
                parser.nextline()
            xref = PDFXRef()
            xref.load(parser)
        xrefs.append(xref)
        trailer = xref.get_trailer()
        if "XRefStm" in trailer:
            pos = int_value(trailer["XRefStm"])
            self.read_xref_from(parser, pos, xrefs, visited)
        if "Prev" in trailer:
            # find previous xref
            pos = int_value(trailer["Prev"])
            self.read_xref_from(parser, pos, xrefs, visited)
class PageLabels(NumberTree):
    """PageLabels from the document catalog.

    See Section 8.3.1 in the PDF Reference.
    """

    @property
    def labels(self) -> Iterator[str]:
        """Generate one label per page; the final range never ends."""
        ranges = self.values

        # The tree must begin with page index 0
        if not ranges or ranges[0][0] != 0:
            if settings.STRICT:
                raise PDFSyntaxError("PageLabels is missing page index 0")
            # Try to cope, by assuming empty labels for the initial pages
            ranges.insert(0, (0, {}))

        range_count = len(ranges)
        for following, (start, raw_label_dict) in enumerate(ranges, 1):
            label_dict = dict_value(raw_label_dict)
            style = label_dict.get("S")
            prefix = decode_text(str_value(label_dict.get("P", b"")))
            first_value = int_value(label_dict.get("St", 1))

            values: Iterable[int]
            if following < range_count:
                # The range stops where the next one begins.
                next_start, _ = ranges[following]
                values = range(first_value, first_value + (next_start - start))
            else:
                # This is the last specified range. It continues until the
                # end of the document.
                values = itertools.count(first_value)

            for value in values:
                yield prefix + self._format_page_label(value, style)

    @staticmethod
    def _format_page_label(value: int, style: Any) -> str:
        """Format page label value in a specific style"""
        if style is None:
            return ""
        if style is LIT("D"):  # Decimal arabic numerals
            return str(value)
        if style is LIT("R"):  # Uppercase roman numerals
            return format_int_roman(value).upper()
        if style is LIT("r"):  # Lowercase roman numerals
            return format_int_roman(value)
        if style is LIT("A"):  # Uppercase letters A-Z, AA-ZZ...
            return format_int_alpha(value).upper()
        if style is LIT("a"):  # Lowercase letters a-z, aa-zz...
            return format_int_alpha(value)
        log.warning("Unknown page label style: %r", style)
        return ""
|