aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/.pry5
-rw-r--r--test/data/serverkey.pem32
-rw-r--r--test/data/testkey.pem32
-rw-r--r--test/handler.py25
-rw-r--r--test/serv.py10
-rw-r--r--test/sslserv.py22
-rw-r--r--test/test_console.py269
-rw-r--r--test/test_filt.py220
-rw-r--r--test/test_proxy.py259
-rw-r--r--test/test_utils.py221
-rwxr-xr-xtest/tserv30
11 files changed, 1125 insertions, 0 deletions
diff --git a/test/.pry b/test/.pry
new file mode 100644
index 00000000..0e4b2e23
--- /dev/null
+++ b/test/.pry
@@ -0,0 +1,5 @@
+base = ..
+coverage = ../libmproxy
+exclude = ../libmproxy/pyparsing.py
+ .
+
diff --git a/test/data/serverkey.pem b/test/data/serverkey.pem
new file mode 100644
index 00000000..289bfa71
--- /dev/null
+++ b/test/data/serverkey.pem
@@ -0,0 +1,32 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIICXQIBAAKBgQC+N+9bv1YC0GKbGdv2wMuuWTGSNwE/Hq5IIxYN1eITsvbD1GgB
+69x++XJd6KTIthnta0KCpCAtbaYbCkhUfxCVv2bP+iQt2AjwMOZlgRZ+RGJ25dBu
+AjAxQmqDJcAdS6MoRHWziomnUNfNogVrfqjpvJor+1iRnrj2q00ab9WYCwIDAQAB
+AoGBAIM7V9l2UcKzPbQ/zO+Z52urgXWcmTGQ2zBNdIOrEcQBbhmAyxi4PnEja3G6
+dSU77PtNSp+S19g/k5+IIoqY9zkGigdaPhRVRKJgBTAzFzMz+WHpQIffDojFKCnL
+gyDnzMRJY8+cnsCqbHRY4hqFiCr8Rq9sCdlynAytdtrnxzqhAkEA9bha6MO+L0JA
+6IEEbVY1vtaUO9Xg5DUDjRxQcfniSJACb/2IvF0tvxAnG7I/S8AavCXqtlDPtYkI
+WOxY5Sd62QJBAMYtKUxGka4XxwCyBK8EUNaN8m9C++mpjoHD1kFri9B1bXm91nCO
+iGWqtqdarwyEc/pAHw5UGzVyBXticPIcs4MCQQCcPvsHsZhYoq91aLyw7bXFQNsH
+ZUvYsOEuNIfuwa+i5ne2UKhG5pU1PgcwNFrNRz140D98aMx7KcS2DqvEIyOZAkBF
+6Yi4L+0Uza6WwDaGx679AfaU6byVIgv0G3JqgdZBJCwK1r3f12im9SKax5MZh2Ci
+2Bwcoe83W5IzhPbzcsyhAkBo8O2U2vig5PQWQ0BUKJrCGHLq//D/ttdLVtmc6eWc
+zqssCF3Unkk3bOq35swSKeAx8WotPPVsALWr87N2hCB+
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIICsDCCAhmgAwIBAgIJANwogM9sqMHLMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQwHhcNMTAwMTMxMDEzOTEzWhcNMTEwMTMxMDEzOTEzWjBF
+MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
+ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
+gQC+N+9bv1YC0GKbGdv2wMuuWTGSNwE/Hq5IIxYN1eITsvbD1GgB69x++XJd6KTI
+thnta0KCpCAtbaYbCkhUfxCVv2bP+iQt2AjwMOZlgRZ+RGJ25dBuAjAxQmqDJcAd
+S6MoRHWziomnUNfNogVrfqjpvJor+1iRnrj2q00ab9WYCwIDAQABo4GnMIGkMB0G
+A1UdDgQWBBTTnBZyw7ZZsb8+/6gvZFIHhVgtDzB1BgNVHSMEbjBsgBTTnBZyw7ZZ
+sb8+/6gvZFIHhVgtD6FJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt
+U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJANwogM9s
+qMHLMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEApz428aOar0EBuAib
+I+liefRlK4I3MQQxq3tOeB1dgAIo0ivKtdVJGi1kPg8EO0KMvFfn6IRtssUmFgCp
+JBD+HoDzFxwI1bLMVni+g7OzaNSwL3nQ94lZUdpWMYDxqY4bLUv3goX1TlN9lmpG
+8FiBLYUC0RNTCCRDFGfDr/wUT/M=
+-----END CERTIFICATE-----
diff --git a/test/data/testkey.pem b/test/data/testkey.pem
new file mode 100644
index 00000000..af8d9d8f
--- /dev/null
+++ b/test/data/testkey.pem
@@ -0,0 +1,32 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIICXQIBAAKBgQC+6rG6A/BGD0dI+mh2FZIqQZn82z/pGs4f3pyxbHb+ROxjjQOr
+fDCw2jc11XDxK7CXpDQAnkO6au/sQ5t50vSZ+PGhFD+t558VV2ausB5OYZsR7RRx
+gl1jsxWdde3EHGjxSK+aXRgFpVrZzPLSy6dl8tMoqUMWIBi0u1WTbmyYjwIDAQAB
+AoGBAKyqhmK9/Sjf2JDgKGnjyHX/Ls3JXVvtqk6Yfw7YEiaVH1ZJyu/lOgQ414YQ
+rDzyTpxXHdERUh/fZ24/FvZvHFgy5gWEQjQPpprIxvqCLKJhX73L2+TnXmfYDApb
+J7V/JfnTeOaK9LTpHsofB98A1s9DWX/ccOgKTtZIYMjYpdoBAkEA9hLvtixbO2A2
+ZgDcA9ftVX2WwdpRH+mYXl1G60Fem5nlO3Rl3FDoafRvSQNZiqyOlObvKbbYh/S2
+L7ihEMMNYQJBAMaeLnAc9jO/z4ApTqSBGUpM9b7ul16aSgq56saUI0VULIZcXeo3
+3BwdL2fEOOnzjNy6NpH2BW63h/+2t7lV++8CQQDK+S+1Sr0uKtx0Iv1YRkHEJMW3
+vQbxldNS8wnOf6s0GisVcZubsTkkPLWWuiaf1ln9xMc9106gRmAI2PgyRVHBAkA6
+iI+C9uYP5i1Oxd2pWWqMnRWnSUVO2gWMF7J7B1lFq0Lb7gi3Z/L0Th2UZR2oxN/0
+hORkK676LBhmYgDPG+n9AkAJOnPIFQVAEBAO9bAxFrje8z6GRt332IlgxuiTeDE3
+EAlH9tmZma4Tri4sWnhJwCsxl+5hWamI8NL4EIeXRvPw
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIICsDCCAhmgAwIBAgIJAI7G7a/d5YwEMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
+BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
+aWRnaXRzIFB0eSBMdGQwHhcNMTAwMjAyMDM0MTExWhcNMTEwMjAyMDM0MTExWjBF
+MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
+ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
+gQC+6rG6A/BGD0dI+mh2FZIqQZn82z/pGs4f3pyxbHb+ROxjjQOrfDCw2jc11XDx
+K7CXpDQAnkO6au/sQ5t50vSZ+PGhFD+t558VV2ausB5OYZsR7RRxgl1jsxWdde3E
+HGjxSK+aXRgFpVrZzPLSy6dl8tMoqUMWIBi0u1WTbmyYjwIDAQABo4GnMIGkMB0G
+A1UdDgQWBBS+MFJTsriCPNYsj8/4f+PympPEkzB1BgNVHSMEbjBsgBS+MFJTsriC
+PNYsj8/4f+PympPEk6FJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt
+U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAI7G7a/d
+5YwEMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEAlpan/QX2fpXVRihV
+lQic2DktF4xd5unrZnFC8X8ScNX1ClU+AO79ejaobt4YGjeVYs0iQQsUL2E0G43c
+mOXfsq1b970Ep6xRS76EmZ+tTdFBd86tFTIhZJrOi67gs+twj5V2elyp3tQpg2ze
+G/jwDQS8V1X9CbfqBQriL7x5Tk4=
+-----END CERTIFICATE-----
diff --git a/test/handler.py b/test/handler.py
new file mode 100644
index 00000000..5803b4d1
--- /dev/null
+++ b/test/handler.py
@@ -0,0 +1,25 @@
+import socket
+from BaseHTTPServer import BaseHTTPRequestHandler
+
+
+class TestRequestHandler(BaseHTTPRequestHandler):
+ default_request_version = "HTTP/1.1"
+ def setup(self):
+ self.connection = self.request
+ self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
+ self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
+
+ def log_message(self, *args, **kwargs):
+ pass
+
+ def do_GET(self):
+ data = "data: %s\npath: %s\n"%(self.headers, self.path)
+ self.send_response(200)
+ self.send_header("proxtest", "testing")
+ self.send_header("Content-type", "text-html")
+ self.send_header("Content-length", len(data))
+ self.end_headers()
+ self.wfile.write(data)
+
+
+
diff --git a/test/serv.py b/test/serv.py
new file mode 100644
index 00000000..9e43c08f
--- /dev/null
+++ b/test/serv.py
@@ -0,0 +1,10 @@
+import socket, os, cStringIO, tempfile
+from SocketServer import BaseServer
+from BaseHTTPServer import HTTPServer
+import handler
+
+def make(port):
+ server_address = ('', port)
+ return HTTPServer(server_address, handler.TestRequestHandler)
+
+
diff --git a/test/sslserv.py b/test/sslserv.py
new file mode 100644
index 00000000..5153d0da
--- /dev/null
+++ b/test/sslserv.py
@@ -0,0 +1,22 @@
+import socket, os, cStringIO, tempfile
+from SocketServer import BaseServer
+from BaseHTTPServer import HTTPServer
+import ssl
+import handler
+
+
+class SecureHTTPServer(HTTPServer):
+ def __init__(self, server_address, HandlerClass):
+ BaseServer.__init__(self, server_address, HandlerClass)
+ self.socket = ssl.wrap_socket(
+ socket.socket(self.address_family, self.socket_type),
+ keyfile = "data/serverkey.pem",
+ certfile = "data/serverkey.pem"
+ )
+ self.server_bind()
+ self.server_activate()
+
+
+def make(port):
+ server_address = ('', port)
+ return SecureHTTPServer(server_address, handler.TestRequestHandler)
diff --git a/test/test_console.py b/test/test_console.py
new file mode 100644
index 00000000..50780aa5
--- /dev/null
+++ b/test/test_console.py
@@ -0,0 +1,269 @@
+from libmproxy import console, proxy, utils, filt
+import libpry
+
+def treq(conn=None):
+ if not conn:
+ conn = proxy.BrowserConnection("address", 22)
+ headers = utils.Headers()
+ headers["header"] = ["qvalue"]
+ return proxy.Request(conn, "host", 80, "http", "GET", "/path", headers, "content")
+
+
+def tresp(req=None):
+ if not req:
+ req = treq()
+ headers = utils.Headers()
+ headers["header_response"] = ["svalue"]
+ return proxy.Response(req, 200, "HTTP/1.1", "message", headers, "content_response")
+
+
+def tflow():
+ bc = proxy.BrowserConnection("address", 22)
+ return console.Flow(bc)
+
+
+class uState(libpry.AutoTree):
+ def test_backup(self):
+ bc = proxy.BrowserConnection("address", 22)
+ c = console.State()
+ f = console.Flow(bc)
+ c.add_browserconnect(f)
+
+ f.backup()
+ c.revert(f)
+
+ def test_flow(self):
+ """
+ normal flow:
+
+ connect -> request -> response
+ """
+ bc = proxy.BrowserConnection("address", 22)
+ c = console.State()
+ f = console.Flow(bc)
+ c.add_browserconnect(f)
+ assert c.lookup(bc)
+ assert c.get_focus() == (f, 0)
+
+ req = treq(bc)
+ assert c.add_request(req)
+ assert len(c.flow_list) == 1
+ assert c.lookup(req)
+
+ newreq = treq()
+ assert not c.add_request(newreq)
+ assert not c.lookup(newreq)
+
+ resp = tresp(req)
+ assert c.add_response(resp)
+ assert len(c.flow_list) == 1
+ assert f.waiting == False
+ assert c.lookup(resp)
+
+ newresp = tresp()
+ assert not c.add_response(newresp)
+ assert not c.lookup(newresp)
+
+ def test_err(self):
+ bc = proxy.BrowserConnection("address", 22)
+ c = console.State()
+ f = console.Flow(bc)
+ c.add_browserconnect(f)
+ e = proxy.Error(bc, "message")
+ assert c.add_error(e)
+
+ e = proxy.Error(proxy.BrowserConnection("address", 22), "message")
+ assert not c.add_error(e)
+
+ def test_view(self):
+ c = console.State()
+
+ f = tflow()
+ c.add_browserconnect(f)
+ assert len(c.view) == 1
+ c.set_limit(filt.parse("~q"))
+ assert len(c.view) == 0
+ c.set_limit(None)
+
+
+ f = tflow()
+ req = treq(f.connection)
+ c.add_browserconnect(f)
+ c.add_request(req)
+ assert len(c.view) == 2
+ c.set_limit(filt.parse("~q"))
+ assert len(c.view) == 1
+ c.set_limit(filt.parse("~s"))
+ assert len(c.view) == 0
+
+ def test_focus(self):
+ """
+ normal flow:
+
+ connect -> request -> response
+ """
+ c = console.State()
+
+ bc = proxy.BrowserConnection("address", 22)
+ f = console.Flow(bc)
+ c.add_browserconnect(f)
+ assert c.get_focus() == (f, 0)
+ assert c.get_from_pos(0) == (f, 0)
+ assert c.get_from_pos(1) == (None, None)
+ assert c.get_next(0) == (None, None)
+
+ bc2 = proxy.BrowserConnection("address", 22)
+ f2 = console.Flow(bc2)
+ c.add_browserconnect(f2)
+ assert c.get_focus() == (f, 1)
+ assert c.get_next(0) == (f, 1)
+ assert c.get_prev(1) == (f2, 0)
+ assert c.get_next(1) == (None, None)
+
+ c.set_focus(0)
+ assert c.get_focus() == (f2, 0)
+ c.set_focus(-1)
+ assert c.get_focus() == (f2, 0)
+
+ c.delete_flow(f2)
+ assert c.get_focus() == (f, 0)
+ c.delete_flow(f)
+ assert c.get_focus() == (None, None)
+
+ def _add_request(self, state):
+ f = tflow()
+ state.add_browserconnect(f)
+ q = treq(f.connection)
+ state.add_request(q)
+ return f
+
+ def _add_response(self, state):
+ f = self._add_request(state)
+ r = tresp(f.request)
+ state.add_response(r)
+
+ def test_focus_view(self):
+ c = console.State()
+ self._add_request(c)
+ self._add_response(c)
+ self._add_request(c)
+ self._add_response(c)
+ self._add_request(c)
+ self._add_response(c)
+ c.set_limit(filt.parse("~q"))
+ assert len(c.view) == 3
+ assert c.focus == 2
+
+ def test_delete_last(self):
+ c = console.State()
+ f1 = tflow()
+ f2 = tflow()
+ c.add_browserconnect(f1)
+ c.add_browserconnect(f2)
+ c.set_focus(1)
+ c.delete_flow(f1)
+ assert c.focus == 0
+
+ def test_kill_flow(self):
+ c = console.State()
+ f = tflow()
+ c.add_browserconnect(f)
+ c.kill_flow(f)
+ assert not c.flow_list
+
+ def test_clear(self):
+ c = console.State()
+ f = tflow()
+ c.add_browserconnect(f)
+ f.intercepting = True
+
+ c.clear()
+ assert len(c.flow_list) == 1
+ f.intercepting = False
+ c.clear()
+ assert len(c.flow_list) == 0
+
+
+class uFlow(libpry.AutoTree):
+ def test_match(self):
+ f = tflow()
+ f.response = tresp()
+ f.request = f.response.request
+ assert not f.match(filt.parse("~b test"))
+
+ def test_backup(self):
+ f = tflow()
+ f.backup()
+ f.revert()
+
+ def test_simple(self):
+ f = tflow()
+ assert f.get_text()
+
+ f.request = treq()
+ assert f.get_text()
+
+ f.response = tresp()
+ f.response.headers["content-type"] = ["text/html"]
+ assert f.get_text()
+ f.response.code = 404
+ assert f.get_text()
+
+ f.focus = True
+ assert f.get_text()
+
+ f.connection = console.ReplayConnection()
+ assert f.get_text()
+
+ f.response = None
+ assert f.get_text()
+
+ f.error = proxy.Error(200, "test")
+ assert f.get_text()
+
+ def test_kill(self):
+ f = tflow()
+ f.request = treq()
+ f.intercept()
+ assert not f.request.acked
+ f.kill()
+ assert f.request.acked
+ f.intercept()
+ f.response = tresp()
+ f.request = f.response.request
+ f.request.ack()
+ assert not f.response.acked
+ f.kill()
+ assert f.response.acked
+
+ def test_accept_intercept(self):
+ f = tflow()
+ f.request = treq()
+ f.intercept()
+ assert not f.request.acked
+ f.accept_intercept()
+ assert f.request.acked
+ f.response = tresp()
+ f.request = f.response.request
+ f.intercept()
+ f.request.ack()
+ assert not f.response.acked
+ f.accept_intercept()
+ assert f.response.acked
+
+
+class uformat_keyvals(libpry.AutoTree):
+ def test_simple(self):
+ assert console.format_keyvals(
+ [
+ ("aa", "bb"),
+ ("cc", "dd"),
+ ]
+ )
+
+
+tests = [
+ uFlow(),
+ uformat_keyvals(),
+ uState()
+]
diff --git a/test/test_filt.py b/test/test_filt.py
new file mode 100644
index 00000000..3cf0f6cd
--- /dev/null
+++ b/test/test_filt.py
@@ -0,0 +1,220 @@
+import cStringIO
+from libmproxy import filt, proxy, utils
+import libpry
+
+
+class uParsing(libpry.AutoTree):
+ def _dump(self, x):
+ c = cStringIO.StringIO()
+ x.dump(fp=c)
+ assert c.getvalue()
+
+ def test_simple(self):
+ assert not filt.parse("~b")
+ assert filt.parse("~q")
+ assert filt.parse("~c 10")
+ assert filt.parse("~u foobar")
+ assert filt.parse("~q ~c 10")
+ p = filt.parse("~q ~c 10")
+ self._dump(p)
+ assert len(p.lst) == 2
+
+ def test_naked_url(self):
+ #a = filt.parse("foobar")
+ #assert a.lst[0].expr == "foobar"
+
+ a = filt.parse("foobar ~h rex")
+ assert a.lst[0].expr == "foobar"
+ assert a.lst[1].expr == "rex"
+ self._dump(a)
+
+ def test_quoting(self):
+ a = filt.parse("~u 'foo ~u bar' ~u voing")
+ assert a.lst[0].expr == "foo ~u bar"
+ assert a.lst[1].expr == "voing"
+ self._dump(a)
+
+ a = filt.parse("~u foobar")
+ assert a.expr == "foobar"
+
+ a = filt.parse(r"~u 'foobar\"\''")
+ assert a.expr == "foobar\"'"
+
+ a = filt.parse(r'~u "foo \'bar"')
+ assert a.expr == "foo 'bar"
+
+ def test_nesting(self):
+ a = filt.parse("(~u foobar & ~h voing)")
+ assert a.lst[0].expr == "foobar"
+ self._dump(a)
+
+ def test_not(self):
+ a = filt.parse("!~h test")
+ assert a.itm.expr == "test"
+ a = filt.parse("!(~u test & ~h bar)")
+ assert a.itm.lst[0].expr == "test"
+ self._dump(a)
+
+ def test_binaryops(self):
+ a = filt.parse("~u foobar | ~h voing")
+ isinstance(a, filt.FOr)
+ self._dump(a)
+
+ a = filt.parse("~u foobar & ~h voing")
+ isinstance(a, filt.FAnd)
+ self._dump(a)
+
+ def test_wideops(self):
+ a = filt.parse("~hq 'header: qvalue'")
+ assert isinstance(a, filt.FHeadRequest)
+ self._dump(a)
+
+
+class uMatching(libpry.AutoTree):
+ def req(self):
+ conn = proxy.BrowserConnection("one", 2222)
+ headers = utils.Headers()
+ headers["header"] = ["qvalue"]
+ return proxy.Request(
+ conn,
+ "host",
+ 80,
+ "http",
+ "GET",
+ "/path",
+ headers,
+ "content_request"
+ )
+
+ def resp(self):
+ q = self.req()
+ headers = utils.Headers()
+ headers["header_response"] = ["svalue"]
+ return proxy.Response(
+ q,
+ 200,
+ "HTTP/1.1",
+ "message",
+ headers,
+ "content_response"
+ )
+
+ def q(self, q, o):
+ return filt.parse(q)(o)
+
+ def test_fcontenttype(self):
+ q = self.req()
+ s = self.resp()
+ assert not self.q("~t content", q)
+ assert not self.q("~t content", s)
+
+ q.headers["content-type"] = ["text/json"]
+ assert self.q("~t json", q)
+ assert self.q("~tq json", q)
+ assert not self.q("~ts json", q)
+
+ s.headers["content-type"] = ["text/json"]
+ assert self.q("~t json", s)
+
+ del s.headers["content-type"]
+ s.request.headers["content-type"] = ["text/json"]
+ assert self.q("~t json", s)
+ assert self.q("~tq json", s)
+ assert not self.q("~ts json", s)
+
+ def test_freq_fresp(self):
+ q = self.req()
+ s = self.resp()
+
+ assert self.q("~q", q)
+ assert not self.q("~q", s)
+
+ assert not self.q("~s", q)
+ assert self.q("~s", s)
+
+ def test_head(self):
+ q = self.req()
+ s = self.resp()
+ assert not self.q("~h nonexistent", q)
+ assert self.q("~h qvalue", q)
+ assert self.q("~h header", q)
+ assert self.q("~h 'header: qvalue'", q)
+
+ assert self.q("~h 'header: qvalue'", s)
+ assert self.q("~h 'header_response: svalue'", s)
+
+ assert self.q("~hq 'header: qvalue'", s)
+ assert not self.q("~hq 'header_response: svalue'", s)
+
+ assert self.q("~hq 'header: qvalue'", q)
+ assert not self.q("~hq 'header_request: svalue'", q)
+
+ assert not self.q("~hs 'header: qvalue'", s)
+ assert self.q("~hs 'header_response: svalue'", s)
+ assert not self.q("~hs 'header: qvalue'", q)
+
+ def test_body(self):
+ q = self.req()
+ s = self.resp()
+ assert not self.q("~b nonexistent", q)
+ assert self.q("~b content", q)
+ assert self.q("~b response", s)
+ assert self.q("~b content_request", s)
+
+ assert self.q("~bq content", q)
+ assert self.q("~bq content", s)
+ assert not self.q("~bq response", q)
+ assert not self.q("~bq response", s)
+
+ assert not self.q("~bs content", q)
+ assert self.q("~bs content", s)
+ assert not self.q("~bs nomatch", s)
+ assert not self.q("~bs response", q)
+ assert self.q("~bs response", s)
+
+ def test_url(self):
+ q = self.req()
+ s = self.resp()
+ assert self.q("~u host", q)
+ assert self.q("~u host/path", q)
+ assert not self.q("~u moo/path", q)
+
+ assert self.q("~u host", s)
+ assert self.q("~u host/path", s)
+ assert not self.q("~u moo/path", s)
+
+ def test_code(self):
+ q = self.req()
+ s = self.resp()
+ assert not self.q("~c 200", q)
+ assert self.q("~c 200", s)
+ assert not self.q("~c 201", s)
+
+ def test_and(self):
+ s = self.resp()
+ assert self.q("~c 200 & ~h head", s)
+ assert not self.q("~c 200 & ~h nohead", s)
+ assert self.q("(~c 200 & ~h head) & ~b content", s)
+ assert not self.q("(~c 200 & ~h head) & ~b nonexistent", s)
+ assert not self.q("(~c 200 & ~h nohead) & ~b content", s)
+
+ def test_or(self):
+ s = self.resp()
+ assert self.q("~c 200 | ~h nohead", s)
+ assert self.q("~c 201 | ~h head", s)
+ assert not self.q("~c 201 | ~h nohead", s)
+ assert self.q("(~c 201 | ~h nohead) | ~s", s)
+ assert not self.q("(~c 201 | ~h nohead) | ~q", s)
+
+ def test_not(self):
+ s = self.resp()
+ assert not self.q("! ~c 200", s)
+ assert self.q("! ~c 201", s)
+ assert self.q("!~c 201 !~c 202", s)
+ assert not self.q("!~c 201 !~c 200", s)
+
+
+tests = [
+ uMatching(),
+ uParsing()
+]
diff --git a/test/test_proxy.py b/test/test_proxy.py
new file mode 100644
index 00000000..90cfbbfb
--- /dev/null
+++ b/test/test_proxy.py
@@ -0,0 +1,259 @@
+import threading, urllib, Queue, urllib2, cStringIO
+import libpry
+import serv, sslserv
+from libmproxy import proxy, controller, utils
+import random
+
+# Yes, the random ports are horrible. During development, sockets are often not
+# properly closed during error conditions, which means you have to wait until
+# you can re-bind to the same port. This is a pain in the ass, so we just pick
+# a random port and keep moving.
+PROXL_PORT = random.randint(10000, 20000)
+HTTP_PORT = random.randint(20000, 30000)
+HTTPS_PORT = random.randint(30000, 40000)
+
+
+class TestMaster(controller.Master):
+ def __init__(self, port, testq):
+ serv = proxy.ProxyServer(port)
+ controller.Master.__init__(self, serv)
+ self.testq = testq
+ self.log = []
+
+ def clear(self):
+ self.log = []
+
+ def handle(self, m):
+ self.log.append(m)
+ m.ack()
+
+
+class ProxyThread(threading.Thread):
+ def __init__(self, port, testq):
+ self.tmaster = TestMaster(port, testq)
+ threading.Thread.__init__(self)
+
+ def run(self):
+ self.tmaster.run()
+
+ def shutdown(self):
+ self.tmaster.shutdown()
+
+
+class ServerThread(threading.Thread):
+ def __init__(self, server):
+ self.server = server
+ threading.Thread.__init__(self)
+
+ def run(self):
+ self.server.serve_forever()
+
+ def shutdown(self):
+ self.server.shutdown()
+
+
+class _TestServers(libpry.TestContainer):
+ def setUpAll(self):
+ proxy.config = proxy.Config("data/testkey.pem")
+ self.tqueue = Queue.Queue()
+ # We don't make any concurrent requests, so we can access
+ # the attributes on this object safely.
+ self.proxthread = ProxyThread(PROXL_PORT, self.tqueue)
+ self.threads = [
+ ServerThread(serv.make(HTTP_PORT)),
+ ServerThread(sslserv.make(HTTPS_PORT)),
+ self.proxthread
+ ]
+ for i in self.threads:
+ i.start()
+
+ def setUp(self):
+ self.proxthread.tmaster.clear()
+
+ def tearDownAll(self):
+ for i in self.threads:
+ i.shutdown()
+
+
+class _ProxTests(libpry.AutoTree):
+ def log(self):
+ pthread = self.findAttr("proxthread")
+ return pthread.tmaster.log
+
+
+class uSanity(_ProxTests):
+ def test_http(self):
+ """
+ Just check that the HTTP server is running.
+ """
+ f = urllib.urlopen("http://127.0.0.1:%s"%HTTP_PORT)
+ assert f.read()
+
+ def test_https(self):
+ """
+ Just check that the HTTPS server is running.
+ """
+ f = urllib.urlopen("https://127.0.0.1:%s"%HTTPS_PORT)
+ assert f.read()
+
+
+class uProxy(_ProxTests):
+ HOST = "127.0.0.1"
+ def _get(self, host=HOST):
+ r = urllib2.Request("http://%s:%s"%(host, HTTP_PORT))
+ r.set_proxy("127.0.0.1:%s"%PROXL_PORT, "http")
+ return urllib2.urlopen(r)
+
+ def _sget(self, host=HOST):
+ proxy_support = urllib2.ProxyHandler(
+ {"https" : "https://127.0.0.1:%s"%PROXL_PORT}
+ )
+ opener = urllib2.build_opener(proxy_support)
+ r = urllib2.Request("https://%s:%s"%(host, HTTPS_PORT))
+ return opener.open(r)
+
+ def test_http(self):
+ f = self._get()
+ assert f.code == 200
+ assert f.read()
+ f.close()
+
+ l = self.log()
+ assert l[0].address
+ assert l[1].headers.has_key("host")
+ assert l[2].code == 200
+
+ def test_https(self):
+ f = self._sget()
+ assert f.code == 200
+ assert f.read()
+ f.close()
+
+ l = self.log()
+ assert l[0].address
+ assert l[1].headers.has_key("host")
+ assert l[2].code == 200
+
+ # Disable these two for now: they take a long time.
+ def _test_http_nonexistent(self):
+ f = self._get("nonexistent")
+ assert f.code == 200
+ assert "Error" in f.read()
+
+ def _test_https_nonexistent(self):
+ f = self._sget("nonexistent")
+ assert f.code == 200
+ assert "Error" in f.read()
+
+
+
+class u_parse_proxy_request(libpry.AutoTree):
+ def test_simple(self):
+ libpry.raises(proxy.ProxyError, proxy.parse_proxy_request, "")
+
+ u = "GET ... HTTP/1.1"
+ libpry.raises("invalid url", proxy.parse_proxy_request, u)
+
+ u = "MORK / HTTP/1.1"
+ libpry.raises("unknown request method", proxy.parse_proxy_request, u)
+
+ u = "GET http://foo.com:8888/test HTTP/1.1"
+ m, s, h, po, pa = proxy.parse_proxy_request(u)
+ assert m == "GET"
+ assert s == "http"
+ assert h == "foo.com"
+ assert po == 8888
+ assert pa == "/test"
+
+ def test_connect(self):
+ u = "CONNECT host.com:443 HTTP/1.0"
+ expected = ('CONNECT', None, 'host.com', 443, None)
+ ret = proxy.parse_proxy_request(u)
+ assert expected == ret
+
+ def test_inner(self):
+ u = "GET / HTTP/1.1"
+ assert proxy.parse_proxy_request(u) == ('GET', None, None, None, '/')
+
+
+class u_parse_url(libpry.AutoTree):
+ def test_simple(self):
+ assert not proxy.parse_url("")
+
+ u = "http://foo.com:8888/test"
+ s, h, po, pa = proxy.parse_url(u)
+ assert s == "http"
+ assert h == "foo.com"
+ assert po == 8888
+ assert pa == "/test"
+
+ s, h, po, pa = proxy.parse_url("http://foo/bar")
+ assert s == "http"
+ assert h == "foo"
+ assert po == 80
+ assert pa == "/bar"
+
+ s, h, po, pa = proxy.parse_url("http://foo")
+ assert pa == "/"
+
+
+class uConfig(libpry.AutoTree):
+ def test_pem(self):
+ c = proxy.Config(pemfile="data/testkey.pem")
+ assert c.pemfile
+
+
+class uFileLike(libpry.AutoTree):
+ def test_wrap(self):
+ s = cStringIO.StringIO("foobar\nfoobar")
+ s = proxy.FileLike(s)
+ s.flush()
+ assert s.readline() == "foobar\n"
+ assert s.readline() == "foobar"
+
+
+class uRequest(libpry.AutoTree):
+ def test_simple(self):
+ h = utils.Headers()
+ h["test"] = ["test"]
+ c = proxy.BrowserConnection("addr", 2222)
+ r = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content")
+ u = r.url()
+ assert r.set_url(u)
+ assert not r.set_url("")
+ assert r.url() == u
+ assert r.short()
+ assert r.assemble()
+
+
+class uResponse(libpry.AutoTree):
+ def test_simple(self):
+ h = utils.Headers()
+ h["test"] = ["test"]
+ c = proxy.BrowserConnection("addr", 2222)
+ req = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content")
+ resp = proxy.Response(req, 200, "HTTP", "msg", h.copy(), "content")
+ assert resp.short()
+ assert resp.assemble()
+
+
+class uProxyError(libpry.AutoTree):
+ def test_simple(self):
+ p = proxy.ProxyError(111, "msg")
+ assert repr(p)
+
+
+
+tests = [
+ uProxyError(),
+ uRequest(),
+ uResponse(),
+ uFileLike(),
+ uConfig(),
+ u_parse_proxy_request(),
+ u_parse_url(),
+ _TestServers(), [
+ uSanity(),
+ uProxy(),
+ ]
+]
diff --git a/test/test_utils.py b/test/test_utils.py
new file mode 100644
index 00000000..8a4da968
--- /dev/null
+++ b/test/test_utils.py
@@ -0,0 +1,221 @@
+import textwrap, cStringIO, os
+import libpry
+from libmproxy import utils
+
+
+class uisBin(libpry.AutoTree):
+ def test_simple(self):
+ assert not utils.isBin("testing\n\r")
+ assert utils.isBin("testing\x01")
+ assert utils.isBin("testing\x0e")
+ assert utils.isBin("testing\x7f")
+
+
+class uhexdump(libpry.AutoTree):
+ def test_simple(self):
+ assert utils.hexdump("one\0"*10)
+
+
+class upretty_size(libpry.AutoTree):
+ def test_simple(self):
+ assert utils.pretty_size(100) == "100B"
+ assert utils.pretty_size(1024) == "1kB"
+ assert utils.pretty_size(1024 + (1024/2)) == "1.5kB"
+ assert utils.pretty_size(1024*1024) == "1M"
+
+
+class uData(libpry.AutoTree):
+ def test_nonexistent(self):
+ libpry.raises("does not exist", utils.data.path, "nonexistent")
+
+
+class uMultiDict(libpry.AutoTree):
+ def setUp(self):
+ self.md = utils.MultiDict()
+
+ def test_setget(self):
+ assert not self.md.has_key("foo")
+ self.md.append("foo", 1)
+ assert self.md["foo"] == [1]
+ assert self.md.has_key("foo")
+
+ def test_del(self):
+ self.md.append("foo", 1)
+ del self.md["foo"]
+ assert not self.md.has_key("foo")
+
+ def test_extend(self):
+ self.md.append("foo", 1)
+ self.md.extend("foo", [2, 3])
+ assert self.md["foo"] == [1, 2, 3]
+
+ def test_extend_err(self):
+ self.md.append("foo", 1)
+ libpry.raises("not iterable", self.md.extend, "foo", 2)
+
+ def test_get(self):
+ self.md.append("foo", 1)
+ self.md.append("foo", 2)
+ assert self.md.get("foo") == [1, 2]
+ assert self.md.get("bar") == None
+
+ def test_caseSensitivity(self):
+ self.md._helper = (utils._caseless,)
+ self.md["foo"] = [1]
+ self.md.append("FOO", 2)
+ assert self.md["foo"] == [1, 2]
+ assert self.md["FOO"] == [1, 2]
+ assert self.md.has_key("FoO")
+
+ def test_dict(self):
+ self.md.append("foo", 1)
+ self.md.append("foo", 2)
+ self.md["bar"] = [3]
+ assert self.md == self.md
+ assert dict(self.md) == self.md
+
+ def test_copy(self):
+ self.md["foo"] = [1, 2]
+ self.md["bar"] = [3, 4]
+ md2 = self.md.copy()
+ assert md2 == self.md
+ assert id(md2) != id(self.md)
+
+ def test_clear(self):
+ self.md["foo"] = [1, 2]
+ self.md["bar"] = [3, 4]
+ self.md.clear()
+ assert not self.md.keys()
+
+ def test_setitem(self):
+ libpry.raises(ValueError, self.md.__setitem__, "foo", "bar")
+ self.md["foo"] = ["bar"]
+ assert self.md["foo"] == ["bar"]
+
+ def test_itemPairs(self):
+ self.md.append("foo", 1)
+ self.md.append("foo", 2)
+ self.md.append("bar", 3)
+ l = list(self.md.itemPairs())
+ assert len(l) == 3
+ assert ("foo", 1) in l
+ assert ("foo", 2) in l
+ assert ("bar", 3) in l
+
+
+class uHeaders(libpry.AutoTree):
+ def setUp(self):
+ self.hd = utils.Headers()
+
+ def test_read_simple(self):
+ data = """
+ Header: one
+ Header2: two
+ \r\n
+ """
+ data = textwrap.dedent(data)
+ data = data.strip()
+ s = cStringIO.StringIO(data)
+ self.hd.read(s)
+ assert self.hd["header"] == ["one"]
+ assert self.hd["header2"] == ["two"]
+
+ def test_read_multi(self):
+ data = """
+ Header: one
+ Header: two
+ \r\n
+ """
+ data = textwrap.dedent(data)
+ data = data.strip()
+ s = cStringIO.StringIO(data)
+ self.hd.read(s)
+ assert self.hd["header"] == ["one", "two"]
+
+ def test_read_continued(self):
+ data = """
+ Header: one
+ \ttwo
+ Header2: three
+ \r\n
+ """
+ data = textwrap.dedent(data)
+ data = data.strip()
+ s = cStringIO.StringIO(data)
+ self.hd.read(s)
+ assert self.hd["header"] == ['one\r\n two']
+
+ def test_dictToHeader1(self):
+ self.hd.append("one", "uno")
+ self.hd.append("two", "due")
+ self.hd.append("two", "tre")
+ expected = [
+ "one: uno\r\n",
+ "two: due\r\n",
+ "two: tre\r\n",
+ "\r\n"
+ ]
+ out = repr(self.hd)
+ for i in expected:
+ assert out.find(i) >= 0
+
+ def test_dictToHeader2(self):
+ self.hd["one"] = ["uno"]
+ expected1 = "one: uno\r\n"
+ expected2 = "\r\n"
+ out = repr(self.hd)
+ assert out.find(expected1) >= 0
+ assert out.find(expected2) >= 0
+
+ def test_match_re(self):
+ h = utils.Headers()
+ h.append("one", "uno")
+ h.append("two", "due")
+ h.append("two", "tre")
+ assert h.match_re("uno")
+ assert h.match_re("two: due")
+ assert not h.match_re("nonono")
+
+
+
+class uisStringLike(libpry.AutoTree):
+ def test_all(self):
+ assert utils.isStringLike("foo")
+ assert not utils.isStringLike([1, 2, 3])
+ assert not utils.isStringLike((1, 2, 3))
+ assert not utils.isStringLike(["1", "2", "3"])
+
+
+class uisSequenceLike(libpry.AutoTree):
+ def test_all(self):
+ assert utils.isSequenceLike([1, 2, 3])
+ assert utils.isSequenceLike((1, 2, 3))
+ assert not utils.isSequenceLike("foobar")
+ assert utils.isSequenceLike(["foobar", "foo"])
+ x = iter([1, 2, 3])
+ assert utils.isSequenceLike(x)
+ assert not utils.isSequenceLike(1)
+
+
+class umake_bogus_cert(libpry.AutoTree):
+ def test_all(self):
+ d = self.tmpdir()
+ path = os.path.join(d, "foo", "cert")
+ utils.make_bogus_cert(path)
+
+ d = open(path).read()
+ assert "PRIVATE KEY" in d
+ assert "CERTIFICATE" in d
+
+
+tests = [
+ umake_bogus_cert(),
+ uisBin(),
+ uhexdump(),
+ upretty_size(),
+ uisStringLike(),
+ uisSequenceLike(),
+ uMultiDict(),
+ uHeaders(),
+ uData(),
+]
diff --git a/test/tserv b/test/tserv
new file mode 100755
index 00000000..5b35b72a
--- /dev/null
+++ b/test/tserv
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+"""
+ A simple program for testing the test HTTP/S servers.
+"""
+from optparse import OptionParser, OptionGroup
+import sslserv, serv
+
+if __name__ == "__main__":
+ parser = OptionParser(
+ usage = "%prog [options] output",
+ version="%prog 0.1",
+ )
+ parser.add_option(
+ "-s", "--ssl", action="store_true",
+ dest="ssl", default=False
+ )
+ options, args = parser.parse_args()
+
+ if options.ssl:
+ port = 8443
+ print "Running on port %s"%port
+ s = sslserv.make(port)
+ else:
+ port = 8080
+ print "Running on port %s"%port
+ s = serv.make(port)
+ try:
+ s.serve_forever()
+ except KeyboardInterrupt:
+ pass