diff options
-rw-r--r-- | libmproxy/authentication.py | 122 | ||||
-rw-r--r-- | libmproxy/contrib/md5crypt.py | 94 | ||||
-rw-r--r-- | libmproxy/proxy.py | 13 | ||||
-rw-r--r-- | test/test_authentication.py | 58 | ||||
-rw-r--r-- | test/test_proxy.py | 8 | ||||
-rw-r--r-- | test/test_server.py | 2 |
6 files changed, 15 insertions, 282 deletions
diff --git a/libmproxy/authentication.py b/libmproxy/authentication.py deleted file mode 100644 index 500ead6b..00000000 --- a/libmproxy/authentication.py +++ /dev/null @@ -1,122 +0,0 @@ -import binascii -import contrib.md5crypt as md5crypt - -class NullProxyAuth(): - """ - No proxy auth at all (returns empty challange headers) - """ - def __init__(self, password_manager): - self.password_manager = password_manager - self.username = "" - - def clean(self, headers): - """ - Clean up authentication headers, so they're not passed upstream. - """ - pass - - def authenticate(self, headers): - """ - Tests that the user is allowed to use the proxy - """ - return True - - def auth_challenge_headers(self): - """ - Returns a dictionary containing the headers require to challenge the user - """ - return {} - - -class BasicProxyAuth(NullProxyAuth): - CHALLENGE_HEADER = 'Proxy-Authenticate' - AUTH_HEADER = 'Proxy-Authorization' - def __init__(self, password_manager, realm): - NullProxyAuth.__init__(self, password_manager) - self.realm = realm - - def clean(self, headers): - del headers[self.AUTH_HEADER] - - def authenticate(self, headers): - auth_value = headers.get(self.AUTH_HEADER, []) - if not auth_value: - return False - try: - scheme, username, password = self.parse_auth_value(auth_value[0]) - except ValueError: - return False - if scheme.lower()!='basic': - return False - if not self.password_manager.test(username, password): - return False - self.username = username - return True - - def auth_challenge_headers(self): - return {self.CHALLENGE_HEADER:'Basic realm="%s"'%self.realm} - - def unparse_auth_value(self, scheme, username, password): - v = binascii.b2a_base64(username + ":" + password) - return scheme + " " + v - - def parse_auth_value(self, auth_value): - words = auth_value.split() - if len(words) != 2: - raise ValueError("Invalid basic auth credential.") - scheme = words[0] - try: - user = binascii.a2b_base64(words[1]) - except binascii.Error: - raise ValueError("Invalid basic auth credential: user:password pair not valid base64: %s"%words[1]) - parts = user.split(':') - if len(parts) != 2: - raise ValueError("Invalid basic auth credential: decoded user:password pair not valid: %s"%user) - return scheme, parts[0], parts[1] - - -class PasswordManager(): - def __init__(self): - pass - - def test(self, username, password_token): - return False - - -class PermissivePasswordManager(PasswordManager): - def __init__(self): - PasswordManager.__init__(self) - - def test(self, username, password_token): - if username: - return True - return False - - -class HtpasswdPasswordManager(PasswordManager): - """ - Read usernames and passwords from a file created by Apache htpasswd - """ - def __init__(self, filehandle): - PasswordManager.__init__(self) - entries = (line.strip().split(':') for line in filehandle) - valid_entries = (entry for entry in entries if len(entry)==2) - self.usernames = {username:token for username,token in valid_entries} - - def test(self, username, password_token): - if username not in self.usernames: - return False - full_token = self.usernames[username] - dummy, magic, salt, hashed_password = full_token.split('$') - expected = md5crypt.md5crypt(password_token, salt, '$'+magic+'$') - return expected==full_token - - -class SingleUserPasswordManager(PasswordManager): - def __init__(self, username, password): - PasswordManager.__init__(self) - self.username = username - self.password = password - - def test(self, username, password_token): - return self.username==username and self.password==password_token diff --git a/libmproxy/contrib/md5crypt.py b/libmproxy/contrib/md5crypt.py deleted file mode 100644 index d64ea8ac..00000000 --- a/libmproxy/contrib/md5crypt.py +++ /dev/null @@ -1,94 +0,0 @@ -# Based on FreeBSD src/lib/libcrypt/crypt.c 1.2 -# http://www.freebsd.org/cgi/cvsweb.cgi/~checkout~/src/lib/libcrypt/crypt.c?rev=1.2&content-type=text/plain - -# Original license: -# * "THE BEER-WARE LICENSE" (Revision 42): -# * <phk@login.dknet.dk> wrote this file. As long as you retain this notice you -# * can do whatever you want with this stuff. If we meet some day, and you think -# * this stuff is worth it, you can buy me a beer in return. Poul-Henning Kamp - -# This port adds no further stipulations. I forfeit any copyright interest. - -import md5 - -def md5crypt(password, salt, magic='$1$'): - # /* The password first, since that is what is most unknown */ /* Then our magic string */ /* Then the raw salt */ - m = md5.new() - m.update(password + magic + salt) - - # /* Then just as many characters of the MD5(pw,salt,pw) */ - mixin = md5.md5(password + salt + password).digest() - for i in range(0, len(password)): - m.update(mixin[i % 16]) - - # /* Then something really weird... */ - # Also really broken, as far as I can tell. -m - i = len(password) - while i: - if i & 1: - m.update('\x00') - else: - m.update(password[0]) - i >>= 1 - - final = m.digest() - - # /* and now, just to make sure things don't run too fast */ - for i in range(1000): - m2 = md5.md5() - if i & 1: - m2.update(password) - else: - m2.update(final) - - if i % 3: - m2.update(salt) - - if i % 7: - m2.update(password) - - if i & 1: - m2.update(final) - else: - m2.update(password) - - final = m2.digest() - - # This is the bit that uses to64() in the original code. - - itoa64 = './0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' - - rearranged = '' - for a, b, c in ((0, 6, 12), (1, 7, 13), (2, 8, 14), (3, 9, 15), (4, 10, 5)): - v = ord(final[a]) << 16 | ord(final[b]) << 8 | ord(final[c]) - for i in range(4): - rearranged += itoa64[v & 0x3f]; v >>= 6 - - v = ord(final[11]) - for i in range(2): - rearranged += itoa64[v & 0x3f]; v >>= 6 - - return magic + salt + '$' + rearranged - -if __name__ == '__main__': - - def test(clear_password, the_hash): - magic, salt = the_hash[1:].split('$')[:2] - magic = '$' + magic + '$' - return md5crypt(clear_password, salt, magic) == the_hash - - test_cases = ( - (' ', '$1$yiiZbNIH$YiCsHZjcTkYd31wkgW8JF.'), - ('pass', '$1$YeNsbWdH$wvOF8JdqsoiLix754LTW90'), - ('____fifteen____', '$1$s9lUWACI$Kk1jtIVVdmT01p0z3b/hw1'), - ('____sixteen_____', '$1$dL3xbVZI$kkgqhCanLdxODGq14g/tW1'), - ('____seventeen____', '$1$NaH5na7J$j7y8Iss0hcRbu3kzoJs5V.'), - ('__________thirty-three___________', '$1$HO7Q6vzJ$yGwp2wbL5D7eOVzOmxpsy.'), - ('apache', '$apr1$J.w5a/..$IW9y6DR0oO/ADuhlMF5/X1') - ) - - for clearpw, hashpw in test_cases: - if test(clearpw, hashpw): - print '%s: pass' % clearpw - else: - print '%s: FAIL' % clearpw diff --git a/libmproxy/proxy.py b/libmproxy/proxy.py index 458ea2b5..1bf57b8a 100644 --- a/libmproxy/proxy.py +++ b/libmproxy/proxy.py @@ -16,9 +16,8 @@ import sys, os, string, socket, time import shutil, tempfile, threading import SocketServer from OpenSSL import SSL -from netlib import odict, tcp, http, wsgi, certutils, http_status +from netlib import odict, tcp, http, wsgi, certutils, http_status, http_auth import utils, flow, version, platform, controller -import authentication KILL = 0 @@ -619,14 +618,14 @@ def process_proxy_options(parser, options): if len(options.auth_singleuser.split(':')) != 2: parser.error("Please specify user in the format username:password") username, password = options.auth_singleuser.split(':') - password_manager = authentication.SingleUserPasswordManager(username, password) + password_manager = http_auth.PassManSingleUser(username, password) elif options.auth_nonanonymous: - password_manager = authentication.PermissivePasswordManager() + password_manager = http_auth.PassManNonAnon() elif options.auth_htpasswd: - password_manager = authentication.HtpasswdPasswordManager(options.auth_htpasswd) - authenticator = authentication.BasicProxyAuth(password_manager, "mitmproxy") + password_manager = http_auth.PassManHtpasswd(options.auth_htpasswd) + authenticator = http_auth.BasicProxyAuth(password_manager, "mitmproxy") else: - authenticator = authentication.NullProxyAuth(None) + authenticator = http_auth.NullProxyAuth(None) return ProxyConfig( certfile = options.cert, diff --git a/test/test_authentication.py b/test/test_authentication.py deleted file mode 100644 index f7a5ecd3..00000000 --- a/test/test_authentication.py +++ /dev/null @@ -1,58 +0,0 @@ -import binascii -from libmproxy import authentication -from netlib import odict -import tutils - - -class TestNullProxyAuth: - def test_simple(self): - na = authentication.NullProxyAuth(authentication.PermissivePasswordManager()) - assert not na.auth_challenge_headers() - assert na.authenticate("foo") - na.clean({}) - - -class TestBasicProxyAuth: - def test_simple(self): - ba = authentication.BasicProxyAuth(authentication.PermissivePasswordManager(), "test") - h = odict.ODictCaseless() - assert ba.auth_challenge_headers() - assert not ba.authenticate(h) - - def test_parse_auth_value(self): - ba = authentication.BasicProxyAuth(authentication.PermissivePasswordManager(), "test") - vals = ("basic", "foo", "bar") - assert ba.parse_auth_value(ba.unparse_auth_value(*vals)) == vals - tutils.raises(ValueError, ba.parse_auth_value, "") - tutils.raises(ValueError, ba.parse_auth_value, "foo bar") - - v = "basic " + binascii.b2a_base64("foo") - tutils.raises(ValueError, ba.parse_auth_value, v) - - def test_authenticate_clean(self): - ba = authentication.BasicProxyAuth(authentication.PermissivePasswordManager(), "test") - - hdrs = odict.ODictCaseless() - vals = ("basic", "foo", "bar") - hdrs[ba.AUTH_HEADER] = [ba.unparse_auth_value(*vals)] - assert ba.authenticate(hdrs) - - ba.clean(hdrs) - assert not ba.AUTH_HEADER in hdrs - - - hdrs[ba.AUTH_HEADER] = [""] - assert not ba.authenticate(hdrs) - - hdrs[ba.AUTH_HEADER] = ["foo"] - assert not ba.authenticate(hdrs) - - vals = ("foo", "foo", "bar") - hdrs[ba.AUTH_HEADER] = [ba.unparse_auth_value(*vals)] - assert not ba.authenticate(hdrs) - - ba = authentication.BasicProxyAuth(authentication.PasswordManager(), "test") - vals = ("basic", "foo", "bar") - hdrs[ba.AUTH_HEADER] = [ba.unparse_auth_value(*vals)] - assert not ba.authenticate(hdrs) - diff --git a/test/test_proxy.py b/test/test_proxy.py index 0788edbf..2babe51c 100644 --- a/test/test_proxy.py +++ b/test/test_proxy.py @@ -58,3 +58,11 @@ class TestServerConnection: sc.connection.close = mock.Mock(side_effect=IOError) sc.terminate() + +class TestProcessOptions: + def test_auth(self): + parser = mock.MagicMock() + + + + diff --git a/test/test_server.py b/test/test_server.py index 3a1b019f..0af4bae8 100644 --- a/test/test_server.py +++ b/test/test_server.py @@ -132,7 +132,7 @@ class TestHTTP(tservers.HTTPProxTest, CommonMixin): def test_invalid_headers(self): p = self.pathoc() req = p.request("get:'http://foo':h':foo'='bar'") - print req + assert req.status_code == 400 class TestHTTPConnectSSLError(tservers.HTTPProxTest): |