import Queue
import time
import os.path
from cStringIO import StringIO
import email.utils

import mock

import netlib.utils
from netlib import odict
from netlib.http import CONTENT_MISSING, Headers
from libmproxy import filt, controller, tnetstring, flow
from libmproxy.models import Error
from libmproxy.models import Flow
from libmproxy.models import HTTPFlow
from libmproxy.models import HTTPRequest
from libmproxy.models import HTTPResponse
from libmproxy.proxy.config import HostMatcher
from libmproxy.proxy import ProxyConfig
from libmproxy.proxy.server import DummyServer
from libmproxy.models.connections import ClientConnection
from . import tutils


def test_app_registry():
    ar = flow.AppRegistry()
    ar.add("foo", "domain", 80)

    r = HTTPRequest.wrap(netlib.tutils.treq())
    r.host = "domain"
    r.port = 80
    assert ar.get(r)

    r.port = 81
    assert not ar.get(r)

    r = HTTPRequest.wrap(netlib.tutils.treq())
    r.host = "domain2"
    r.port = 80
    assert not ar.get(r)
    r.headers["host"] = "domain"
    assert ar.get(r)


class TestStickyCookieState:

    def _response(self, cookie, host):
        s = flow.StickyCookieState(filt.parse(".*"))
        f = tutils.tflow(req=netlib.tutils.treq(host=host, port=80), resp=True)
        f.response.headers["Set-Cookie"] = cookie
        s.handle_response(f)
        return s, f

    def test_domain_match(self):
        s = flow.StickyCookieState(filt.parse(".*"))
        assert s.domain_match("www.google.com", ".google.com")
        assert s.domain_match("google.com", ".google.com")

    def test_handle_response(self):
        c = "SSID=mooo; domain=.google.com, FOO=bar; Domain=.google.com; Path=/; "\
            "Expires=Wed, 13-Jan-2021 22:23:01 GMT; Secure; "

        s, f = self._response(c, "host")
        assert not s.jar.keys()

        s, f = self._response(c, "www.google.com")
        assert s.jar.keys()

        s, f = self._response("SSID=mooo", "www.google.com")
        assert s.jar.keys()[0] == ('www.google.com', 80, '/')

    def test_handle_request(self):
        s, f = self._response("SSID=mooo", "www.google.com")
        assert "cookie" not in f.request.headers
        s.handle_request(f)
        assert "cookie" in f.request.headers


class TestStickyAuthState:

    def test_handle_response(self):
        s = flow.StickyAuthState(filt.parse(".*"))
        f = tutils.tflow(resp=True)
        f.request.headers["authorization"] = "foo"
        s.handle_request(f)
        assert "address" in s.hosts

        f = tutils.tflow(resp=True)
        s.handle_request(f)
        assert f.request.headers["authorization"] == "foo"


class TestClientPlaybackState:

    def test_tick(self):
        first = tutils.tflow()
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        fm.start_client_playback([first, tutils.tflow()], True)
        c = fm.client_playback
        c.testing = True

        assert not c.done()
        assert not s.flow_count()
        assert c.count() == 2
        c.tick(fm)
        assert s.flow_count()
        assert c.count() == 1

        c.tick(fm)
        assert c.count() == 1

        c.clear(c.current)
        c.tick(fm)
        assert c.count() == 0
        c.clear(c.current)
        assert c.done()

        q = Queue.Queue()
        fm.state.clear()
        fm.tick(q, timeout=0)

        fm.stop_client_playback()
        assert not fm.client_playback


class TestServerPlaybackState:

    def test_hash(self):
        s = flow.ServerPlaybackState(
            None,
            [],
            False,
            False,
            None,
            False,
            None,
            False)
        r = tutils.tflow()
        r2 = tutils.tflow()

        assert s._hash(r)
        assert s._hash(r) == s._hash(r2)
        r.request.headers["foo"] = "bar"
        assert s._hash(r) == s._hash(r2)
        r.request.path = "voing"
        assert s._hash(r) != s._hash(r2)

        r.request.path = "path?blank_value"
        r2.request.path = "path?"
        assert s._hash(r) != s._hash(r2)

    def test_headers(self):
        s = flow.ServerPlaybackState(
            ["foo"],
            [],
            False,
            False,
            None,
            False,
            None,
            False)
        r = tutils.tflow(resp=True)
        r.request.headers["foo"] = "bar"
        r2 = tutils.tflow(resp=True)
        assert not s._hash(r) == s._hash(r2)
        r2.request.headers["foo"] = "bar"
        assert s._hash(r) == s._hash(r2)
        r2.request.headers["oink"] = "bar"
        assert s._hash(r) == s._hash(r2)

        r = tutils.tflow(resp=True)
        r2 = tutils.tflow(resp=True)
        assert s._hash(r) == s._hash(r2)

    def test_load(self):
        r = tutils.tflow(resp=True)
        r.request.headers["key"] = "one"

        r2 = tutils.tflow(resp=True)
        r2.request.headers["key"] = "two"

        s = flow.ServerPlaybackState(
            None, [
                r, r2], False, False, None, False, None, False)
        assert s.count() == 2
        assert len(s.fmap.keys()) == 1

        n = s.next_flow(r)
        assert n.request.headers["key"] == "one"
        assert s.count() == 1

        n = s.next_flow(r)
        assert n.request.headers["key"] == "two"
        assert s.count() == 0

        assert not s.next_flow(r)

    def test_load_with_nopop(self):
        r = tutils.tflow(resp=True)
        r.request.headers["key"] = "one"

        r2 = tutils.tflow(resp=True)
        r2.request.headers["key"] = "two"

        s = flow.ServerPlaybackState(
            None, [
                r, r2], False, True, None, False, None, False)

        assert s.count() == 2
        s.next_flow(r)
        assert s.count() == 2

    def test_ignore_params(self):
        s = flow.ServerPlaybackState(
            None, [], False, False, [
                "param1", "param2"], False, None, False)
        r = tutils.tflow(resp=True)
        r.request.path = "/test?param1=1"
        r2 = tutils.tflow(resp=True)
        r2.request.path = "/test"
        assert s._hash(r) == s._hash(r2)
        r2.request.path = "/test?param1=2"
        assert s._hash(r) == s._hash(r2)
        r2.request.path = "/test?param2=1"
        assert s._hash(r) == s._hash(r2)
        r2.request.path = "/test?param3=2"
        assert not s._hash(r) == s._hash(r2)

    def test_ignore_payload_params(self):
        s = flow.ServerPlaybackState(
            None, [], False, False, None, False, [
                "param1", "param2"], False)
        r = tutils.tflow(resp=True)
        r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        r.request.content = "paramx=x&param1=1"
        r2 = tutils.tflow(resp=True)
        r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        r2.request.content = "paramx=x&param1=1"
        # same parameters
        assert s._hash(r) == s._hash(r2)
        # ignored parameters !=
        r2.request.content = "paramx=x&param1=2"
        assert s._hash(r) == s._hash(r2)
        # missing parameter
        r2.request.content = "paramx=x"
        assert s._hash(r) == s._hash(r2)
        # ignorable parameter added
        r2.request.content = "paramx=x&param1=2"
        assert s._hash(r) == s._hash(r2)
        # not ignorable parameter changed
        r2.request.content = "paramx=y&param1=1"
        assert not s._hash(r) == s._hash(r2)
        # not ignorable parameter missing
        r2.request.content = "param1=1"
        assert not s._hash(r) == s._hash(r2)

    def test_ignore_payload_params_other_content_type(self):
        s = flow.ServerPlaybackState(
            None, [], False, False, None, False, [
                "param1", "param2"], False)
        r = tutils.tflow(resp=True)
        r.request.headers["Content-Type"] = "application/json"
        r.request.content = '{"param1":"1"}'
        r2 = tutils.tflow(resp=True)
        r2.request.headers["Content-Type"] = "application/json"
        r2.request.content = '{"param1":"1"}'
        # same content
        assert s._hash(r) == s._hash(r2)
        # distint content (note only x-www-form-urlencoded payload is analysed)
        r2.request.content = '{"param1":"2"}'
        assert not s._hash(r) == s._hash(r2)

    def test_ignore_payload_wins_over_params(self):
        # NOTE: parameters are mutually exclusive in options
        s = flow.ServerPlaybackState(
            None, [], False, False, None, True, [
                "param1", "param2"], False)
        r = tutils.tflow(resp=True)
        r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        r.request.content = "paramx=y"
        r2 = tutils.tflow(resp=True)
        r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        r2.request.content = "paramx=x"
        # same parameters
        assert s._hash(r) == s._hash(r2)

    def test_ignore_content(self):
        s = flow.ServerPlaybackState(
            None,
            [],
            False,
            False,
            None,
            False,
            None,
            False)
        r = tutils.tflow(resp=True)
        r2 = tutils.tflow(resp=True)

        r.request.content = "foo"
        r2.request.content = "foo"
        assert s._hash(r) == s._hash(r2)
        r2.request.content = "bar"
        assert not s._hash(r) == s._hash(r2)

        # now ignoring content
        s = flow.ServerPlaybackState(
            None,
            [],
            False,
            False,
            None,
            True,
            None,
            False)
        r = tutils.tflow(resp=True)
        r2 = tutils.tflow(resp=True)
        r.request.content = "foo"
        r2.request.content = "foo"
        assert s._hash(r) == s._hash(r2)
        r2.request.content = "bar"
        assert s._hash(r) == s._hash(r2)
        r2.request.content = ""
        assert s._hash(r) == s._hash(r2)
        r2.request.content = None
        assert s._hash(r) == s._hash(r2)

    def test_ignore_host(self):
        s = flow.ServerPlaybackState(
            None,
            [],
            False,
            False,
            None,
            False,
            None,
            True)
        r = tutils.tflow(resp=True)
        r2 = tutils.tflow(resp=True)

        r.request.host = "address"
        r2.request.host = "address"
        assert s._hash(r) == s._hash(r2)
        r2.request.host = "wrong_address"
        assert s._hash(r) == s._hash(r2)


class TestFlow(object):

    def test_copy(self):
        f = tutils.tflow(resp=True)
        a0 = f.get_state()
        f2 = f.copy()
        a = f.get_state()
        b = f2.get_state()
        del a["id"]
        del b["id"]
        assert a == b
        assert not f == f2
        assert not f is f2
        assert f.request.get_state() == f2.request.get_state()
        assert not f.request is f2.request
        assert f.request.headers == f2.request.headers
        assert not f.request.headers is f2.request.headers
        assert f.response.get_state() == f2.response.get_state()
        assert not f.response is f2.response

        f = tutils.tflow(err=True)
        f2 = f.copy()
        assert not f is f2
        assert not f.request is f2.request
        assert f.request.headers == f2.request.headers
        assert not f.request.headers is f2.request.headers
        assert f.error.get_state() == f2.error.get_state()
        assert not f.error is f2.error

    def test_match(self):
        f = tutils.tflow(resp=True)
        assert not f.match("~b test")
        assert f.match(None)
        assert not f.match("~b test")

        f = tutils.tflow(err=True)
        assert f.match("~e")

        tutils.raises(ValueError, f.match, "~")

    def test_backup(self):
        f = tutils.tflow()
        f.response = HTTPResponse.wrap(netlib.tutils.tresp())
        f.request.content = "foo"
        assert not f.modified()
        f.backup()
        f.request.content = "bar"
        assert f.modified()
        f.revert()
        assert f.request.content == "foo"

    def test_backup_idempotence(self):
        f = tutils.tflow(resp=True)
        f.backup()
        f.revert()
        f.backup()
        f.revert()

    def test_getset_state(self):
        f = tutils.tflow(resp=True)
        state = f.get_state()
        assert f.get_state() == HTTPFlow.from_state(
            state).get_state()

        f.response = None
        f.error = Error("error")
        state = f.get_state()
        assert f.get_state() == HTTPFlow.from_state(
            state).get_state()

        f2 = f.copy()
        f2.id = f.id  # copy creates a different uuid
        assert f.get_state() == f2.get_state()
        assert not f == f2
        f2.error = Error("e2")
        assert not f == f2
        f.load_state(f2.get_state())
        assert f.get_state() == f2.get_state()

    def test_kill(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        f = tutils.tflow()
        f.intercept(mock.Mock())
        assert not f.reply.acked
        f.kill(fm)
        assert f.reply.acked

    def test_killall(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)

        f = tutils.tflow()
        fm.handle_request(f)

        f = tutils.tflow()
        fm.handle_request(f)

        for i in s.view:
            assert not i.reply.acked
        s.killall(fm)
        for i in s.view:
            assert i.reply.acked

    def test_accept_intercept(self):
        f = tutils.tflow()

        f.intercept(mock.Mock())
        assert not f.reply.acked
        f.accept_intercept(mock.Mock())
        assert f.reply.acked

    def test_replace_unicode(self):
        f = tutils.tflow(resp=True)
        f.response.content = "\xc2foo"
        f.replace("foo", u"bar")

    def test_replace(self):
        f = tutils.tflow(resp=True)
        f.request.headers["foo"] = "foo"
        f.request.content = "afoob"

        f.response.headers["foo"] = "foo"
        f.response.content = "afoob"

        assert f.replace("foo", "bar") == 6

        assert f.request.headers["bar"] == "bar"
        assert f.request.content == "abarb"
        assert f.response.headers["bar"] == "bar"
        assert f.response.content == "abarb"

    def test_replace_encoded(self):
        f = tutils.tflow(resp=True)
        f.request.content = "afoob"
        f.request.encode("gzip")
        f.response.content = "afoob"
        f.response.encode("gzip")

        f.replace("foo", "bar")

        assert f.request.content != "abarb"
        f.request.decode()
        assert f.request.content == "abarb"

        assert f.response.content != "abarb"
        f.response.decode()
        assert f.response.content == "abarb"


class TestState:

    def test_backup(self):
        c = flow.State()
        f = tutils.tflow()
        c.add_flow(f)
        f.backup()
        c.revert(f)

    def test_flow(self):
        """
            normal flow:

                connect -> request -> response
        """
        c = flow.State()
        f = tutils.tflow()
        c.add_flow(f)
        assert f
        assert c.flow_count() == 1
        assert c.active_flow_count() == 1

        newf = tutils.tflow()
        assert c.add_flow(newf)
        assert c.active_flow_count() == 2

        f.response = HTTPResponse.wrap(netlib.tutils.tresp())
        assert c.update_flow(f)
        assert c.flow_count() == 2
        assert c.active_flow_count() == 1

        _ = HTTPResponse.wrap(netlib.tutils.tresp())
        assert not c.update_flow(None)
        assert c.active_flow_count() == 1

        newf.response = HTTPResponse.wrap(netlib.tutils.tresp())
        assert c.update_flow(newf)
        assert c.active_flow_count() == 0

    def test_err(self):
        c = flow.State()
        f = tutils.tflow()
        c.add_flow(f)
        f.error = Error("message")
        assert c.update_flow(f)

        c = flow.State()
        f = tutils.tflow()
        c.add_flow(f)
        c.set_limit("~e")
        assert not c.view
        f.error = tutils.terr()
        assert c.update_flow(f)
        assert c.view

    def test_set_limit(self):
        c = flow.State()

        f = tutils.tflow()
        assert len(c.view) == 0

        c.add_flow(f)
        assert len(c.view) == 1

        c.set_limit("~s")
        assert c.limit_txt == "~s"
        assert len(c.view) == 0
        f.response = HTTPResponse.wrap(netlib.tutils.tresp())
        c.update_flow(f)
        assert len(c.view) == 1
        c.set_limit(None)
        assert len(c.view) == 1

        f = tutils.tflow()
        c.add_flow(f)
        assert len(c.view) == 2
        c.set_limit("~q")
        assert len(c.view) == 1
        c.set_limit("~s")
        assert len(c.view) == 1

        assert "Invalid" in c.set_limit("~")

    def test_set_intercept(self):
        c = flow.State()
        assert not c.set_intercept("~q")
        assert c.intercept_txt == "~q"
        assert "Invalid" in c.set_intercept("~")
        assert not c.set_intercept(None)
        assert c.intercept_txt is None

    def _add_request(self, state):
        f = tutils.tflow()
        state.add_flow(f)
        return f

    def _add_response(self, state):
        f = tutils.tflow()
        state.add_flow(f)
        f.response = HTTPResponse.wrap(netlib.tutils.tresp())
        state.update_flow(f)

    def _add_error(self, state):
        f = tutils.tflow(err=True)
        state.add_flow(f)

    def test_clear(self):
        c = flow.State()
        f = self._add_request(c)
        f.intercepted = True

        c.clear()
        assert c.flow_count() == 0

    def test_dump_flows(self):
        c = flow.State()
        self._add_request(c)
        self._add_response(c)
        self._add_request(c)
        self._add_response(c)
        self._add_request(c)
        self._add_response(c)
        self._add_error(c)

        flows = c.view[:]
        c.clear()

        c.load_flows(flows)
        assert isinstance(c.flows[0], Flow)

    def test_accept_all(self):
        c = flow.State()
        self._add_request(c)
        self._add_response(c)
        self._add_request(c)
        c.accept_all(mock.Mock())


class TestSerialize:

    def _treader(self):
        sio = StringIO()
        w = flow.FlowWriter(sio)
        for i in range(3):
            f = tutils.tflow(resp=True)
            w.add(f)
        for i in range(3):
            f = tutils.tflow(err=True)
            w.add(f)

        sio.seek(0)
        return flow.FlowReader(sio)

    def test_roundtrip(self):
        sio = StringIO()
        f = tutils.tflow()
        f.request.content = "".join(chr(i) for i in range(255))
        w = flow.FlowWriter(sio)
        w.add(f)

        sio.seek(0)
        r = flow.FlowReader(sio)
        l = list(r.stream())
        assert len(l) == 1

        f2 = l[0]
        assert f2.get_state() == f.get_state()
        assert f2.request == f.request

    def test_load_flows(self):
        r = self._treader()
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        fm.load_flows(r)
        assert len(s.flows) == 6

    def test_load_flows_reverse(self):
        r = self._treader()
        s = flow.State()
        conf = ProxyConfig(
            mode="reverse",
            upstream_server=("https", ("use-this-domain", 80))
        )
        fm = flow.FlowMaster(DummyServer(conf), s)
        fm.load_flows(r)
        assert s.flows[0].request.host == "use-this-domain"

    def test_filter(self):
        sio = StringIO()
        fl = filt.parse("~c 200")
        w = flow.FilteredFlowWriter(sio, fl)

        f = tutils.tflow(resp=True)
        f.response.status_code = 200
        w.add(f)

        f = tutils.tflow(resp=True)
        f.response.status_code = 201
        w.add(f)

        sio.seek(0)
        r = flow.FlowReader(sio)
        assert len(list(r.stream()))

    def test_error(self):
        sio = StringIO()
        sio.write("bogus")
        sio.seek(0)
        r = flow.FlowReader(sio)
        tutils.raises(flow.FlowReadError, list, r.stream())

        f = flow.FlowReadError("foo")
        assert f.strerror == "foo"

    def test_versioncheck(self):
        f = tutils.tflow()
        d = f.get_state()
        d["version"] = (0, 0)
        sio = StringIO()
        tnetstring.dump(d, sio)
        sio.seek(0)

        r = flow.FlowReader(sio)
        tutils.raises("version", list, r.stream())


class TestFlowMaster:

    def test_load_script(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        assert not fm.load_script(tutils.test_data.path("scripts/a.py"))
        assert not fm.load_script(tutils.test_data.path("scripts/a.py"))
        assert not fm.unload_scripts()
        assert fm.load_script("nonexistent")
        assert "ValueError" in fm.load_script(
            tutils.test_data.path("scripts/starterr.py"))
        assert len(fm.scripts) == 0

    def test_getset_ignore(self):
        p = mock.Mock()
        p.config.check_ignore = HostMatcher()
        fm = flow.FlowMaster(p, flow.State())
        assert not fm.get_ignore_filter()
        fm.set_ignore_filter(["^apple\.com:", ":443$"])
        assert fm.get_ignore_filter()

    def test_replay(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        f = tutils.tflow(resp=True)
        f.request.content = CONTENT_MISSING
        assert "missing" in fm.replay_request(f)

        f.intercepted = True
        assert "intercepting" in fm.replay_request(f)

        f.live = True
        assert "live" in fm.replay_request(f, run_scripthooks=True)

    def test_script_reqerr(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        assert not fm.load_script(tutils.test_data.path("scripts/reqerr.py"))
        f = tutils.tflow()
        fm.handle_clientconnect(f.client_conn)
        assert fm.handle_request(f)

    def test_script(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
        f = tutils.tflow(resp=True)

        fm.handle_clientconnect(f.client_conn)
        assert fm.scripts[0].ns["log"][-1] == "clientconnect"
        fm.handle_serverconnect(f.server_conn)
        assert fm.scripts[0].ns["log"][-1] == "serverconnect"
        fm.handle_request(f)
        assert fm.scripts[0].ns["log"][-1] == "request"
        fm.handle_response(f)
        assert fm.scripts[0].ns["log"][-1] == "response"
        # load second script
        assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
        assert len(fm.scripts) == 2
        fm.handle_clientdisconnect(f.server_conn)
        assert fm.scripts[0].ns["log"][-1] == "clientdisconnect"
        assert fm.scripts[1].ns["log"][-1] == "clientdisconnect"

        # unload first script
        fm.unload_scripts()
        assert len(fm.scripts) == 0
        assert not fm.load_script(tutils.test_data.path("scripts/all.py"))

        f.error = tutils.terr()
        fm.handle_error(f)
        assert fm.scripts[0].ns["log"][-1] == "error"

    def test_duplicate_flow(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        f = tutils.tflow(resp=True)
        f = fm.load_flow(f)
        assert s.flow_count() == 1
        f2 = fm.duplicate_flow(f)
        assert f2.response
        assert s.flow_count() == 2
        assert s.index(f2) == 1

    def test_all(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        fm.anticache = True
        fm.anticomp = True
        f = tutils.tflow(req=None)
        fm.handle_clientconnect(f.client_conn)
        f.request = HTTPRequest.wrap(netlib.tutils.treq())
        fm.handle_request(f)
        assert s.flow_count() == 1

        f.response = HTTPResponse.wrap(netlib.tutils.tresp())
        fm.handle_response(f)
        assert not fm.handle_response(None)
        assert s.flow_count() == 1

        fm.handle_clientdisconnect(f.client_conn)

        f.error = Error("msg")
        f.error.reply = controller.DummyReply()
        fm.handle_error(f)

        fm.load_script(tutils.test_data.path("scripts/a.py"))
        fm.shutdown()

    def test_client_playback(self):
        s = flow.State()

        f = tutils.tflow(resp=True)
        pb = [tutils.tflow(resp=True), f]
        fm = flow.FlowMaster(DummyServer(ProxyConfig()), s)
        assert not fm.start_server_playback(
            pb,
            False,
            [],
            False,
            False,
            None,
            False,
            None,
            False)
        assert not fm.start_client_playback(pb, False)
        fm.client_playback.testing = True

        q = Queue.Queue()
        assert not fm.state.flow_count()
        fm.tick(q, 0)
        assert fm.state.flow_count()

        f.error = Error("error")
        fm.handle_error(f)

    def test_server_playback(self):
        s = flow.State()

        f = tutils.tflow()
        f.response = HTTPResponse.wrap(netlib.tutils.tresp(content=f.request))
        pb = [f]

        fm = flow.FlowMaster(None, s)
        fm.refresh_server_playback = True
        assert not fm.do_server_playback(tutils.tflow())

        fm.start_server_playback(
            pb,
            False,
            [],
            False,
            False,
            None,
            False,
            None,
            False)
        assert fm.do_server_playback(tutils.tflow())

        fm.start_server_playback(
            pb,
            False,
            [],
            True,
            False,
            None,
            False,
            None,
            False)
        r = tutils.tflow()
        r.request.content = "gibble"
        assert not fm.do_server_playback(r)
        assert fm.do_server_playback(tutils.tflow())

        fm.start_server_playback(
            pb,
            False,
            [],
            True,
            False,
            None,
            False,
            None,
            False)
        q = Queue.Queue()
        fm.tick(q, 0)
        assert fm.should_exit.is_set()

        fm.stop_server_playback()
        assert not fm.server_playback

    def test_server_playback_kill(self):
        s = flow.State()
        f = tutils.tflow()
        f.response = HTTPResponse.wrap(netlib.tutils.tresp(content=f.request))
        pb = [f]
        fm = flow.FlowMaster(None, s)
        fm.refresh_server_playback = True
        fm.start_server_playback(
            pb,
            True,
            [],
            False,
            False,
            None,
            False,
            None,
            False)

        f = tutils.tflow()
        f.request.host = "nonexistent"
        fm.process_new_request(f)
        assert "killed" in f.error.msg

    def test_stickycookie(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        assert "Invalid" in fm.set_stickycookie("~h")
        fm.set_stickycookie(".*")
        assert fm.stickycookie_state
        fm.set_stickycookie(None)
        assert not fm.stickycookie_state

        fm.set_stickycookie(".*")
        f = tutils.tflow(resp=True)
        f.response.headers["set-cookie"] = "foo=bar"
        fm.handle_request(f)
        fm.handle_response(f)
        assert fm.stickycookie_state.jar
        assert not "cookie" in f.request.headers
        f = f.copy()
        fm.handle_request(f)
        assert f.request.headers["cookie"] == "foo=bar"

    def test_stickyauth(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        assert "Invalid" in fm.set_stickyauth("~h")
        fm.set_stickyauth(".*")
        assert fm.stickyauth_state
        fm.set_stickyauth(None)
        assert not fm.stickyauth_state

        fm.set_stickyauth(".*")
        f = tutils.tflow(resp=True)
        f.request.headers["authorization"] = "foo"
        fm.handle_request(f)

        f = tutils.tflow(resp=True)
        assert fm.stickyauth_state.hosts
        assert not "authorization" in f.request.headers
        fm.handle_request(f)
        assert f.request.headers["authorization"] == "foo"

    def test_stream(self):
        with tutils.tmpdir() as tdir:
            p = os.path.join(tdir, "foo")

            def r():
                r = flow.FlowReader(open(p, "rb"))
                return list(r.stream())

            s = flow.State()
            fm = flow.FlowMaster(None, s)
            f = tutils.tflow(resp=True)

            fm.start_stream(file(p, "ab"), None)
            fm.handle_request(f)
            fm.handle_response(f)
            fm.stop_stream()

            assert r()[0].response

            f = tutils.tflow()
            fm.start_stream(file(p, "ab"), None)
            fm.handle_request(f)
            fm.shutdown()

            assert not r()[1].response


class TestRequest:

    def test_simple(self):
        f = tutils.tflow()
        r = f.request
        u = r.url
        r.url = u
        tutils.raises(ValueError, setattr, r, "url", "")
        assert r.url == u
        r2 = r.copy()
        assert r.get_state() == r2.get_state()

    def test_get_url(self):
        r = HTTPRequest.wrap(netlib.tutils.treq())

        assert r.url == "http://address:22/path"

        r.scheme = "https"
        assert r.url == "https://address:22/path"

        r.host = "host"
        r.port = 42
        assert r.url == "https://host:42/path"

        r.host = "address"
        r.port = 22
        assert r.url == "https://address:22/path"

        assert r.pretty_url == "https://address:22/path"
        r.headers["Host"] = "foo.com"
        assert r.url == "https://address:22/path"
        assert r.pretty_url == "https://foo.com:22/path"

    def test_path_components(self):
        r = HTTPRequest.wrap(netlib.tutils.treq())
        r.path = "/"
        assert r.get_path_components() == []
        r.path = "/foo/bar"
        assert r.get_path_components() == ["foo", "bar"]
        q = odict.ODict()
        q["test"] = ["123"]
        r.set_query(q)
        assert r.get_path_components() == ["foo", "bar"]

        r.set_path_components([])
        assert r.get_path_components() == []
        r.set_path_components(["foo"])
        assert r.get_path_components() == ["foo"]
        r.set_path_components(["/oo"])
        assert r.get_path_components() == ["/oo"]
        assert "%2F" in r.path

    def test_getset_form_urlencoded(self):
        d = odict.ODict([("one", "two"), ("three", "four")])
        r = HTTPRequest.wrap(netlib.tutils.treq(content=netlib.utils.urlencode(d.lst)))
        r.headers["content-type"] = "application/x-www-form-urlencoded"
        assert r.get_form_urlencoded() == d

        d = odict.ODict([("x", "y")])
        r.set_form_urlencoded(d)
        assert r.get_form_urlencoded() == d

        r.headers["content-type"] = "foo"
        assert not r.get_form_urlencoded()

    def test_getset_query(self):
        r = HTTPRequest.wrap(netlib.tutils.treq())
        r.path = "/foo?x=y&a=b"
        q = r.get_query()
        assert q.lst == [("x", "y"), ("a", "b")]

        r.path = "/"
        q = r.get_query()
        assert not q

        r.path = "/?adsfa"
        q = r.get_query()
        assert q.lst == [("adsfa", "")]

        r.path = "/foo?x=y&a=b"
        assert r.get_query()
        r.set_query(odict.ODict([]))
        assert not r.get_query()
        qv = odict.ODict([("a", "b"), ("c", "d")])
        r.set_query(qv)
        assert r.get_query() == qv

    def test_anticache(self):
        r = HTTPRequest.wrap(netlib.tutils.treq())
        r.headers = Headers()
        r.headers["if-modified-since"] = "test"
        r.headers["if-none-match"] = "test"
        r.anticache()
        assert not "if-modified-since" in r.headers
        assert not "if-none-match" in r.headers

    def test_replace(self):
        r = HTTPRequest.wrap(netlib.tutils.treq())
        r.path = "path/foo"
        r.headers["Foo"] = "fOo"
        r.content = "afoob"
        assert r.replace("foo(?i)", "boo") == 4
        assert r.path == "path/boo"
        assert not "foo" in r.content
        assert r.headers["boo"] == "boo"

    def test_constrain_encoding(self):
        r = HTTPRequest.wrap(netlib.tutils.treq())
        r.headers["accept-encoding"] = "gzip, oink"
        r.constrain_encoding()
        assert "oink" not in r.headers["accept-encoding"]

        r.headers.set_all("accept-encoding", ["gzip", "oink"])
        r.constrain_encoding()
        assert "oink" not in r.headers["accept-encoding"]

    def test_get_decoded_content(self):
        r = HTTPRequest.wrap(netlib.tutils.treq())
        r.content = None
        r.headers["content-encoding"] = "identity"
        assert r.get_decoded_content() is None

        r.content = "falafel"
        r.encode("gzip")
        assert r.get_decoded_content() == "falafel"

    def test_get_content_type(self):
        resp = HTTPResponse.wrap(netlib.tutils.tresp())
        resp.headers = Headers(content_type="text/plain")
        assert resp.headers["content-type"] == "text/plain"


class TestResponse:

    def test_simple(self):
        f = tutils.tflow(resp=True)
        resp = f.response
        resp2 = resp.copy()
        assert resp2.get_state() == resp.get_state()

    def test_refresh(self):
        r = HTTPResponse.wrap(netlib.tutils.tresp())
        n = time.time()
        r.headers["date"] = email.utils.formatdate(n)
        pre = r.headers["date"]
        r.refresh(n)
        assert pre == r.headers["date"]
        r.refresh(n + 60)

        d = email.utils.parsedate_tz(r.headers["date"])
        d = email.utils.mktime_tz(d)
        # Weird that this is not exact...
        assert abs(60 - (d - n)) <= 1

        r.headers["set-cookie"] = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"
        r.refresh()

    def test_refresh_cookie(self):
        r = HTTPResponse.wrap(netlib.tutils.tresp())

        # Invalid expires format, sent to us by Reddit.
        c = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037 23:59:59 GMT; Path=/"
        assert r._refresh_cookie(c, 60)

        c = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"
        assert "00:21:38" in r._refresh_cookie(c, 60)

        # https://github.com/mitmproxy/mitmproxy/issues/773
        c = ">=A"
        with tutils.raises(ValueError):
            r._refresh_cookie(c, 60)

    def test_replace(self):
        r = HTTPResponse.wrap(netlib.tutils.tresp())
        r.headers["Foo"] = "fOo"
        r.content = "afoob"
        assert r.replace("foo(?i)", "boo") == 3
        assert not "foo" in r.content
        assert r.headers["boo"] == "boo"

    def test_get_content_type(self):
        resp = HTTPResponse.wrap(netlib.tutils.tresp())
        resp.headers = Headers(content_type="text/plain")
        assert resp.headers["content-type"] == "text/plain"


class TestError:

    def test_getset_state(self):
        e = Error("Error")
        state = e.get_state()
        assert Error.from_state(state).get_state() == e.get_state()

        assert e.copy()

        e2 = Error("bar")
        assert not e == e2
        e.load_state(e2.get_state())
        assert e.get_state() == e2.get_state()

        e3 = e.copy()
        assert e3.get_state() == e.get_state()


class TestClientConnection:

    def test_state(self):

        c = tutils.tclient_conn()
        assert ClientConnection.from_state(c.get_state()).get_state() ==\
            c.get_state()

        c2 = tutils.tclient_conn()
        c2.address.address = (c2.address.host, 4242)
        assert not c == c2

        c2.timestamp_start = 42
        c.load_state(c2.get_state())
        assert c.timestamp_start == 42

        c3 = c.copy()
        assert c3.get_state() == c.get_state()

        assert str(c)


def test_replacehooks():
    h = flow.ReplaceHooks()
    h.add("~q", "foo", "bar")
    assert h.lst

    h.set(
        [
            (".*", "one", "two"),
            (".*", "three", "four"),
        ]
    )
    assert h.count() == 2

    h.clear()
    assert not h.lst

    h.add("~q", "foo", "bar")
    h.add("~s", "foo", "bar")

    v = h.get_specs()
    assert v == [('~q', 'foo', 'bar'), ('~s', 'foo', 'bar')]
    assert h.count() == 2
    h.clear()
    assert h.count() == 0

    f = tutils.tflow()
    f.request.content = "foo"
    h.add("~s", "foo", "bar")
    h.run(f)
    assert f.request.content == "foo"

    f = tutils.tflow(resp=True)
    f.request.content = "foo"
    f.response.content = "foo"
    h.run(f)
    assert f.response.content == "bar"
    assert f.request.content == "foo"

    f = tutils.tflow()
    h.clear()
    h.add("~q", "foo", "bar")
    f.request.content = "foo"
    h.run(f)
    assert f.request.content == "bar"

    assert not h.add("~", "foo", "bar")
    assert not h.add("foo", "*", "bar")


def test_setheaders():
    h = flow.SetHeaders()
    h.add("~q", "foo", "bar")
    assert h.lst

    h.set(
        [
            (".*", "one", "two"),
            (".*", "three", "four"),
        ]
    )
    assert h.count() == 2

    h.clear()
    assert not h.lst

    h.add("~q", "foo", "bar")
    h.add("~s", "foo", "bar")

    v = h.get_specs()
    assert v == [('~q', 'foo', 'bar'), ('~s', 'foo', 'bar')]
    assert h.count() == 2
    h.clear()
    assert h.count() == 0

    f = tutils.tflow()
    f.request.content = "foo"
    h.add("~s", "foo", "bar")
    h.run(f)
    assert f.request.content == "foo"

    h.clear()
    h.add("~s", "one", "two")
    h.add("~s", "one", "three")
    f = tutils.tflow(resp=True)
    f.request.headers["one"] = "xxx"
    f.response.headers["one"] = "xxx"
    h.run(f)
    assert f.request.headers["one"] == "xxx"
    assert f.response.headers.get_all("one") == ["two", "three"]

    h.clear()
    h.add("~q", "one", "two")
    h.add("~q", "one", "three")
    f = tutils.tflow()
    f.request.headers["one"] = "xxx"
    h.run(f)
    assert f.request.headers.get_all("one") == ["two", "three"]

    assert not h.add("~", "foo", "bar")