diff options
author | Maximilian Hils <git@maximilianhils.com> | 2015-09-11 12:13:39 +0200 |
---|---|---|
committer | Maximilian Hils <git@maximilianhils.com> | 2015-09-11 12:13:39 +0200 |
commit | 7c186a4edbb0c6bc1f471d0db62dfc60008160a2 (patch) | |
tree | 89fb7e5b14cdfe9a0b61da828ddac8f355a3cdd0 /examples/tls_passthrough.py | |
parent | b62498e125191beca3b49841eb5f1fb9a93a868a (diff) | |
parent | dd414e485212e3cab612a66d5d858c1a766ace04 (diff) | |
download | mitmproxy-7c186a4edbb0c6bc1f471d0db62dfc60008160a2.tar.gz mitmproxy-7c186a4edbb0c6bc1f471d0db62dfc60008160a2.tar.bz2 mitmproxy-7c186a4edbb0c6bc1f471d0db62dfc60008160a2.zip |
Merge branch 'master' into contentviews
Diffstat (limited to 'examples/tls_passthrough.py')
-rw-r--r-- | examples/tls_passthrough.py | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/examples/tls_passthrough.py b/examples/tls_passthrough.py new file mode 100644 index 00000000..7b4dec62 --- /dev/null +++ b/examples/tls_passthrough.py @@ -0,0 +1,136 @@ +""" +This inline script allows conditional TLS Interception based +on a user-defined strategy. + +Example: + + > mitmdump -s tls_passthrough.py + + 1. curl --proxy http://localhost:8080 https://example.com --insecure + // works - we'll also see the contents in mitmproxy + + 2. curl --proxy http://localhost:8080 https://example.com --insecure + // still works - we'll also see the contents in mitmproxy + + 3. curl --proxy http://localhost:8080 https://example.com + // fails with a certificate error, which we will also see in mitmproxy + + 4. curl --proxy http://localhost:8080 https://example.com + // works again, but mitmproxy does not intercept and we do *not* see the contents + +Authors: Maximilian Hils, Matthew Tuusberg +""" +from __future__ import (absolute_import, print_function, division) +import collections +import random + +from enum import Enum + +from libmproxy.exceptions import TlsException +from libmproxy.protocol import TlsLayer, RawTCPLayer + + +class InterceptionResult(Enum): + success = True + failure = False + skipped = None + + +class _TlsStrategy(object): + """ + Abstract base class for interception strategies. + """ + def __init__(self): + # A server_address -> interception results mapping + self.history = collections.defaultdict(lambda: collections.deque(maxlen=200)) + + def should_intercept(self, server_address): + """ + Returns: + True, if we should attempt to intercept the connection. + False, if we want to employ pass-through instead. + """ + raise NotImplementedError() + + def record_success(self, server_address): + self.history[server_address].append(InterceptionResult.success) + + def record_failure(self, server_address): + self.history[server_address].append(InterceptionResult.failure) + + def record_skipped(self, server_address): + self.history[server_address].append(InterceptionResult.skipped) + + +class ConservativeStrategy(_TlsStrategy): + """ + Conservative Interception Strategy - only intercept if there haven't been any failed attempts + in the history. + """ + + def should_intercept(self, server_address): + if InterceptionResult.failure in self.history[server_address]: + return False + return True + + +class ProbabilisticStrategy(_TlsStrategy): + """ + Fixed probability that we intercept a given connection. + """ + def __init__(self, p): + self.p = p + super(ProbabilisticStrategy, self).__init__() + + def should_intercept(self, server_address): + return random.uniform(0, 1) < self.p + + +class TlsFeedback(TlsLayer): + """ + Monkey-patch _establish_tls_with_client to get feedback if TLS could be established + successfully on the client connection (which may fail due to cert pinning). + """ + + def _establish_tls_with_client(self): + server_address = self.server_conn.address + tls_strategy = self.script_context.tls_strategy + + try: + super(TlsFeedback, self)._establish_tls_with_client() + except TlsException as e: + tls_strategy.record_failure(server_address) + raise e + else: + tls_strategy.record_success(server_address) + + +# inline script hooks below. + + +def start(context, argv): + if len(argv) == 2: + context.tls_strategy = ProbabilisticStrategy(float(argv[1])) + else: + context.tls_strategy = ConservativeStrategy() + + +def next_layer(context, next_layer): + """ + This hook does the actual magic - if the next layer is planned to be a TLS layer, + we check if we want to enter pass-through mode instead. + """ + if isinstance(next_layer, TlsLayer) and next_layer._client_tls: + server_address = next_layer.server_conn.address + + if context.tls_strategy.should_intercept(server_address): + # We try to intercept. + # Monkey-Patch the layer to get feedback from the TLSLayer if interception worked. + next_layer.__class__ = TlsFeedback + next_layer.script_context = context + else: + # We don't intercept - reply with a pass-through layer and add a "skipped" entry. + context.log("TLS passthrough for %s" % repr(next_layer.server_conn.address), "info") + next_layer_replacement = RawTCPLayer(next_layer.ctx, logging=False) + next_layer.reply(next_layer_replacement) + context.tls_strategy.record_skipped(server_address) |