Spaces:
Runtime error
Runtime error
text-generation-webui
/
installer_files
/conda
/lib
/python3.10
/site-packages
/cryptography
/x509
/base.py
| # This file is dual licensed under the terms of the Apache License, Version | |
| # 2.0, and the BSD License. See the LICENSE file in the root of this repository | |
| # for complete details. | |
| import abc | |
| import datetime | |
| import os | |
| import typing | |
| from cryptography import utils | |
| from cryptography.hazmat.bindings._rust import x509 as rust_x509 | |
| from cryptography.hazmat.primitives import hashes, serialization | |
| from cryptography.hazmat.primitives.asymmetric import ( | |
| dsa, | |
| ec, | |
| ed448, | |
| ed25519, | |
| rsa, | |
| x448, | |
| x25519, | |
| ) | |
| from cryptography.hazmat.primitives.asymmetric.types import ( | |
| CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES, | |
| CERTIFICATE_PRIVATE_KEY_TYPES, | |
| CERTIFICATE_PUBLIC_KEY_TYPES, | |
| ) | |
| from cryptography.x509.extensions import ( | |
| Extension, | |
| Extensions, | |
| ExtensionType, | |
| _make_sequence_methods, | |
| ) | |
| from cryptography.x509.name import Name, _ASN1Type | |
| from cryptography.x509.oid import ObjectIdentifier | |
| _EARLIEST_UTC_TIME = datetime.datetime(1950, 1, 1) | |
| class AttributeNotFound(Exception): | |
| def __init__(self, msg: str, oid: ObjectIdentifier) -> None: | |
| super(AttributeNotFound, self).__init__(msg) | |
| self.oid = oid | |
| def _reject_duplicate_extension( | |
| extension: Extension[ExtensionType], | |
| extensions: typing.List[Extension[ExtensionType]], | |
| ) -> None: | |
| # This is quadratic in the number of extensions | |
| for e in extensions: | |
| if e.oid == extension.oid: | |
| raise ValueError("This extension has already been set.") | |
| def _reject_duplicate_attribute( | |
| oid: ObjectIdentifier, | |
| attributes: typing.List[ | |
| typing.Tuple[ObjectIdentifier, bytes, typing.Optional[int]] | |
| ], | |
| ) -> None: | |
| # This is quadratic in the number of attributes | |
| for attr_oid, _, _ in attributes: | |
| if attr_oid == oid: | |
| raise ValueError("This attribute has already been set.") | |
| def _convert_to_naive_utc_time(time: datetime.datetime) -> datetime.datetime: | |
| """Normalizes a datetime to a naive datetime in UTC. | |
| time -- datetime to normalize. Assumed to be in UTC if not timezone | |
| aware. | |
| """ | |
| if time.tzinfo is not None: | |
| offset = time.utcoffset() | |
| offset = offset if offset else datetime.timedelta() | |
| return time.replace(tzinfo=None) - offset | |
| else: | |
| return time | |
| class Attribute: | |
| def __init__( | |
| self, | |
| oid: ObjectIdentifier, | |
| value: bytes, | |
| _type: int = _ASN1Type.UTF8String.value, | |
| ) -> None: | |
| self._oid = oid | |
| self._value = value | |
| self._type = _type | |
| def oid(self) -> ObjectIdentifier: | |
| return self._oid | |
| def value(self) -> bytes: | |
| return self._value | |
| def __repr__(self) -> str: | |
| return "<Attribute(oid={}, value={!r})>".format(self.oid, self.value) | |
| def __eq__(self, other: object) -> bool: | |
| if not isinstance(other, Attribute): | |
| return NotImplemented | |
| return ( | |
| self.oid == other.oid | |
| and self.value == other.value | |
| and self._type == other._type | |
| ) | |
| def __hash__(self) -> int: | |
| return hash((self.oid, self.value, self._type)) | |
| class Attributes: | |
| def __init__( | |
| self, | |
| attributes: typing.Iterable[Attribute], | |
| ) -> None: | |
| self._attributes = list(attributes) | |
| __len__, __iter__, __getitem__ = _make_sequence_methods("_attributes") | |
| def __repr__(self) -> str: | |
| return "<Attributes({})>".format(self._attributes) | |
| def get_attribute_for_oid(self, oid: ObjectIdentifier) -> Attribute: | |
| for attr in self: | |
| if attr.oid == oid: | |
| return attr | |
| raise AttributeNotFound("No {} attribute was found".format(oid), oid) | |
| class Version(utils.Enum): | |
| v1 = 0 | |
| v3 = 2 | |
| class InvalidVersion(Exception): | |
| def __init__(self, msg: str, parsed_version: int) -> None: | |
| super(InvalidVersion, self).__init__(msg) | |
| self.parsed_version = parsed_version | |
| class Certificate(metaclass=abc.ABCMeta): | |
| def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes: | |
| """ | |
| Returns bytes using digest passed. | |
| """ | |
| def serial_number(self) -> int: | |
| """ | |
| Returns certificate serial number | |
| """ | |
| def version(self) -> Version: | |
| """ | |
| Returns the certificate version | |
| """ | |
| def public_key(self) -> CERTIFICATE_PUBLIC_KEY_TYPES: | |
| """ | |
| Returns the public key | |
| """ | |
| def not_valid_before(self) -> datetime.datetime: | |
| """ | |
| Not before time (represented as UTC datetime) | |
| """ | |
| def not_valid_after(self) -> datetime.datetime: | |
| """ | |
| Not after time (represented as UTC datetime) | |
| """ | |
| def issuer(self) -> Name: | |
| """ | |
| Returns the issuer name object. | |
| """ | |
| def subject(self) -> Name: | |
| """ | |
| Returns the subject name object. | |
| """ | |
| def signature_hash_algorithm( | |
| self, | |
| ) -> typing.Optional[hashes.HashAlgorithm]: | |
| """ | |
| Returns a HashAlgorithm corresponding to the type of the digest signed | |
| in the certificate. | |
| """ | |
| def signature_algorithm_oid(self) -> ObjectIdentifier: | |
| """ | |
| Returns the ObjectIdentifier of the signature algorithm. | |
| """ | |
| def extensions(self) -> Extensions: | |
| """ | |
| Returns an Extensions object. | |
| """ | |
| def signature(self) -> bytes: | |
| """ | |
| Returns the signature bytes. | |
| """ | |
| def tbs_certificate_bytes(self) -> bytes: | |
| """ | |
| Returns the tbsCertificate payload bytes as defined in RFC 5280. | |
| """ | |
| def tbs_precertificate_bytes(self) -> bytes: | |
| """ | |
| Returns the tbsCertificate payload bytes with the SCT list extension | |
| stripped. | |
| """ | |
| def __eq__(self, other: object) -> bool: | |
| """ | |
| Checks equality. | |
| """ | |
| def __hash__(self) -> int: | |
| """ | |
| Computes a hash. | |
| """ | |
| def public_bytes(self, encoding: serialization.Encoding) -> bytes: | |
| """ | |
| Serializes the certificate to PEM or DER format. | |
| """ | |
| # Runtime isinstance checks need this since the rust class is not a subclass. | |
| Certificate.register(rust_x509.Certificate) | |
| class RevokedCertificate(metaclass=abc.ABCMeta): | |
| def serial_number(self) -> int: | |
| """ | |
| Returns the serial number of the revoked certificate. | |
| """ | |
| def revocation_date(self) -> datetime.datetime: | |
| """ | |
| Returns the date of when this certificate was revoked. | |
| """ | |
| def extensions(self) -> Extensions: | |
| """ | |
| Returns an Extensions object containing a list of Revoked extensions. | |
| """ | |
| # Runtime isinstance checks need this since the rust class is not a subclass. | |
| RevokedCertificate.register(rust_x509.RevokedCertificate) | |
| class _RawRevokedCertificate(RevokedCertificate): | |
| def __init__( | |
| self, | |
| serial_number: int, | |
| revocation_date: datetime.datetime, | |
| extensions: Extensions, | |
| ): | |
| self._serial_number = serial_number | |
| self._revocation_date = revocation_date | |
| self._extensions = extensions | |
| def serial_number(self) -> int: | |
| return self._serial_number | |
| def revocation_date(self) -> datetime.datetime: | |
| return self._revocation_date | |
| def extensions(self) -> Extensions: | |
| return self._extensions | |
| class CertificateRevocationList(metaclass=abc.ABCMeta): | |
| def public_bytes(self, encoding: serialization.Encoding) -> bytes: | |
| """ | |
| Serializes the CRL to PEM or DER format. | |
| """ | |
| def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes: | |
| """ | |
| Returns bytes using digest passed. | |
| """ | |
| def get_revoked_certificate_by_serial_number( | |
| self, serial_number: int | |
| ) -> typing.Optional[RevokedCertificate]: | |
| """ | |
| Returns an instance of RevokedCertificate or None if the serial_number | |
| is not in the CRL. | |
| """ | |
| def signature_hash_algorithm( | |
| self, | |
| ) -> typing.Optional[hashes.HashAlgorithm]: | |
| """ | |
| Returns a HashAlgorithm corresponding to the type of the digest signed | |
| in the certificate. | |
| """ | |
| def signature_algorithm_oid(self) -> ObjectIdentifier: | |
| """ | |
| Returns the ObjectIdentifier of the signature algorithm. | |
| """ | |
| def issuer(self) -> Name: | |
| """ | |
| Returns the X509Name with the issuer of this CRL. | |
| """ | |
| def next_update(self) -> typing.Optional[datetime.datetime]: | |
| """ | |
| Returns the date of next update for this CRL. | |
| """ | |
| def last_update(self) -> datetime.datetime: | |
| """ | |
| Returns the date of last update for this CRL. | |
| """ | |
| def extensions(self) -> Extensions: | |
| """ | |
| Returns an Extensions object containing a list of CRL extensions. | |
| """ | |
| def signature(self) -> bytes: | |
| """ | |
| Returns the signature bytes. | |
| """ | |
| def tbs_certlist_bytes(self) -> bytes: | |
| """ | |
| Returns the tbsCertList payload bytes as defined in RFC 5280. | |
| """ | |
| def __eq__(self, other: object) -> bool: | |
| """ | |
| Checks equality. | |
| """ | |
| def __len__(self) -> int: | |
| """ | |
| Number of revoked certificates in the CRL. | |
| """ | |
| def __getitem__(self, idx: int) -> RevokedCertificate: | |
| ... | |
| def __getitem__(self, idx: slice) -> typing.List[RevokedCertificate]: | |
| ... | |
| def __getitem__( | |
| self, idx: typing.Union[int, slice] | |
| ) -> typing.Union[RevokedCertificate, typing.List[RevokedCertificate]]: | |
| """ | |
| Returns a revoked certificate (or slice of revoked certificates). | |
| """ | |
| def __iter__(self) -> typing.Iterator[RevokedCertificate]: | |
| """ | |
| Iterator over the revoked certificates | |
| """ | |
| def is_signature_valid( | |
| self, public_key: CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES | |
| ) -> bool: | |
| """ | |
| Verifies signature of revocation list against given public key. | |
| """ | |
| CertificateRevocationList.register(rust_x509.CertificateRevocationList) | |
| class CertificateSigningRequest(metaclass=abc.ABCMeta): | |
| def __eq__(self, other: object) -> bool: | |
| """ | |
| Checks equality. | |
| """ | |
| def __hash__(self) -> int: | |
| """ | |
| Computes a hash. | |
| """ | |
| def public_key(self) -> CERTIFICATE_PUBLIC_KEY_TYPES: | |
| """ | |
| Returns the public key | |
| """ | |
| def subject(self) -> Name: | |
| """ | |
| Returns the subject name object. | |
| """ | |
| def signature_hash_algorithm( | |
| self, | |
| ) -> typing.Optional[hashes.HashAlgorithm]: | |
| """ | |
| Returns a HashAlgorithm corresponding to the type of the digest signed | |
| in the certificate. | |
| """ | |
| def signature_algorithm_oid(self) -> ObjectIdentifier: | |
| """ | |
| Returns the ObjectIdentifier of the signature algorithm. | |
| """ | |
| def extensions(self) -> Extensions: | |
| """ | |
| Returns the extensions in the signing request. | |
| """ | |
| def attributes(self) -> Attributes: | |
| """ | |
| Returns an Attributes object. | |
| """ | |
| def public_bytes(self, encoding: serialization.Encoding) -> bytes: | |
| """ | |
| Encodes the request to PEM or DER format. | |
| """ | |
| def signature(self) -> bytes: | |
| """ | |
| Returns the signature bytes. | |
| """ | |
| def tbs_certrequest_bytes(self) -> bytes: | |
| """ | |
| Returns the PKCS#10 CertificationRequestInfo bytes as defined in RFC | |
| 2986. | |
| """ | |
| def is_signature_valid(self) -> bool: | |
| """ | |
| Verifies signature of signing request. | |
| """ | |
| def get_attribute_for_oid(self, oid: ObjectIdentifier) -> bytes: | |
| """ | |
| Get the attribute value for a given OID. | |
| """ | |
| # Runtime isinstance checks need this since the rust class is not a subclass. | |
| CertificateSigningRequest.register(rust_x509.CertificateSigningRequest) | |
| # Backend argument preserved for API compatibility, but ignored. | |
| def load_pem_x509_certificate( | |
| data: bytes, backend: typing.Any = None | |
| ) -> Certificate: | |
| return rust_x509.load_pem_x509_certificate(data) | |
| def load_pem_x509_certificates(data: bytes) -> typing.List[Certificate]: | |
| return rust_x509.load_pem_x509_certificates(data) | |
| # Backend argument preserved for API compatibility, but ignored. | |
| def load_der_x509_certificate( | |
| data: bytes, backend: typing.Any = None | |
| ) -> Certificate: | |
| return rust_x509.load_der_x509_certificate(data) | |
| # Backend argument preserved for API compatibility, but ignored. | |
| def load_pem_x509_csr( | |
| data: bytes, backend: typing.Any = None | |
| ) -> CertificateSigningRequest: | |
| return rust_x509.load_pem_x509_csr(data) | |
| # Backend argument preserved for API compatibility, but ignored. | |
| def load_der_x509_csr( | |
| data: bytes, backend: typing.Any = None | |
| ) -> CertificateSigningRequest: | |
| return rust_x509.load_der_x509_csr(data) | |
| # Backend argument preserved for API compatibility, but ignored. | |
| def load_pem_x509_crl( | |
| data: bytes, backend: typing.Any = None | |
| ) -> CertificateRevocationList: | |
| return rust_x509.load_pem_x509_crl(data) | |
| # Backend argument preserved for API compatibility, but ignored. | |
| def load_der_x509_crl( | |
| data: bytes, backend: typing.Any = None | |
| ) -> CertificateRevocationList: | |
| return rust_x509.load_der_x509_crl(data) | |
| class CertificateSigningRequestBuilder: | |
| def __init__( | |
| self, | |
| subject_name: typing.Optional[Name] = None, | |
| extensions: typing.List[Extension[ExtensionType]] = [], | |
| attributes: typing.List[ | |
| typing.Tuple[ObjectIdentifier, bytes, typing.Optional[int]] | |
| ] = [], | |
| ): | |
| """ | |
| Creates an empty X.509 certificate request (v1). | |
| """ | |
| self._subject_name = subject_name | |
| self._extensions = extensions | |
| self._attributes = attributes | |
| def subject_name(self, name: Name) -> "CertificateSigningRequestBuilder": | |
| """ | |
| Sets the certificate requestor's distinguished name. | |
| """ | |
| if not isinstance(name, Name): | |
| raise TypeError("Expecting x509.Name object.") | |
| if self._subject_name is not None: | |
| raise ValueError("The subject name may only be set once.") | |
| return CertificateSigningRequestBuilder( | |
| name, self._extensions, self._attributes | |
| ) | |
| def add_extension( | |
| self, extval: ExtensionType, critical: bool | |
| ) -> "CertificateSigningRequestBuilder": | |
| """ | |
| Adds an X.509 extension to the certificate request. | |
| """ | |
| if not isinstance(extval, ExtensionType): | |
| raise TypeError("extension must be an ExtensionType") | |
| extension = Extension(extval.oid, critical, extval) | |
| _reject_duplicate_extension(extension, self._extensions) | |
| return CertificateSigningRequestBuilder( | |
| self._subject_name, | |
| self._extensions + [extension], | |
| self._attributes, | |
| ) | |
| def add_attribute( | |
| self, | |
| oid: ObjectIdentifier, | |
| value: bytes, | |
| *, | |
| _tag: typing.Optional[_ASN1Type] = None, | |
| ) -> "CertificateSigningRequestBuilder": | |
| """ | |
| Adds an X.509 attribute with an OID and associated value. | |
| """ | |
| if not isinstance(oid, ObjectIdentifier): | |
| raise TypeError("oid must be an ObjectIdentifier") | |
| if not isinstance(value, bytes): | |
| raise TypeError("value must be bytes") | |
| if _tag is not None and not isinstance(_tag, _ASN1Type): | |
| raise TypeError("tag must be _ASN1Type") | |
| _reject_duplicate_attribute(oid, self._attributes) | |
| if _tag is not None: | |
| tag = _tag.value | |
| else: | |
| tag = None | |
| return CertificateSigningRequestBuilder( | |
| self._subject_name, | |
| self._extensions, | |
| self._attributes + [(oid, value, tag)], | |
| ) | |
| def sign( | |
| self, | |
| private_key: CERTIFICATE_PRIVATE_KEY_TYPES, | |
| algorithm: typing.Optional[hashes.HashAlgorithm], | |
| backend: typing.Any = None, | |
| ) -> CertificateSigningRequest: | |
| """ | |
| Signs the request using the requestor's private key. | |
| """ | |
| if self._subject_name is None: | |
| raise ValueError("A CertificateSigningRequest must have a subject") | |
| return rust_x509.create_x509_csr(self, private_key, algorithm) | |
| class CertificateBuilder: | |
| _extensions: typing.List[Extension[ExtensionType]] | |
| def __init__( | |
| self, | |
| issuer_name: typing.Optional[Name] = None, | |
| subject_name: typing.Optional[Name] = None, | |
| public_key: typing.Optional[CERTIFICATE_PUBLIC_KEY_TYPES] = None, | |
| serial_number: typing.Optional[int] = None, | |
| not_valid_before: typing.Optional[datetime.datetime] = None, | |
| not_valid_after: typing.Optional[datetime.datetime] = None, | |
| extensions: typing.List[Extension[ExtensionType]] = [], | |
| ) -> None: | |
| self._version = Version.v3 | |
| self._issuer_name = issuer_name | |
| self._subject_name = subject_name | |
| self._public_key = public_key | |
| self._serial_number = serial_number | |
| self._not_valid_before = not_valid_before | |
| self._not_valid_after = not_valid_after | |
| self._extensions = extensions | |
| def issuer_name(self, name: Name) -> "CertificateBuilder": | |
| """ | |
| Sets the CA's distinguished name. | |
| """ | |
| if not isinstance(name, Name): | |
| raise TypeError("Expecting x509.Name object.") | |
| if self._issuer_name is not None: | |
| raise ValueError("The issuer name may only be set once.") | |
| return CertificateBuilder( | |
| name, | |
| self._subject_name, | |
| self._public_key, | |
| self._serial_number, | |
| self._not_valid_before, | |
| self._not_valid_after, | |
| self._extensions, | |
| ) | |
| def subject_name(self, name: Name) -> "CertificateBuilder": | |
| """ | |
| Sets the requestor's distinguished name. | |
| """ | |
| if not isinstance(name, Name): | |
| raise TypeError("Expecting x509.Name object.") | |
| if self._subject_name is not None: | |
| raise ValueError("The subject name may only be set once.") | |
| return CertificateBuilder( | |
| self._issuer_name, | |
| name, | |
| self._public_key, | |
| self._serial_number, | |
| self._not_valid_before, | |
| self._not_valid_after, | |
| self._extensions, | |
| ) | |
| def public_key( | |
| self, | |
| key: CERTIFICATE_PUBLIC_KEY_TYPES, | |
| ) -> "CertificateBuilder": | |
| """ | |
| Sets the requestor's public key (as found in the signing request). | |
| """ | |
| if not isinstance( | |
| key, | |
| ( | |
| dsa.DSAPublicKey, | |
| rsa.RSAPublicKey, | |
| ec.EllipticCurvePublicKey, | |
| ed25519.Ed25519PublicKey, | |
| ed448.Ed448PublicKey, | |
| x25519.X25519PublicKey, | |
| x448.X448PublicKey, | |
| ), | |
| ): | |
| raise TypeError( | |
| "Expecting one of DSAPublicKey, RSAPublicKey," | |
| " EllipticCurvePublicKey, Ed25519PublicKey," | |
| " Ed448PublicKey, X25519PublicKey, or " | |
| "X448PublicKey." | |
| ) | |
| if self._public_key is not None: | |
| raise ValueError("The public key may only be set once.") | |
| return CertificateBuilder( | |
| self._issuer_name, | |
| self._subject_name, | |
| key, | |
| self._serial_number, | |
| self._not_valid_before, | |
| self._not_valid_after, | |
| self._extensions, | |
| ) | |
| def serial_number(self, number: int) -> "CertificateBuilder": | |
| """ | |
| Sets the certificate serial number. | |
| """ | |
| if not isinstance(number, int): | |
| raise TypeError("Serial number must be of integral type.") | |
| if self._serial_number is not None: | |
| raise ValueError("The serial number may only be set once.") | |
| if number <= 0: | |
| raise ValueError("The serial number should be positive.") | |
| # ASN.1 integers are always signed, so most significant bit must be | |
| # zero. | |
| if number.bit_length() >= 160: # As defined in RFC 5280 | |
| raise ValueError( | |
| "The serial number should not be more than 159 " "bits." | |
| ) | |
| return CertificateBuilder( | |
| self._issuer_name, | |
| self._subject_name, | |
| self._public_key, | |
| number, | |
| self._not_valid_before, | |
| self._not_valid_after, | |
| self._extensions, | |
| ) | |
| def not_valid_before( | |
| self, time: datetime.datetime | |
| ) -> "CertificateBuilder": | |
| """ | |
| Sets the certificate activation time. | |
| """ | |
| if not isinstance(time, datetime.datetime): | |
| raise TypeError("Expecting datetime object.") | |
| if self._not_valid_before is not None: | |
| raise ValueError("The not valid before may only be set once.") | |
| time = _convert_to_naive_utc_time(time) | |
| if time < _EARLIEST_UTC_TIME: | |
| raise ValueError( | |
| "The not valid before date must be on or after" | |
| " 1950 January 1)." | |
| ) | |
| if self._not_valid_after is not None and time > self._not_valid_after: | |
| raise ValueError( | |
| "The not valid before date must be before the not valid after " | |
| "date." | |
| ) | |
| return CertificateBuilder( | |
| self._issuer_name, | |
| self._subject_name, | |
| self._public_key, | |
| self._serial_number, | |
| time, | |
| self._not_valid_after, | |
| self._extensions, | |
| ) | |
| def not_valid_after(self, time: datetime.datetime) -> "CertificateBuilder": | |
| """ | |
| Sets the certificate expiration time. | |
| """ | |
| if not isinstance(time, datetime.datetime): | |
| raise TypeError("Expecting datetime object.") | |
| if self._not_valid_after is not None: | |
| raise ValueError("The not valid after may only be set once.") | |
| time = _convert_to_naive_utc_time(time) | |
| if time < _EARLIEST_UTC_TIME: | |
| raise ValueError( | |
| "The not valid after date must be on or after" | |
| " 1950 January 1." | |
| ) | |
| if ( | |
| self._not_valid_before is not None | |
| and time < self._not_valid_before | |
| ): | |
| raise ValueError( | |
| "The not valid after date must be after the not valid before " | |
| "date." | |
| ) | |
| return CertificateBuilder( | |
| self._issuer_name, | |
| self._subject_name, | |
| self._public_key, | |
| self._serial_number, | |
| self._not_valid_before, | |
| time, | |
| self._extensions, | |
| ) | |
| def add_extension( | |
| self, extval: ExtensionType, critical: bool | |
| ) -> "CertificateBuilder": | |
| """ | |
| Adds an X.509 extension to the certificate. | |
| """ | |
| if not isinstance(extval, ExtensionType): | |
| raise TypeError("extension must be an ExtensionType") | |
| extension = Extension(extval.oid, critical, extval) | |
| _reject_duplicate_extension(extension, self._extensions) | |
| return CertificateBuilder( | |
| self._issuer_name, | |
| self._subject_name, | |
| self._public_key, | |
| self._serial_number, | |
| self._not_valid_before, | |
| self._not_valid_after, | |
| self._extensions + [extension], | |
| ) | |
| def sign( | |
| self, | |
| private_key: CERTIFICATE_PRIVATE_KEY_TYPES, | |
| algorithm: typing.Optional[hashes.HashAlgorithm], | |
| backend: typing.Any = None, | |
| ) -> Certificate: | |
| """ | |
| Signs the certificate using the CA's private key. | |
| """ | |
| if self._subject_name is None: | |
| raise ValueError("A certificate must have a subject name") | |
| if self._issuer_name is None: | |
| raise ValueError("A certificate must have an issuer name") | |
| if self._serial_number is None: | |
| raise ValueError("A certificate must have a serial number") | |
| if self._not_valid_before is None: | |
| raise ValueError("A certificate must have a not valid before time") | |
| if self._not_valid_after is None: | |
| raise ValueError("A certificate must have a not valid after time") | |
| if self._public_key is None: | |
| raise ValueError("A certificate must have a public key") | |
| return rust_x509.create_x509_certificate(self, private_key, algorithm) | |
| class CertificateRevocationListBuilder: | |
| _extensions: typing.List[Extension[ExtensionType]] | |
| _revoked_certificates: typing.List[RevokedCertificate] | |
| def __init__( | |
| self, | |
| issuer_name: typing.Optional[Name] = None, | |
| last_update: typing.Optional[datetime.datetime] = None, | |
| next_update: typing.Optional[datetime.datetime] = None, | |
| extensions: typing.List[Extension[ExtensionType]] = [], | |
| revoked_certificates: typing.List[RevokedCertificate] = [], | |
| ): | |
| self._issuer_name = issuer_name | |
| self._last_update = last_update | |
| self._next_update = next_update | |
| self._extensions = extensions | |
| self._revoked_certificates = revoked_certificates | |
| def issuer_name( | |
| self, issuer_name: Name | |
| ) -> "CertificateRevocationListBuilder": | |
| if not isinstance(issuer_name, Name): | |
| raise TypeError("Expecting x509.Name object.") | |
| if self._issuer_name is not None: | |
| raise ValueError("The issuer name may only be set once.") | |
| return CertificateRevocationListBuilder( | |
| issuer_name, | |
| self._last_update, | |
| self._next_update, | |
| self._extensions, | |
| self._revoked_certificates, | |
| ) | |
| def last_update( | |
| self, last_update: datetime.datetime | |
| ) -> "CertificateRevocationListBuilder": | |
| if not isinstance(last_update, datetime.datetime): | |
| raise TypeError("Expecting datetime object.") | |
| if self._last_update is not None: | |
| raise ValueError("Last update may only be set once.") | |
| last_update = _convert_to_naive_utc_time(last_update) | |
| if last_update < _EARLIEST_UTC_TIME: | |
| raise ValueError( | |
| "The last update date must be on or after" " 1950 January 1." | |
| ) | |
| if self._next_update is not None and last_update > self._next_update: | |
| raise ValueError( | |
| "The last update date must be before the next update date." | |
| ) | |
| return CertificateRevocationListBuilder( | |
| self._issuer_name, | |
| last_update, | |
| self._next_update, | |
| self._extensions, | |
| self._revoked_certificates, | |
| ) | |
| def next_update( | |
| self, next_update: datetime.datetime | |
| ) -> "CertificateRevocationListBuilder": | |
| if not isinstance(next_update, datetime.datetime): | |
| raise TypeError("Expecting datetime object.") | |
| if self._next_update is not None: | |
| raise ValueError("Last update may only be set once.") | |
| next_update = _convert_to_naive_utc_time(next_update) | |
| if next_update < _EARLIEST_UTC_TIME: | |
| raise ValueError( | |
| "The last update date must be on or after" " 1950 January 1." | |
| ) | |
| if self._last_update is not None and next_update < self._last_update: | |
| raise ValueError( | |
| "The next update date must be after the last update date." | |
| ) | |
| return CertificateRevocationListBuilder( | |
| self._issuer_name, | |
| self._last_update, | |
| next_update, | |
| self._extensions, | |
| self._revoked_certificates, | |
| ) | |
| def add_extension( | |
| self, extval: ExtensionType, critical: bool | |
| ) -> "CertificateRevocationListBuilder": | |
| """ | |
| Adds an X.509 extension to the certificate revocation list. | |
| """ | |
| if not isinstance(extval, ExtensionType): | |
| raise TypeError("extension must be an ExtensionType") | |
| extension = Extension(extval.oid, critical, extval) | |
| _reject_duplicate_extension(extension, self._extensions) | |
| return CertificateRevocationListBuilder( | |
| self._issuer_name, | |
| self._last_update, | |
| self._next_update, | |
| self._extensions + [extension], | |
| self._revoked_certificates, | |
| ) | |
| def add_revoked_certificate( | |
| self, revoked_certificate: RevokedCertificate | |
| ) -> "CertificateRevocationListBuilder": | |
| """ | |
| Adds a revoked certificate to the CRL. | |
| """ | |
| if not isinstance(revoked_certificate, RevokedCertificate): | |
| raise TypeError("Must be an instance of RevokedCertificate") | |
| return CertificateRevocationListBuilder( | |
| self._issuer_name, | |
| self._last_update, | |
| self._next_update, | |
| self._extensions, | |
| self._revoked_certificates + [revoked_certificate], | |
| ) | |
| def sign( | |
| self, | |
| private_key: CERTIFICATE_PRIVATE_KEY_TYPES, | |
| algorithm: typing.Optional[hashes.HashAlgorithm], | |
| backend: typing.Any = None, | |
| ) -> CertificateRevocationList: | |
| if self._issuer_name is None: | |
| raise ValueError("A CRL must have an issuer name") | |
| if self._last_update is None: | |
| raise ValueError("A CRL must have a last update time") | |
| if self._next_update is None: | |
| raise ValueError("A CRL must have a next update time") | |
| return rust_x509.create_x509_crl(self, private_key, algorithm) | |
| class RevokedCertificateBuilder: | |
| def __init__( | |
| self, | |
| serial_number: typing.Optional[int] = None, | |
| revocation_date: typing.Optional[datetime.datetime] = None, | |
| extensions: typing.List[Extension[ExtensionType]] = [], | |
| ): | |
| self._serial_number = serial_number | |
| self._revocation_date = revocation_date | |
| self._extensions = extensions | |
| def serial_number(self, number: int) -> "RevokedCertificateBuilder": | |
| if not isinstance(number, int): | |
| raise TypeError("Serial number must be of integral type.") | |
| if self._serial_number is not None: | |
| raise ValueError("The serial number may only be set once.") | |
| if number <= 0: | |
| raise ValueError("The serial number should be positive") | |
| # ASN.1 integers are always signed, so most significant bit must be | |
| # zero. | |
| if number.bit_length() >= 160: # As defined in RFC 5280 | |
| raise ValueError( | |
| "The serial number should not be more than 159 " "bits." | |
| ) | |
| return RevokedCertificateBuilder( | |
| number, self._revocation_date, self._extensions | |
| ) | |
| def revocation_date( | |
| self, time: datetime.datetime | |
| ) -> "RevokedCertificateBuilder": | |
| if not isinstance(time, datetime.datetime): | |
| raise TypeError("Expecting datetime object.") | |
| if self._revocation_date is not None: | |
| raise ValueError("The revocation date may only be set once.") | |
| time = _convert_to_naive_utc_time(time) | |
| if time < _EARLIEST_UTC_TIME: | |
| raise ValueError( | |
| "The revocation date must be on or after" " 1950 January 1." | |
| ) | |
| return RevokedCertificateBuilder( | |
| self._serial_number, time, self._extensions | |
| ) | |
| def add_extension( | |
| self, extval: ExtensionType, critical: bool | |
| ) -> "RevokedCertificateBuilder": | |
| if not isinstance(extval, ExtensionType): | |
| raise TypeError("extension must be an ExtensionType") | |
| extension = Extension(extval.oid, critical, extval) | |
| _reject_duplicate_extension(extension, self._extensions) | |
| return RevokedCertificateBuilder( | |
| self._serial_number, | |
| self._revocation_date, | |
| self._extensions + [extension], | |
| ) | |
| def build(self, backend: typing.Any = None) -> RevokedCertificate: | |
| if self._serial_number is None: | |
| raise ValueError("A revoked certificate must have a serial number") | |
| if self._revocation_date is None: | |
| raise ValueError( | |
| "A revoked certificate must have a revocation date" | |
| ) | |
| return _RawRevokedCertificate( | |
| self._serial_number, | |
| self._revocation_date, | |
| Extensions(self._extensions), | |
| ) | |
| def random_serial_number() -> int: | |
| return int.from_bytes(os.urandom(20), "big") >> 1 | |