aboutsummaryrefslogtreecommitdiffstats
path: root/libmproxy/certutils.py
diff options
context:
space:
mode:
Diffstat (limited to 'libmproxy/certutils.py')
-rw-r--r--libmproxy/certutils.py234
1 files changed, 234 insertions, 0 deletions
diff --git a/libmproxy/certutils.py b/libmproxy/certutils.py
new file mode 100644
index 00000000..c1e5d93e
--- /dev/null
+++ b/libmproxy/certutils.py
@@ -0,0 +1,234 @@
+import subprocess, os, ssl, hashlib, socket
+from pyasn1.type import univ, constraint, char, namedtype, tag
+from pyasn1.codec.der.decoder import decode
+import OpenSSL
+import utils
+
+CERT_SLEEP_TIME = 1
+CERT_EXPIRY = str(365 * 3)
+
+
+def dummy_ca(path):
+ """
+ Creates a dummy CA, and writes it to path.
+
+ This function also creates the necessary directories if they don't exist.
+
+ Returns True if operation succeeded, False if not.
+ """
+ dirname = os.path.dirname(path)
+ if not os.path.exists(dirname):
+ os.makedirs(dirname)
+
+ if path.endswith(".pem"):
+ basename, _ = os.path.splitext(path)
+ else:
+ basename = path
+
+ cmd = [
+ "openssl",
+ "req",
+ "-new",
+ "-x509",
+ "-config", utils.pkg_data.path("resources/ca.cnf"),
+ "-nodes",
+ "-days", CERT_EXPIRY,
+ "-out", path,
+ "-newkey", "rsa:1024",
+ "-keyout", path,
+ ]
+ ret = subprocess.call(
+ cmd,
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE
+ )
+ # begin nocover
+ if ret:
+ return False
+ # end nocover
+
+ cmd = [
+ "openssl",
+ "pkcs12",
+ "-export",
+ "-password", "pass:",
+ "-nokeys",
+ "-in", path,
+ "-out", os.path.join(dirname, basename + "-cert.p12")
+ ]
+ ret = subprocess.call(
+ cmd,
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE
+ )
+ # begin nocover
+ if ret:
+ return False
+ # end nocover
+ cmd = [
+ "openssl",
+ "x509",
+ "-in", path,
+ "-out", os.path.join(dirname, basename + "-cert.pem")
+ ]
+ ret = subprocess.call(
+ cmd,
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE
+ )
+ # begin nocover
+ if ret:
+ return False
+ # end nocover
+
+ return True
+
+
+def dummy_cert(certdir, ca, commonname, sans):
+ """
+ certdir: Certificate directory.
+ ca: Path to the certificate authority file, or None.
+ commonname: Common name for the generated certificate.
+
+ Returns cert path if operation succeeded, None if not.
+ """
+ namehash = hashlib.sha256(commonname).hexdigest()
+ certpath = os.path.join(certdir, namehash + ".pem")
+ if os.path.exists(certpath):
+ return certpath
+
+ confpath = os.path.join(certdir, namehash + ".cnf")
+ reqpath = os.path.join(certdir, namehash + ".req")
+
+ template = open(utils.pkg_data.path("resources/cert.cnf")).read()
+
+ ss = []
+ for i, v in enumerate(sans):
+ ss.append("DNS.%s = %s"%(i+1, v))
+ ss = "\n".join(ss)
+
+ f = open(confpath, "w")
+ f.write(
+ template%(
+ dict(
+ commonname=commonname,
+ sans=ss,
+ altnames="subjectAltName = @alt_names" if ss else ""
+ )
+ )
+ )
+ f.close()
+
+ if ca:
+ # Create a dummy signed certificate. Uses same key as the signing CA
+ cmd = [
+ "openssl",
+ "req",
+ "-new",
+ "-config", confpath,
+ "-out", reqpath,
+ "-key", ca,
+ ]
+ ret = subprocess.call(
+ cmd,
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE
+ )
+ if ret: return None
+ cmd = [
+ "openssl",
+ "x509",
+ "-req",
+ "-in", reqpath,
+ "-days", CERT_EXPIRY,
+ "-out", certpath,
+ "-CA", ca,
+ "-CAcreateserial",
+ "-extfile", confpath,
+ "-extensions", "v3_cert_req",
+ ]
+ ret = subprocess.call(
+ cmd,
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE
+ )
+ if ret: return None
+ else:
+ # Create a new selfsigned certificate + key
+ cmd = [
+ "openssl",
+ "req",
+ "-new",
+ "-x509",
+ "-config", confpath,
+ "-nodes",
+ "-days", CERT_EXPIRY,
+ "-out", certpath,
+ "-newkey", "rsa:1024",
+ "-keyout", certpath,
+ ]
+ ret = subprocess.call(
+ cmd,
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE
+ )
+ if ret: return None
+ return certpath
+
+
+class GeneralName(univ.Choice):
+ # We are only interested in dNSNames. We use a default handler to ignore
+ # other types.
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('dNSName', char.IA5String().subtype(
+ implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)
+ )
+ ),
+ )
+
+
+class GeneralNames(univ.SequenceOf):
+ componentType = GeneralName()
+ sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, 1024)
+
+
+class SSLCert:
+ def __init__(self, pemtxt):
+ """
+ Returns a (common name, [subject alternative names]) tuple.
+ """
+ self.cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pemtxt)
+
+ @property
+ def cn(self):
+ cn = None
+ for i in self.cert.get_subject().get_components():
+ if i[0] == "CN":
+ cn = i[1]
+ return cn
+
+ @property
+ def altnames(self):
+ altnames = []
+ for i in range(self.cert.get_extension_count()):
+ ext = self.cert.get_extension(i)
+ if ext.get_short_name() == "subjectAltName":
+ dec = decode(ext.get_data(), asn1Spec=GeneralNames())
+ for i in dec[0]:
+ altnames.append(i[0])
+ return altnames
+
+
+
+def get_remote_cert(host, port):
+ addr = socket.gethostbyname(host)
+ s = ssl.get_server_certificate((addr, port))
+ return SSLCert(s)
+
+