diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/.pry | 5 | ||||
-rw-r--r-- | test/data/serverkey.pem | 32 | ||||
-rw-r--r-- | test/data/testkey.pem | 32 | ||||
-rw-r--r-- | test/handler.py | 25 | ||||
-rw-r--r-- | test/serv.py | 10 | ||||
-rw-r--r-- | test/sslserv.py | 22 | ||||
-rw-r--r-- | test/test_console.py | 269 | ||||
-rw-r--r-- | test/test_filt.py | 220 | ||||
-rw-r--r-- | test/test_proxy.py | 259 | ||||
-rw-r--r-- | test/test_utils.py | 221 | ||||
-rwxr-xr-x | test/tserv | 30 |
11 files changed, 1125 insertions, 0 deletions
diff --git a/test/.pry b/test/.pry new file mode 100644 index 00000000..0e4b2e23 --- /dev/null +++ b/test/.pry @@ -0,0 +1,5 @@ +base = .. +coverage = ../libmproxy +exclude = ../libmproxy/pyparsing.py + . + diff --git a/test/data/serverkey.pem b/test/data/serverkey.pem new file mode 100644 index 00000000..289bfa71 --- /dev/null +++ b/test/data/serverkey.pem @@ -0,0 +1,32 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXQIBAAKBgQC+N+9bv1YC0GKbGdv2wMuuWTGSNwE/Hq5IIxYN1eITsvbD1GgB +69x++XJd6KTIthnta0KCpCAtbaYbCkhUfxCVv2bP+iQt2AjwMOZlgRZ+RGJ25dBu +AjAxQmqDJcAdS6MoRHWziomnUNfNogVrfqjpvJor+1iRnrj2q00ab9WYCwIDAQAB +AoGBAIM7V9l2UcKzPbQ/zO+Z52urgXWcmTGQ2zBNdIOrEcQBbhmAyxi4PnEja3G6 +dSU77PtNSp+S19g/k5+IIoqY9zkGigdaPhRVRKJgBTAzFzMz+WHpQIffDojFKCnL +gyDnzMRJY8+cnsCqbHRY4hqFiCr8Rq9sCdlynAytdtrnxzqhAkEA9bha6MO+L0JA +6IEEbVY1vtaUO9Xg5DUDjRxQcfniSJACb/2IvF0tvxAnG7I/S8AavCXqtlDPtYkI +WOxY5Sd62QJBAMYtKUxGka4XxwCyBK8EUNaN8m9C++mpjoHD1kFri9B1bXm91nCO +iGWqtqdarwyEc/pAHw5UGzVyBXticPIcs4MCQQCcPvsHsZhYoq91aLyw7bXFQNsH +ZUvYsOEuNIfuwa+i5ne2UKhG5pU1PgcwNFrNRz140D98aMx7KcS2DqvEIyOZAkBF +6Yi4L+0Uza6WwDaGx679AfaU6byVIgv0G3JqgdZBJCwK1r3f12im9SKax5MZh2Ci +2Bwcoe83W5IzhPbzcsyhAkBo8O2U2vig5PQWQ0BUKJrCGHLq//D/ttdLVtmc6eWc +zqssCF3Unkk3bOq35swSKeAx8WotPPVsALWr87N2hCB+ +-----END RSA PRIVATE KEY----- +-----BEGIN CERTIFICATE----- +MIICsDCCAhmgAwIBAgIJANwogM9sqMHLMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMTAwMTMxMDEzOTEzWhcNMTEwMTMxMDEzOTEzWjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB +gQC+N+9bv1YC0GKbGdv2wMuuWTGSNwE/Hq5IIxYN1eITsvbD1GgB69x++XJd6KTI +thnta0KCpCAtbaYbCkhUfxCVv2bP+iQt2AjwMOZlgRZ+RGJ25dBuAjAxQmqDJcAd +S6MoRHWziomnUNfNogVrfqjpvJor+1iRnrj2q00ab9WYCwIDAQABo4GnMIGkMB0G +A1UdDgQWBBTTnBZyw7ZZsb8+/6gvZFIHhVgtDzB1BgNVHSMEbjBsgBTTnBZyw7ZZ +sb8+/6gvZFIHhVgtD6FJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt +U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJANwogM9s +qMHLMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEApz428aOar0EBuAib +I+liefRlK4I3MQQxq3tOeB1dgAIo0ivKtdVJGi1kPg8EO0KMvFfn6IRtssUmFgCp +JBD+HoDzFxwI1bLMVni+g7OzaNSwL3nQ94lZUdpWMYDxqY4bLUv3goX1TlN9lmpG +8FiBLYUC0RNTCCRDFGfDr/wUT/M= +-----END CERTIFICATE----- diff --git a/test/data/testkey.pem b/test/data/testkey.pem new file mode 100644 index 00000000..af8d9d8f --- /dev/null +++ b/test/data/testkey.pem @@ -0,0 +1,32 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXQIBAAKBgQC+6rG6A/BGD0dI+mh2FZIqQZn82z/pGs4f3pyxbHb+ROxjjQOr +fDCw2jc11XDxK7CXpDQAnkO6au/sQ5t50vSZ+PGhFD+t558VV2ausB5OYZsR7RRx +gl1jsxWdde3EHGjxSK+aXRgFpVrZzPLSy6dl8tMoqUMWIBi0u1WTbmyYjwIDAQAB +AoGBAKyqhmK9/Sjf2JDgKGnjyHX/Ls3JXVvtqk6Yfw7YEiaVH1ZJyu/lOgQ414YQ +rDzyTpxXHdERUh/fZ24/FvZvHFgy5gWEQjQPpprIxvqCLKJhX73L2+TnXmfYDApb +J7V/JfnTeOaK9LTpHsofB98A1s9DWX/ccOgKTtZIYMjYpdoBAkEA9hLvtixbO2A2 +ZgDcA9ftVX2WwdpRH+mYXl1G60Fem5nlO3Rl3FDoafRvSQNZiqyOlObvKbbYh/S2 +L7ihEMMNYQJBAMaeLnAc9jO/z4ApTqSBGUpM9b7ul16aSgq56saUI0VULIZcXeo3 +3BwdL2fEOOnzjNy6NpH2BW63h/+2t7lV++8CQQDK+S+1Sr0uKtx0Iv1YRkHEJMW3 +vQbxldNS8wnOf6s0GisVcZubsTkkPLWWuiaf1ln9xMc9106gRmAI2PgyRVHBAkA6 +iI+C9uYP5i1Oxd2pWWqMnRWnSUVO2gWMF7J7B1lFq0Lb7gi3Z/L0Th2UZR2oxN/0 +hORkK676LBhmYgDPG+n9AkAJOnPIFQVAEBAO9bAxFrje8z6GRt332IlgxuiTeDE3 +EAlH9tmZma4Tri4sWnhJwCsxl+5hWamI8NL4EIeXRvPw +-----END RSA PRIVATE KEY----- +-----BEGIN CERTIFICATE----- +MIICsDCCAhmgAwIBAgIJAI7G7a/d5YwEMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMTAwMjAyMDM0MTExWhcNMTEwMjAyMDM0MTExWjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB +gQC+6rG6A/BGD0dI+mh2FZIqQZn82z/pGs4f3pyxbHb+ROxjjQOrfDCw2jc11XDx +K7CXpDQAnkO6au/sQ5t50vSZ+PGhFD+t558VV2ausB5OYZsR7RRxgl1jsxWdde3E +HGjxSK+aXRgFpVrZzPLSy6dl8tMoqUMWIBi0u1WTbmyYjwIDAQABo4GnMIGkMB0G +A1UdDgQWBBS+MFJTsriCPNYsj8/4f+PympPEkzB1BgNVHSMEbjBsgBS+MFJTsriC +PNYsj8/4f+PympPEk6FJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt +U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAI7G7a/d +5YwEMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAlpan/QX2fpXVRihV +lQic2DktF4xd5unrZnFC8X8ScNX1ClU+AO79ejaobt4YGjeVYs0iQQsUL2E0G43c +mOXfsq1b970Ep6xRS76EmZ+tTdFBd86tFTIhZJrOi67gs+twj5V2elyp3tQpg2ze +G/jwDQS8V1X9CbfqBQriL7x5Tk4= +-----END CERTIFICATE----- diff --git a/test/handler.py b/test/handler.py new file mode 100644 index 00000000..5803b4d1 --- /dev/null +++ b/test/handler.py @@ -0,0 +1,25 @@ +import socket +from BaseHTTPServer import BaseHTTPRequestHandler + + +class TestRequestHandler(BaseHTTPRequestHandler): + default_request_version = "HTTP/1.1" + def setup(self): + self.connection = self.request + self.rfile = socket._fileobject(self.request, "rb", self.rbufsize) + self.wfile = socket._fileobject(self.request, "wb", self.wbufsize) + + def log_message(self, *args, **kwargs): + pass + + def do_GET(self): + data = "data: %s\npath: %s\n"%(self.headers, self.path) + self.send_response(200) + self.send_header("proxtest", "testing") + self.send_header("Content-type", "text-html") + self.send_header("Content-length", len(data)) + self.end_headers() + self.wfile.write(data) + + + diff --git a/test/serv.py b/test/serv.py new file mode 100644 index 00000000..9e43c08f --- /dev/null +++ b/test/serv.py @@ -0,0 +1,10 @@ +import socket, os, cStringIO, tempfile +from SocketServer import BaseServer +from BaseHTTPServer import HTTPServer +import handler + +def make(port): + server_address = ('', port) + return HTTPServer(server_address, handler.TestRequestHandler) + + diff --git a/test/sslserv.py b/test/sslserv.py new file mode 100644 index 00000000..5153d0da --- /dev/null +++ b/test/sslserv.py @@ -0,0 +1,22 @@ +import socket, os, cStringIO, tempfile +from SocketServer import BaseServer +from BaseHTTPServer import HTTPServer +import ssl +import handler + + +class SecureHTTPServer(HTTPServer): + def __init__(self, server_address, HandlerClass): + BaseServer.__init__(self, server_address, HandlerClass) + self.socket = ssl.wrap_socket( + socket.socket(self.address_family, self.socket_type), + keyfile = "data/serverkey.pem", + certfile = "data/serverkey.pem" + ) + self.server_bind() + self.server_activate() + + +def make(port): + server_address = ('', port) + return SecureHTTPServer(server_address, handler.TestRequestHandler) diff --git a/test/test_console.py b/test/test_console.py new file mode 100644 index 00000000..50780aa5 --- /dev/null +++ b/test/test_console.py @@ -0,0 +1,269 @@ +from libmproxy import console, proxy, utils, filt +import libpry + +def treq(conn=None): + if not conn: + conn = proxy.BrowserConnection("address", 22) + headers = utils.Headers() + headers["header"] = ["qvalue"] + return proxy.Request(conn, "host", 80, "http", "GET", "/path", headers, "content") + + +def tresp(req=None): + if not req: + req = treq() + headers = utils.Headers() + headers["header_response"] = ["svalue"] + return proxy.Response(req, 200, "HTTP/1.1", "message", headers, "content_response") + + +def tflow(): + bc = proxy.BrowserConnection("address", 22) + return console.Flow(bc) + + +class uState(libpry.AutoTree): + def test_backup(self): + bc = proxy.BrowserConnection("address", 22) + c = console.State() + f = console.Flow(bc) + c.add_browserconnect(f) + + f.backup() + c.revert(f) + + def test_flow(self): + """ + normal flow: + + connect -> request -> response + """ + bc = proxy.BrowserConnection("address", 22) + c = console.State() + f = console.Flow(bc) + c.add_browserconnect(f) + assert c.lookup(bc) + assert c.get_focus() == (f, 0) + + req = treq(bc) + assert c.add_request(req) + assert len(c.flow_list) == 1 + assert c.lookup(req) + + newreq = treq() + assert not c.add_request(newreq) + assert not c.lookup(newreq) + + resp = tresp(req) + assert c.add_response(resp) + assert len(c.flow_list) == 1 + assert f.waiting == False + assert c.lookup(resp) + + newresp = tresp() + assert not c.add_response(newresp) + assert not c.lookup(newresp) + + def test_err(self): + bc = proxy.BrowserConnection("address", 22) + c = console.State() + f = console.Flow(bc) + c.add_browserconnect(f) + e = proxy.Error(bc, "message") + assert c.add_error(e) + + e = proxy.Error(proxy.BrowserConnection("address", 22), "message") + assert not c.add_error(e) + + def test_view(self): + c = console.State() + + f = tflow() + c.add_browserconnect(f) + assert len(c.view) == 1 + c.set_limit(filt.parse("~q")) + assert len(c.view) == 0 + c.set_limit(None) + + + f = tflow() + req = treq(f.connection) + c.add_browserconnect(f) + c.add_request(req) + assert len(c.view) == 2 + c.set_limit(filt.parse("~q")) + assert len(c.view) == 1 + c.set_limit(filt.parse("~s")) + assert len(c.view) == 0 + + def test_focus(self): + """ + normal flow: + + connect -> request -> response + """ + c = console.State() + + bc = proxy.BrowserConnection("address", 22) + f = console.Flow(bc) + c.add_browserconnect(f) + assert c.get_focus() == (f, 0) + assert c.get_from_pos(0) == (f, 0) + assert c.get_from_pos(1) == (None, None) + assert c.get_next(0) == (None, None) + + bc2 = proxy.BrowserConnection("address", 22) + f2 = console.Flow(bc2) + c.add_browserconnect(f2) + assert c.get_focus() == (f, 1) + assert c.get_next(0) == (f, 1) + assert c.get_prev(1) == (f2, 0) + assert c.get_next(1) == (None, None) + + c.set_focus(0) + assert c.get_focus() == (f2, 0) + c.set_focus(-1) + assert c.get_focus() == (f2, 0) + + c.delete_flow(f2) + assert c.get_focus() == (f, 0) + c.delete_flow(f) + assert c.get_focus() == (None, None) + + def _add_request(self, state): + f = tflow() + state.add_browserconnect(f) + q = treq(f.connection) + state.add_request(q) + return f + + def _add_response(self, state): + f = self._add_request(state) + r = tresp(f.request) + state.add_response(r) + + def test_focus_view(self): + c = console.State() + self._add_request(c) + self._add_response(c) + self._add_request(c) + self._add_response(c) + self._add_request(c) + self._add_response(c) + c.set_limit(filt.parse("~q")) + assert len(c.view) == 3 + assert c.focus == 2 + + def test_delete_last(self): + c = console.State() + f1 = tflow() + f2 = tflow() + c.add_browserconnect(f1) + c.add_browserconnect(f2) + c.set_focus(1) + c.delete_flow(f1) + assert c.focus == 0 + + def test_kill_flow(self): + c = console.State() + f = tflow() + c.add_browserconnect(f) + c.kill_flow(f) + assert not c.flow_list + + def test_clear(self): + c = console.State() + f = tflow() + c.add_browserconnect(f) + f.intercepting = True + + c.clear() + assert len(c.flow_list) == 1 + f.intercepting = False + c.clear() + assert len(c.flow_list) == 0 + + +class uFlow(libpry.AutoTree): + def test_match(self): + f = tflow() + f.response = tresp() + f.request = f.response.request + assert not f.match(filt.parse("~b test")) + + def test_backup(self): + f = tflow() + f.backup() + f.revert() + + def test_simple(self): + f = tflow() + assert f.get_text() + + f.request = treq() + assert f.get_text() + + f.response = tresp() + f.response.headers["content-type"] = ["text/html"] + assert f.get_text() + f.response.code = 404 + assert f.get_text() + + f.focus = True + assert f.get_text() + + f.connection = console.ReplayConnection() + assert f.get_text() + + f.response = None + assert f.get_text() + + f.error = proxy.Error(200, "test") + assert f.get_text() + + def test_kill(self): + f = tflow() + f.request = treq() + f.intercept() + assert not f.request.acked + f.kill() + assert f.request.acked + f.intercept() + f.response = tresp() + f.request = f.response.request + f.request.ack() + assert not f.response.acked + f.kill() + assert f.response.acked + + def test_accept_intercept(self): + f = tflow() + f.request = treq() + f.intercept() + assert not f.request.acked + f.accept_intercept() + assert f.request.acked + f.response = tresp() + f.request = f.response.request + f.intercept() + f.request.ack() + assert not f.response.acked + f.accept_intercept() + assert f.response.acked + + +class uformat_keyvals(libpry.AutoTree): + def test_simple(self): + assert console.format_keyvals( + [ + ("aa", "bb"), + ("cc", "dd"), + ] + ) + + +tests = [ + uFlow(), + uformat_keyvals(), + uState() +] diff --git a/test/test_filt.py b/test/test_filt.py new file mode 100644 index 00000000..3cf0f6cd --- /dev/null +++ b/test/test_filt.py @@ -0,0 +1,220 @@ +import cStringIO +from libmproxy import filt, proxy, utils +import libpry + + +class uParsing(libpry.AutoTree): + def _dump(self, x): + c = cStringIO.StringIO() + x.dump(fp=c) + assert c.getvalue() + + def test_simple(self): + assert not filt.parse("~b") + assert filt.parse("~q") + assert filt.parse("~c 10") + assert filt.parse("~u foobar") + assert filt.parse("~q ~c 10") + p = filt.parse("~q ~c 10") + self._dump(p) + assert len(p.lst) == 2 + + def test_naked_url(self): + #a = filt.parse("foobar") + #assert a.lst[0].expr == "foobar" + + a = filt.parse("foobar ~h rex") + assert a.lst[0].expr == "foobar" + assert a.lst[1].expr == "rex" + self._dump(a) + + def test_quoting(self): + a = filt.parse("~u 'foo ~u bar' ~u voing") + assert a.lst[0].expr == "foo ~u bar" + assert a.lst[1].expr == "voing" + self._dump(a) + + a = filt.parse("~u foobar") + assert a.expr == "foobar" + + a = filt.parse(r"~u 'foobar\"\''") + assert a.expr == "foobar\"'" + + a = filt.parse(r'~u "foo \'bar"') + assert a.expr == "foo 'bar" + + def test_nesting(self): + a = filt.parse("(~u foobar & ~h voing)") + assert a.lst[0].expr == "foobar" + self._dump(a) + + def test_not(self): + a = filt.parse("!~h test") + assert a.itm.expr == "test" + a = filt.parse("!(~u test & ~h bar)") + assert a.itm.lst[0].expr == "test" + self._dump(a) + + def test_binaryops(self): + a = filt.parse("~u foobar | ~h voing") + isinstance(a, filt.FOr) + self._dump(a) + + a = filt.parse("~u foobar & ~h voing") + isinstance(a, filt.FAnd) + self._dump(a) + + def test_wideops(self): + a = filt.parse("~hq 'header: qvalue'") + assert isinstance(a, filt.FHeadRequest) + self._dump(a) + + +class uMatching(libpry.AutoTree): + def req(self): + conn = proxy.BrowserConnection("one", 2222) + headers = utils.Headers() + headers["header"] = ["qvalue"] + return proxy.Request( + conn, + "host", + 80, + "http", + "GET", + "/path", + headers, + "content_request" + ) + + def resp(self): + q = self.req() + headers = utils.Headers() + headers["header_response"] = ["svalue"] + return proxy.Response( + q, + 200, + "HTTP/1.1", + "message", + headers, + "content_response" + ) + + def q(self, q, o): + return filt.parse(q)(o) + + def test_fcontenttype(self): + q = self.req() + s = self.resp() + assert not self.q("~t content", q) + assert not self.q("~t content", s) + + q.headers["content-type"] = ["text/json"] + assert self.q("~t json", q) + assert self.q("~tq json", q) + assert not self.q("~ts json", q) + + s.headers["content-type"] = ["text/json"] + assert self.q("~t json", s) + + del s.headers["content-type"] + s.request.headers["content-type"] = ["text/json"] + assert self.q("~t json", s) + assert self.q("~tq json", s) + assert not self.q("~ts json", s) + + def test_freq_fresp(self): + q = self.req() + s = self.resp() + + assert self.q("~q", q) + assert not self.q("~q", s) + + assert not self.q("~s", q) + assert self.q("~s", s) + + def test_head(self): + q = self.req() + s = self.resp() + assert not self.q("~h nonexistent", q) + assert self.q("~h qvalue", q) + assert self.q("~h header", q) + assert self.q("~h 'header: qvalue'", q) + + assert self.q("~h 'header: qvalue'", s) + assert self.q("~h 'header_response: svalue'", s) + + assert self.q("~hq 'header: qvalue'", s) + assert not self.q("~hq 'header_response: svalue'", s) + + assert self.q("~hq 'header: qvalue'", q) + assert not self.q("~hq 'header_request: svalue'", q) + + assert not self.q("~hs 'header: qvalue'", s) + assert self.q("~hs 'header_response: svalue'", s) + assert not self.q("~hs 'header: qvalue'", q) + + def test_body(self): + q = self.req() + s = self.resp() + assert not self.q("~b nonexistent", q) + assert self.q("~b content", q) + assert self.q("~b response", s) + assert self.q("~b content_request", s) + + assert self.q("~bq content", q) + assert self.q("~bq content", s) + assert not self.q("~bq response", q) + assert not self.q("~bq response", s) + + assert not self.q("~bs content", q) + assert self.q("~bs content", s) + assert not self.q("~bs nomatch", s) + assert not self.q("~bs response", q) + assert self.q("~bs response", s) + + def test_url(self): + q = self.req() + s = self.resp() + assert self.q("~u host", q) + assert self.q("~u host/path", q) + assert not self.q("~u moo/path", q) + + assert self.q("~u host", s) + assert self.q("~u host/path", s) + assert not self.q("~u moo/path", s) + + def test_code(self): + q = self.req() + s = self.resp() + assert not self.q("~c 200", q) + assert self.q("~c 200", s) + assert not self.q("~c 201", s) + + def test_and(self): + s = self.resp() + assert self.q("~c 200 & ~h head", s) + assert not self.q("~c 200 & ~h nohead", s) + assert self.q("(~c 200 & ~h head) & ~b content", s) + assert not self.q("(~c 200 & ~h head) & ~b nonexistent", s) + assert not self.q("(~c 200 & ~h nohead) & ~b content", s) + + def test_or(self): + s = self.resp() + assert self.q("~c 200 | ~h nohead", s) + assert self.q("~c 201 | ~h head", s) + assert not self.q("~c 201 | ~h nohead", s) + assert self.q("(~c 201 | ~h nohead) | ~s", s) + assert not self.q("(~c 201 | ~h nohead) | ~q", s) + + def test_not(self): + s = self.resp() + assert not self.q("! ~c 200", s) + assert self.q("! ~c 201", s) + assert self.q("!~c 201 !~c 202", s) + assert not self.q("!~c 201 !~c 200", s) + + +tests = [ + uMatching(), + uParsing() +] diff --git a/test/test_proxy.py b/test/test_proxy.py new file mode 100644 index 00000000..90cfbbfb --- /dev/null +++ b/test/test_proxy.py @@ -0,0 +1,259 @@ +import threading, urllib, Queue, urllib2, cStringIO +import libpry +import serv, sslserv +from libmproxy import proxy, controller, utils +import random + +# Yes, the random ports are horrible. During development, sockets are often not +# properly closed during error conditions, which means you have to wait until +# you can re-bind to the same port. This is a pain in the ass, so we just pick +# a random port and keep moving. +PROXL_PORT = random.randint(10000, 20000) +HTTP_PORT = random.randint(20000, 30000) +HTTPS_PORT = random.randint(30000, 40000) + + +class TestMaster(controller.Master): + def __init__(self, port, testq): + serv = proxy.ProxyServer(port) + controller.Master.__init__(self, serv) + self.testq = testq + self.log = [] + + def clear(self): + self.log = [] + + def handle(self, m): + self.log.append(m) + m.ack() + + +class ProxyThread(threading.Thread): + def __init__(self, port, testq): + self.tmaster = TestMaster(port, testq) + threading.Thread.__init__(self) + + def run(self): + self.tmaster.run() + + def shutdown(self): + self.tmaster.shutdown() + + +class ServerThread(threading.Thread): + def __init__(self, server): + self.server = server + threading.Thread.__init__(self) + + def run(self): + self.server.serve_forever() + + def shutdown(self): + self.server.shutdown() + + +class _TestServers(libpry.TestContainer): + def setUpAll(self): + proxy.config = proxy.Config("data/testkey.pem") + self.tqueue = Queue.Queue() + # We don't make any concurrent requests, so we can access + # the attributes on this object safely. + self.proxthread = ProxyThread(PROXL_PORT, self.tqueue) + self.threads = [ + ServerThread(serv.make(HTTP_PORT)), + ServerThread(sslserv.make(HTTPS_PORT)), + self.proxthread + ] + for i in self.threads: + i.start() + + def setUp(self): + self.proxthread.tmaster.clear() + + def tearDownAll(self): + for i in self.threads: + i.shutdown() + + +class _ProxTests(libpry.AutoTree): + def log(self): + pthread = self.findAttr("proxthread") + return pthread.tmaster.log + + +class uSanity(_ProxTests): + def test_http(self): + """ + Just check that the HTTP server is running. + """ + f = urllib.urlopen("http://127.0.0.1:%s"%HTTP_PORT) + assert f.read() + + def test_https(self): + """ + Just check that the HTTPS server is running. + """ + f = urllib.urlopen("https://127.0.0.1:%s"%HTTPS_PORT) + assert f.read() + + +class uProxy(_ProxTests): + HOST = "127.0.0.1" + def _get(self, host=HOST): + r = urllib2.Request("http://%s:%s"%(host, HTTP_PORT)) + r.set_proxy("127.0.0.1:%s"%PROXL_PORT, "http") + return urllib2.urlopen(r) + + def _sget(self, host=HOST): + proxy_support = urllib2.ProxyHandler( + {"https" : "https://127.0.0.1:%s"%PROXL_PORT} + ) + opener = urllib2.build_opener(proxy_support) + r = urllib2.Request("https://%s:%s"%(host, HTTPS_PORT)) + return opener.open(r) + + def test_http(self): + f = self._get() + assert f.code == 200 + assert f.read() + f.close() + + l = self.log() + assert l[0].address + assert l[1].headers.has_key("host") + assert l[2].code == 200 + + def test_https(self): + f = self._sget() + assert f.code == 200 + assert f.read() + f.close() + + l = self.log() + assert l[0].address + assert l[1].headers.has_key("host") + assert l[2].code == 200 + + # Disable these two for now: they take a long time. + def _test_http_nonexistent(self): + f = self._get("nonexistent") + assert f.code == 200 + assert "Error" in f.read() + + def _test_https_nonexistent(self): + f = self._sget("nonexistent") + assert f.code == 200 + assert "Error" in f.read() + + + +class u_parse_proxy_request(libpry.AutoTree): + def test_simple(self): + libpry.raises(proxy.ProxyError, proxy.parse_proxy_request, "") + + u = "GET ... HTTP/1.1" + libpry.raises("invalid url", proxy.parse_proxy_request, u) + + u = "MORK / HTTP/1.1" + libpry.raises("unknown request method", proxy.parse_proxy_request, u) + + u = "GET http://foo.com:8888/test HTTP/1.1" + m, s, h, po, pa = proxy.parse_proxy_request(u) + assert m == "GET" + assert s == "http" + assert h == "foo.com" + assert po == 8888 + assert pa == "/test" + + def test_connect(self): + u = "CONNECT host.com:443 HTTP/1.0" + expected = ('CONNECT', None, 'host.com', 443, None) + ret = proxy.parse_proxy_request(u) + assert expected == ret + + def test_inner(self): + u = "GET / HTTP/1.1" + assert proxy.parse_proxy_request(u) == ('GET', None, None, None, '/') + + +class u_parse_url(libpry.AutoTree): + def test_simple(self): + assert not proxy.parse_url("") + + u = "http://foo.com:8888/test" + s, h, po, pa = proxy.parse_url(u) + assert s == "http" + assert h == "foo.com" + assert po == 8888 + assert pa == "/test" + + s, h, po, pa = proxy.parse_url("http://foo/bar") + assert s == "http" + assert h == "foo" + assert po == 80 + assert pa == "/bar" + + s, h, po, pa = proxy.parse_url("http://foo") + assert pa == "/" + + +class uConfig(libpry.AutoTree): + def test_pem(self): + c = proxy.Config(pemfile="data/testkey.pem") + assert c.pemfile + + +class uFileLike(libpry.AutoTree): + def test_wrap(self): + s = cStringIO.StringIO("foobar\nfoobar") + s = proxy.FileLike(s) + s.flush() + assert s.readline() == "foobar\n" + assert s.readline() == "foobar" + + +class uRequest(libpry.AutoTree): + def test_simple(self): + h = utils.Headers() + h["test"] = ["test"] + c = proxy.BrowserConnection("addr", 2222) + r = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content") + u = r.url() + assert r.set_url(u) + assert not r.set_url("") + assert r.url() == u + assert r.short() + assert r.assemble() + + +class uResponse(libpry.AutoTree): + def test_simple(self): + h = utils.Headers() + h["test"] = ["test"] + c = proxy.BrowserConnection("addr", 2222) + req = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content") + resp = proxy.Response(req, 200, "HTTP", "msg", h.copy(), "content") + assert resp.short() + assert resp.assemble() + + +class uProxyError(libpry.AutoTree): + def test_simple(self): + p = proxy.ProxyError(111, "msg") + assert repr(p) + + + +tests = [ + uProxyError(), + uRequest(), + uResponse(), + uFileLike(), + uConfig(), + u_parse_proxy_request(), + u_parse_url(), + _TestServers(), [ + uSanity(), + uProxy(), + ] +] diff --git a/test/test_utils.py b/test/test_utils.py new file mode 100644 index 00000000..8a4da968 --- /dev/null +++ b/test/test_utils.py @@ -0,0 +1,221 @@ +import textwrap, cStringIO, os +import libpry +from libmproxy import utils + + +class uisBin(libpry.AutoTree): + def test_simple(self): + assert not utils.isBin("testing\n\r") + assert utils.isBin("testing\x01") + assert utils.isBin("testing\x0e") + assert utils.isBin("testing\x7f") + + +class uhexdump(libpry.AutoTree): + def test_simple(self): + assert utils.hexdump("one\0"*10) + + +class upretty_size(libpry.AutoTree): + def test_simple(self): + assert utils.pretty_size(100) == "100B" + assert utils.pretty_size(1024) == "1kB" + assert utils.pretty_size(1024 + (1024/2)) == "1.5kB" + assert utils.pretty_size(1024*1024) == "1M" + + +class uData(libpry.AutoTree): + def test_nonexistent(self): + libpry.raises("does not exist", utils.data.path, "nonexistent") + + +class uMultiDict(libpry.AutoTree): + def setUp(self): + self.md = utils.MultiDict() + + def test_setget(self): + assert not self.md.has_key("foo") + self.md.append("foo", 1) + assert self.md["foo"] == [1] + assert self.md.has_key("foo") + + def test_del(self): + self.md.append("foo", 1) + del self.md["foo"] + assert not self.md.has_key("foo") + + def test_extend(self): + self.md.append("foo", 1) + self.md.extend("foo", [2, 3]) + assert self.md["foo"] == [1, 2, 3] + + def test_extend_err(self): + self.md.append("foo", 1) + libpry.raises("not iterable", self.md.extend, "foo", 2) + + def test_get(self): + self.md.append("foo", 1) + self.md.append("foo", 2) + assert self.md.get("foo") == [1, 2] + assert self.md.get("bar") == None + + def test_caseSensitivity(self): + self.md._helper = (utils._caseless,) + self.md["foo"] = [1] + self.md.append("FOO", 2) + assert self.md["foo"] == [1, 2] + assert self.md["FOO"] == [1, 2] + assert self.md.has_key("FoO") + + def test_dict(self): + self.md.append("foo", 1) + self.md.append("foo", 2) + self.md["bar"] = [3] + assert self.md == self.md + assert dict(self.md) == self.md + + def test_copy(self): + self.md["foo"] = [1, 2] + self.md["bar"] = [3, 4] + md2 = self.md.copy() + assert md2 == self.md + assert id(md2) != id(self.md) + + def test_clear(self): + self.md["foo"] = [1, 2] + self.md["bar"] = [3, 4] + self.md.clear() + assert not self.md.keys() + + def test_setitem(self): + libpry.raises(ValueError, self.md.__setitem__, "foo", "bar") + self.md["foo"] = ["bar"] + assert self.md["foo"] == ["bar"] + + def test_itemPairs(self): + self.md.append("foo", 1) + self.md.append("foo", 2) + self.md.append("bar", 3) + l = list(self.md.itemPairs()) + assert len(l) == 3 + assert ("foo", 1) in l + assert ("foo", 2) in l + assert ("bar", 3) in l + + +class uHeaders(libpry.AutoTree): + def setUp(self): + self.hd = utils.Headers() + + def test_read_simple(self): + data = """ + Header: one + Header2: two + \r\n + """ + data = textwrap.dedent(data) + data = data.strip() + s = cStringIO.StringIO(data) + self.hd.read(s) + assert self.hd["header"] == ["one"] + assert self.hd["header2"] == ["two"] + + def test_read_multi(self): + data = """ + Header: one + Header: two + \r\n + """ + data = textwrap.dedent(data) + data = data.strip() + s = cStringIO.StringIO(data) + self.hd.read(s) + assert self.hd["header"] == ["one", "two"] + + def test_read_continued(self): + data = """ + Header: one + \ttwo + Header2: three + \r\n + """ + data = textwrap.dedent(data) + data = data.strip() + s = cStringIO.StringIO(data) + self.hd.read(s) + assert self.hd["header"] == ['one\r\n two'] + + def test_dictToHeader1(self): + self.hd.append("one", "uno") + self.hd.append("two", "due") + self.hd.append("two", "tre") + expected = [ + "one: uno\r\n", + "two: due\r\n", + "two: tre\r\n", + "\r\n" + ] + out = repr(self.hd) + for i in expected: + assert out.find(i) >= 0 + + def test_dictToHeader2(self): + self.hd["one"] = ["uno"] + expected1 = "one: uno\r\n" + expected2 = "\r\n" + out = repr(self.hd) + assert out.find(expected1) >= 0 + assert out.find(expected2) >= 0 + + def test_match_re(self): + h = utils.Headers() + h.append("one", "uno") + h.append("two", "due") + h.append("two", "tre") + assert h.match_re("uno") + assert h.match_re("two: due") + assert not h.match_re("nonono") + + + +class uisStringLike(libpry.AutoTree): + def test_all(self): + assert utils.isStringLike("foo") + assert not utils.isStringLike([1, 2, 3]) + assert not utils.isStringLike((1, 2, 3)) + assert not utils.isStringLike(["1", "2", "3"]) + + +class uisSequenceLike(libpry.AutoTree): + def test_all(self): + assert utils.isSequenceLike([1, 2, 3]) + assert utils.isSequenceLike((1, 2, 3)) + assert not utils.isSequenceLike("foobar") + assert utils.isSequenceLike(["foobar", "foo"]) + x = iter([1, 2, 3]) + assert utils.isSequenceLike(x) + assert not utils.isSequenceLike(1) + + +class umake_bogus_cert(libpry.AutoTree): + def test_all(self): + d = self.tmpdir() + path = os.path.join(d, "foo", "cert") + utils.make_bogus_cert(path) + + d = open(path).read() + assert "PRIVATE KEY" in d + assert "CERTIFICATE" in d + + +tests = [ + umake_bogus_cert(), + uisBin(), + uhexdump(), + upretty_size(), + uisStringLike(), + uisSequenceLike(), + uMultiDict(), + uHeaders(), + uData(), +] diff --git a/test/tserv b/test/tserv new file mode 100755 index 00000000..5b35b72a --- /dev/null +++ b/test/tserv @@ -0,0 +1,30 @@ +#!/usr/bin/env python +""" + A simple program for testing the test HTTP/S servers. +""" +from optparse import OptionParser, OptionGroup +import sslserv, serv + +if __name__ == "__main__": + parser = OptionParser( + usage = "%prog [options] output", + version="%prog 0.1", + ) + parser.add_option( + "-s", "--ssl", action="store_true", + dest="ssl", default=False + ) + options, args = parser.parse_args() + + if options.ssl: + port = 8443 + print "Running on port %s"%port + s = sslserv.make(port) + else: + port = 8080 + print "Running on port %s"%port + s = serv.make(port) + try: + s.serve_forever() + except KeyboardInterrupt: + pass |