aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@corte.si>2017-04-30 22:30:29 +1200
committerGitHub <noreply@github.com>2017-04-30 22:30:29 +1200
commitb10a3992d8a66d993894b7dc1755a741229357fa (patch)
tree136b6d7a0309d707c22c1fbd91ba30bdfc250642
parent82f87919e2494576d073f930ce0adaf31106c3e9 (diff)
parenta570caccbda28f19e637231df10b28550e8919af (diff)
downloadmitmproxy-b10a3992d8a66d993894b7dc1755a741229357fa.tar.gz
mitmproxy-b10a3992d8a66d993894b7dc1755a741229357fa.tar.bz2
mitmproxy-b10a3992d8a66d993894b7dc1755a741229357fa.zip
Merge pull request #2296 from cortesi/viewload
commands: view.load
-rw-r--r--mitmproxy/addons/__init__.py2
-rw-r--r--mitmproxy/addons/export.py75
-rw-r--r--mitmproxy/addons/view.py12
-rw-r--r--mitmproxy/command.py46
-rw-r--r--mitmproxy/export.py183
-rw-r--r--mitmproxy/tools/console/commandeditor.py15
-rw-r--r--mitmproxy/tools/console/common.py28
-rw-r--r--mitmproxy/tools/console/flowlist.py29
-rw-r--r--mitmproxy/tools/console/flowview.py47
-rw-r--r--mitmproxy/tools/console/master.py40
-rw-r--r--mitmproxy/types/multidict.py16
-rw-r--r--mitmproxy/utils/typecheck.py6
-rw-r--r--setup.cfg3
-rw-r--r--test/mitmproxy/addons/test_export.py109
-rw-r--r--test/mitmproxy/addons/test_view.py21
-rw-r--r--test/mitmproxy/data/test_flow_export/locust_get.py35
-rw-r--r--test/mitmproxy/data/test_flow_export/locust_patch.py37
-rw-r--r--test/mitmproxy/data/test_flow_export/locust_post.py26
-rw-r--r--test/mitmproxy/data/test_flow_export/locust_task_get.py20
-rw-r--r--test/mitmproxy/data/test_flow_export/locust_task_patch.py22
-rw-r--r--test/mitmproxy/data/test_flow_export/locust_task_post.py11
-rw-r--r--test/mitmproxy/data/test_flow_export/python_get.py9
-rw-r--r--test/mitmproxy/data/test_flow_export/python_patch.py10
-rw-r--r--test/mitmproxy/data/test_flow_export/python_post.py17
-rw-r--r--test/mitmproxy/data/test_flow_export/python_post_json.py9
-rw-r--r--test/mitmproxy/test_command.py54
-rw-r--r--test/mitmproxy/test_export.py133
-rw-r--r--test/mitmproxy/utils/test_typecheck.py23
28 files changed, 385 insertions, 653 deletions
diff --git a/mitmproxy/addons/__init__.py b/mitmproxy/addons/__init__.py
index 204d61e0..783a2c94 100644
--- a/mitmproxy/addons/__init__.py
+++ b/mitmproxy/addons/__init__.py
@@ -7,6 +7,7 @@ from mitmproxy.addons import core_option_validation
from mitmproxy.addons import core
from mitmproxy.addons import cut
from mitmproxy.addons import disable_h2c
+from mitmproxy.addons import export
from mitmproxy.addons import onboarding
from mitmproxy.addons import proxyauth
from mitmproxy.addons import replace
@@ -31,6 +32,7 @@ def default_addons():
clientplayback.ClientPlayback(),
cut.Cut(),
disable_h2c.DisableH2C(),
+ export.Export(),
onboarding.Onboarding(),
proxyauth.ProxyAuth(),
replace.Replace(),
diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py
new file mode 100644
index 00000000..fd0c830e
--- /dev/null
+++ b/mitmproxy/addons/export.py
@@ -0,0 +1,75 @@
+import typing
+
+from mitmproxy import command
+from mitmproxy import flow
+from mitmproxy import exceptions
+from mitmproxy.utils import strutils
+from mitmproxy.net.http.http1 import assemble
+
+import pyperclip
+
+
+def curl_command(f: flow.Flow) -> str:
+ if not hasattr(f, "request"):
+ raise exceptions.CommandError("Can't export flow with no request.")
+ data = "curl "
+ request = f.request.copy() # type: ignore
+ request.decode(strict=False)
+ for k, v in request.headers.items(multi=True):
+ data += "-H '%s:%s' " % (k, v)
+ if request.method != "GET":
+ data += "-X %s " % request.method
+ data += "'%s'" % request.url
+ if request.content:
+ data += " --data-binary '%s'" % strutils.bytes_to_escaped_str(
+ request.content,
+ escape_single_quotes=True
+ )
+ return data
+
+
+def raw(f: flow.Flow) -> bytes:
+ if not hasattr(f, "request"):
+ raise exceptions.CommandError("Can't export flow with no request.")
+ return assemble.assemble_request(f.request) # type: ignore
+
+
+formats = dict(
+ curl = curl_command,
+ raw = raw,
+)
+
+
+class Export():
+ @command.command("export.formats")
+ def formats(self) -> typing.Sequence[str]:
+ """
+ Return a list of the supported export formats.
+ """
+ return list(sorted(formats.keys()))
+
+ @command.command("export.file")
+ def file(self, fmt: str, f: flow.Flow, path: str) -> None:
+ """
+ Export a flow to path.
+ """
+ if fmt not in formats:
+ raise exceptions.CommandError("No such export format: %s" % fmt)
+ func = formats[fmt] # type: typing.Any
+ v = func(f)
+ with open(path, "wb") as fp:
+ if isinstance(v, bytes):
+ fp.write(v)
+ else:
+ fp.write(v.encode("utf-8"))
+
+ @command.command("export.clip")
+ def clip(self, fmt: str, f: flow.Flow) -> None:
+ """
+ Export a flow to the system clipboard.
+ """
+ if fmt not in formats:
+ raise exceptions.CommandError("No such export format: %s" % fmt)
+ func = formats[fmt] # type: typing.Any
+ v = strutils.always_str(func(f))
+ pyperclip.copy(v)
diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py
index 794e7617..a629ceb9 100644
--- a/mitmproxy/addons/view.py
+++ b/mitmproxy/addons/view.py
@@ -20,6 +20,7 @@ from mitmproxy import flowfilter
from mitmproxy import exceptions
from mitmproxy import command
from mitmproxy import ctx
+from mitmproxy import io
from mitmproxy import http # noqa
# The underlying sorted list implementation expects the sort key to be stable
@@ -265,6 +266,17 @@ class View(collections.Sequence):
"""
return self._store.get(flow_id)
+ @command.command("view.load")
+ def load_file(self, path: str) -> None:
+ """
+ Load flows into the view, without processing them with addons.
+ """
+ for i in io.FlowReader(open(path, "rb")).stream():
+ # Do this to get a new ID, so we can load the same file N times and
+ # get new flows each time. It would be more efficient to just have a
+ # .newid() method or something.
+ self.add([i.copy()])
+
@command.command("view.go")
def go(self, dst: int) -> None:
"""
diff --git a/mitmproxy/command.py b/mitmproxy/command.py
index 337afd76..82b8fae4 100644
--- a/mitmproxy/command.py
+++ b/mitmproxy/command.py
@@ -1,3 +1,6 @@
+"""
+ This module manges and invokes typed commands.
+"""
import inspect
import typing
import shlex
@@ -17,13 +20,15 @@ Cuts = typing.Sequence[
def typename(t: type, ret: bool) -> str:
"""
- Translates a type to an explanatory string. Ifl ret is True, we're
+ Translates a type to an explanatory string. If ret is True, we're
looking at a return type, else we're looking at a parameter type.
"""
- if t in (str, int, bool):
+ if issubclass(t, (str, int, bool)):
return t.__name__
elif t == typing.Sequence[flow.Flow]:
return "[flow]" if ret else "flowspec"
+ elif t == typing.Sequence[str]:
+ return "[str]"
elif t == Cuts:
return "[cuts]" if ret else "cutspec"
elif t == flow.Flow:
@@ -42,11 +47,20 @@ class Command:
if func.__doc__:
txt = func.__doc__.strip()
self.help = "\n".join(textwrap.wrap(txt))
+
+ self.has_positional = False
+ for i in sig.parameters.values():
+ # This is the kind for *args paramters
+ if i.kind == i.VAR_POSITIONAL:
+ self.has_positional = True
self.paramtypes = [v.annotation for v in sig.parameters.values()]
self.returntype = sig.return_annotation
def paramnames(self) -> typing.Sequence[str]:
- return [typename(i, False) for i in self.paramtypes]
+ v = [typename(i, False) for i in self.paramtypes]
+ if self.has_positional:
+ v[-1] = "*" + v[-1][1:-1]
+ return v
def retname(self) -> str:
return typename(self.returntype, True) if self.returntype else ""
@@ -62,17 +76,31 @@ class Command:
"""
Call the command with a set of arguments. At this point, all argumets are strings.
"""
- if len(self.paramtypes) != len(args):
+ if not self.has_positional and (len(self.paramtypes) != len(args)):
raise exceptions.CommandError("Usage: %s" % self.signature_help())
+ remainder = [] # type: typing.Sequence[str]
+ if self.has_positional:
+ remainder = args[len(self.paramtypes) - 1:]
+ args = args[:len(self.paramtypes) - 1]
+
pargs = []
for i in range(len(args)):
- pargs.append(parsearg(self.manager, args[i], self.paramtypes[i]))
+ if typecheck.check_command_type(args[i], self.paramtypes[i]):
+ pargs.append(args[i])
+ else:
+ pargs.append(parsearg(self.manager, args[i], self.paramtypes[i]))
+
+ if remainder:
+ if typecheck.check_command_type(remainder, self.paramtypes[-1]):
+ pargs.extend(remainder)
+ else:
+ raise exceptions.CommandError("Invalid value type.")
with self.manager.master.handlecontext():
ret = self.func(*pargs)
- if not typecheck.check_command_return_type(ret, self.returntype):
+ if not typecheck.check_command_type(ret, self.returntype):
raise exceptions.CommandError("Command returned unexpected data")
return ret
@@ -124,7 +152,7 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"""
Convert a string to a argument to the appropriate type.
"""
- if argtype == str:
+ if issubclass(argtype, str):
return spec
elif argtype == bool:
if spec == "true":
@@ -135,7 +163,7 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
raise exceptions.CommandError(
"Booleans are 'true' or 'false', got %s" % spec
)
- elif argtype == int:
+ elif issubclass(argtype, int):
try:
return int(spec)
except ValueError as e:
@@ -151,6 +179,8 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"Command requires one flow, specification matched %s." % len(flows)
)
return flows[0]
+ elif argtype == typing.Sequence[str]:
+ return [i.strip() for i in spec.split(",")]
else:
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
diff --git a/mitmproxy/export.py b/mitmproxy/export.py
deleted file mode 100644
index efa08874..00000000
--- a/mitmproxy/export.py
+++ /dev/null
@@ -1,183 +0,0 @@
-import io
-import json
-import pprint
-import re
-import textwrap
-from typing import Any
-
-from mitmproxy import http
-from mitmproxy.utils import strutils
-
-
-def curl_command(flow: http.HTTPFlow) -> str:
- data = "curl "
-
- request = flow.request.copy()
- request.decode(strict=False)
-
- for k, v in request.headers.items(multi=True):
- data += "-H '%s:%s' " % (k, v)
-
- if request.method != "GET":
- data += "-X %s " % request.method
-
- data += "'%s'" % request.url
-
- if request.content:
- data += " --data-binary '%s'" % strutils.bytes_to_escaped_str(
- request.content,
- escape_single_quotes=True
- )
-
- return data
-
-
-def python_arg(arg: str, val: Any) -> str:
- if not val:
- return ""
- if arg:
- arg += "="
- arg_str = "{}{},\n".format(
- arg,
- pprint.pformat(val, 79 - len(arg))
- )
- return textwrap.indent(arg_str, " " * 4)
-
-
-def python_code(flow: http.HTTPFlow):
- code = io.StringIO()
-
- def writearg(arg, val):
- code.write(python_arg(arg, val))
-
- code.write("import requests\n")
- code.write("\n")
- if flow.request.method.lower() in ("get", "post", "put", "head", "delete", "patch"):
- code.write("response = requests.{}(\n".format(flow.request.method.lower()))
- else:
- code.write("response = requests.request(\n")
- writearg("", flow.request.method)
- url_without_query = flow.request.url.split("?", 1)[0]
- writearg("", url_without_query)
-
- writearg("params", list(flow.request.query.fields))
-
- headers = flow.request.headers.copy()
- # requests adds those by default.
- for x in (":authority", "host", "content-length"):
- headers.pop(x, None)
- writearg("headers", dict(headers))
- try:
- if "json" not in flow.request.headers.get("content-type", ""):
- raise ValueError()
- writearg("json", json.loads(flow.request.text))
- except ValueError:
- writearg("data", flow.request.content)
-
- code.seek(code.tell() - 2) # remove last comma
- code.write("\n)\n")
- code.write("\n")
- code.write("print(response.text)")
-
- return code.getvalue()
-
-
-def locust_code(flow):
- code = textwrap.dedent("""
- from locust import HttpLocust, TaskSet, task
-
- class UserBehavior(TaskSet):
- def on_start(self):
- ''' on_start is called when a Locust start before any task is scheduled '''
- self.{name}()
-
- @task()
- def {name}(self):
- url = self.locust.host + '{path}'
- {headers}{params}{data}
- self.response = self.client.request(
- method='{method}',
- url=url,{args}
- )
-
- ### Additional tasks can go here ###
-
-
- class WebsiteUser(HttpLocust):
- task_set = UserBehavior
- min_wait = 1000
- max_wait = 3000
-""").strip()
-
- name = re.sub('\W|^(?=\d)', '_', flow.request.path.strip("/").split("?", 1)[0])
- if not name:
- new_name = "_".join([str(flow.request.host), str(flow.request.timestamp_start)])
- name = re.sub('\W|^(?=\d)', '_', new_name)
-
- path_without_query = flow.request.path.split("?")[0]
-
- args = ""
- headers = ""
-
- def conv(x):
- return strutils.bytes_to_escaped_str(x, escape_single_quotes=True)
-
- if flow.request.headers:
- lines = [
- (conv(k), conv(v)) for k, v in flow.request.headers.fields
- if conv(k).lower() not in [":authority", "host", "cookie"]
- ]
- lines = [" '%s': '%s',\n" % (k, v) for k, v in lines]
- headers += "\n headers = {\n%s }\n" % "".join(lines)
- args += "\n headers=headers,"
-
- params = ""
- if flow.request.query:
- lines = [
- " %s: %s,\n" % (repr(k), repr(v))
- for k, v in
- flow.request.query.collect()
- ]
- params = "\n params = {\n%s }\n" % "".join(lines)
- args += "\n params=params,"
-
- data = ""
- if flow.request.content:
- data = "\n data = '''%s'''\n" % conv(flow.request.content)
- args += "\n data=data,"
-
- code = code.format(
- name=name,
- path=path_without_query,
- headers=headers,
- params=params,
- data=data,
- method=flow.request.method,
- args=args,
- )
-
- return code
-
-
-def locust_task(flow):
- code = locust_code(flow)
- start_task = len(code.split('@task')[0]) - 4
- end_task = -19 - len(code.split('### Additional')[1])
- task_code = code[start_task:end_task]
-
- return task_code
-
-
-def url(flow):
- return flow.request.url
-
-
-EXPORTERS = [
- ("content", "c", None),
- ("headers+content", "h", None),
- ("url", "u", url),
- ("as curl command", "r", curl_command),
- ("as python code", "p", python_code),
- ("as locust code", "l", locust_code),
- ("as locust task", "t", locust_task),
-]
diff --git a/mitmproxy/tools/console/commandeditor.py b/mitmproxy/tools/console/commandeditor.py
index fd7d12ac..17d1506b 100644
--- a/mitmproxy/tools/console/commandeditor.py
+++ b/mitmproxy/tools/console/commandeditor.py
@@ -1,6 +1,8 @@
+import typing
import urwid
from mitmproxy import exceptions
+from mitmproxy import flow
from mitmproxy.tools.console import signals
@@ -23,5 +25,14 @@ class CommandExecutor:
except exceptions.CommandError as v:
signals.status_message.send(message=str(v))
else:
- if type(ret) == str:
- signals.status_message.send(message=ret)
+ if ret:
+ if type(ret) == typing.Sequence[flow.Flow]:
+ signals.status_message.send(
+ message="Command returned %s flows" % len(ret)
+ )
+ elif len(str(ret)) < 50:
+ signals.status_message.send(message=str(ret))
+ else:
+ signals.status_message.send(
+ message="Command returned too much data to display."
+ )
diff --git a/mitmproxy/tools/console/common.py b/mitmproxy/tools/console/common.py
index ec637cbc..812ca7a8 100644
--- a/mitmproxy/tools/console/common.py
+++ b/mitmproxy/tools/console/common.py
@@ -9,7 +9,6 @@ import urwid.util
import mitmproxy.net
from functools import lru_cache
from mitmproxy.tools.console import signals
-from mitmproxy import export
from mitmproxy.utils import human
try:
@@ -306,28 +305,6 @@ def ask_save_body(scope, flow):
signals.status_message.send(message="No content.")
-def export_to_clip_or_file(key, scope, flow, writer):
- """
- Export selected flow to clipboard or a file.
-
- key: _c_ontent, _h_eaders+content, _u_rl,
- cu_r_l_command, _p_ython_code,
- _l_ocust_code, locust_t_ask
- scope: None, _a_ll, re_q_uest, re_s_ponse
- writer: copy_to_clipboard_or_prompt, ask_save_path
- """
-
- for _, exp_key, exporter in export.EXPORTERS:
- if key == exp_key:
- if exporter is None: # 'c' & 'h'
- if scope is None:
- ask_scope_and_callback(flow, handle_flow_data, key, writer)
- else:
- handle_flow_data(scope, flow, key, writer)
- else: # other keys
- writer(exporter(flow))
-
-
@lru_cache(maxsize=800)
def raw_format_flow(f, flow):
f = dict(f)
@@ -418,13 +395,16 @@ def raw_format_flow(f, flow):
def format_flow(f, focus, extended=False, hostheader=False, max_url_len=False):
+ acked = False
+ if f.reply and f.reply.state == "committed":
+ acked = True
d = dict(
focus=focus,
extended=extended,
max_url_len=max_url_len,
intercepted = f.intercepted,
- acked = f.reply.state == "committed",
+ acked = acked,
req_timestamp = f.request.timestamp_start,
req_is_replay = f.request.is_replay,
diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py
index 898c1478..4ffed15f 100644
--- a/mitmproxy/tools/console/flowlist.py
+++ b/mitmproxy/tools/console/flowlist.py
@@ -3,7 +3,6 @@ import urwid
from mitmproxy.tools.console import common
from mitmproxy.tools.console import signals
from mitmproxy.addons import view
-from mitmproxy import export
import mitmproxy.tools.console.master # noqa
@@ -140,25 +139,7 @@ class FlowItem(urwid.WidgetWrap):
def keypress(self, xxx_todo_changeme, key):
(maxcol,) = xxx_todo_changeme
- key = common.shortcuts(key)
- if key == "E":
- signals.status_prompt_onekey.send(
- self,
- prompt = "Export to file",
- keys = [(e[0], e[1]) for e in export.EXPORTERS],
- callback = common.export_to_clip_or_file,
- args = (None, self.flow, common.ask_save_path)
- )
- # elif key == "C":
- # signals.status_prompt_onekey.send(
- # self,
- # prompt = "Export to clipboard",
- # keys = [(e[0], e[1]) for e in export.EXPORTERS],
- # callback = common.export_to_clip_or_file,
- # args = (None, self.flow, common.copy_to_clipboard_or_prompt)
- # )
- else:
- return key
+ return common.shortcuts(key)
class FlowListWalker(urwid.ListWalker):
@@ -246,13 +227,7 @@ class FlowListBox(urwid.ListBox):
def keypress(self, size, key):
key = common.shortcuts(key)
- if key == "L":
- signals.status_prompt_path.send(
- self,
- prompt = "Load flows",
- callback = self.master.load_flows_callback
- )
- elif key == "M":
+ if key == "M":
self.master.view.toggle_marked()
elif key == "n":
signals.status_prompt_onekey.send(
diff --git a/mitmproxy/tools/console/flowview.py b/mitmproxy/tools/console/flowview.py
index 33c8f2ac..b7b7053f 100644
--- a/mitmproxy/tools/console/flowview.py
+++ b/mitmproxy/tools/console/flowview.py
@@ -8,7 +8,6 @@ import urwid
from mitmproxy import contentviews
from mitmproxy import exceptions
-from mitmproxy import export
from mitmproxy import http
from mitmproxy.net.http import Headers
from mitmproxy.net.http import status_codes
@@ -619,29 +618,31 @@ class FlowView(tabs.Tabs):
)
)
elif key == "E":
- if self.tab_offset == TAB_REQ:
- scope = "q"
- else:
- scope = "s"
- signals.status_prompt_onekey.send(
- self,
- prompt = "Export to file",
- keys = [(e[0], e[1]) for e in export.EXPORTERS],
- callback = common.export_to_clip_or_file,
- args = (scope, self.flow, common.ask_save_path)
- )
+ pass
+ # if self.tab_offset == TAB_REQ:
+ # scope = "q"
+ # else:
+ # scope = "s"
+ # signals.status_prompt_onekey.send(
+ # self,
+ # prompt = "Export to file",
+ # keys = [(e[0], e[1]) for e in export.EXPORTERS],
+ # callback = common.export_to_clip_or_file,
+ # args = (scope, self.flow, common.ask_save_path)
+ # )
elif key == "C":
- if self.tab_offset == TAB_REQ:
- scope = "q"
- else:
- scope = "s"
- signals.status_prompt_onekey.send(
- self,
- prompt = "Export to clipboard",
- keys = [(e[0], e[1]) for e in export.EXPORTERS],
- callback = common.export_to_clip_or_file,
- args = (scope, self.flow, common.copy_to_clipboard_or_prompt)
- )
+ pass
+ # if self.tab_offset == TAB_REQ:
+ # scope = "q"
+ # else:
+ # scope = "s"
+ # signals.status_prompt_onekey.send(
+ # self,
+ # prompt = "Export to clipboard",
+ # keys = [(e[0], e[1]) for e in export.EXPORTERS],
+ # callback = common.export_to_clip_or_file,
+ # args = (scope, self.flow, common.copy_to_clipboard_or_prompt)
+ # )
elif key == "x":
conn.content = None
signals.flow_change.send(self, flow=self.flow)
diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py
index b9de2733..457f3721 100644
--- a/mitmproxy/tools/console/master.py
+++ b/mitmproxy/tools/console/master.py
@@ -9,9 +9,11 @@ import subprocess
import sys
import tempfile
import traceback
+import typing
import urwid
+from mitmproxy import ctx
from mitmproxy import addons
from mitmproxy import command
from mitmproxy import master
@@ -84,12 +86,31 @@ class ConsoleAddon:
self.master = master
self.started = False
+ @command.command("console.choose")
+ def console_choose(
+ self, prompt: str, choicecmd: str, *cmd: typing.Sequence[str]
+ ) -> None:
+ """
+ Prompt the user to choose from a list of strings returned by a
+ command, then invoke another command with all occurances of {choice}
+ replaced by the choice the user made.
+ """
+ choices = ctx.master.commands.call_args(choicecmd, [])
+
+ def callback(opt):
+ repl = " ".join(cmd)
+ repl = repl.replace("{choice}", opt)
+ self.master.commands.call(repl)
+
+ self.master.overlay(overlay.Chooser(choicecmd, choices, "", callback))
+ ctx.log.info(choices)
+
@command.command("console.command")
- def console_command(self, partial: str) -> None:
+ def console_command(self, *partial: typing.Sequence[str]) -> None:
"""
Prompt the user to edit a command with a (possilby empty) starting value.
"""
- signals.status_prompt_command.send(partial=partial)
+ signals.status_prompt_command.send(partial=" ".join(partial) + " ") # type: ignore
@command.command("console.view.commands")
def view_commands(self) -> None:
@@ -146,20 +167,27 @@ def default_keymap(km):
km.add("O", "console.view.options")
km.add("Q", "console.exit")
km.add("q", "console.view.pop")
- km.add("i", "console.command 'set intercept='")
- km.add("W", "console.command 'set save_stream_file='")
+ km.add("i", "console.command set intercept=")
+ km.add("W", "console.command set save_stream_file=")
km.add("A", "flow.resume @all", context="flowlist")
km.add("a", "flow.resume @focus", context="flowlist")
- km.add("b", "console.command 'cut.save s.content|@focus '", context="flowlist")
- km.add("C", "console.command 'cut.clip '", context="flowlist")
+ km.add("b", "console.command cut.save s.content|@focus ''", context="flowlist")
km.add("d", "view.remove @focus", context="flowlist")
km.add("D", "view.duplicate @focus", context="flowlist")
km.add("e", "set console_eventlog=toggle", context="flowlist")
+ km.add(
+ "E",
+ "console.choose Format export.formats "
+ "console.command export.file {choice} @focus ''",
+ context="flowlist"
+ )
km.add("f", "console.command 'set view_filter='", context="flowlist")
km.add("F", "set console_focus_follow=toggle", context="flowlist")
km.add("g", "view.go 0", context="flowlist")
km.add("G", "view.go -1", context="flowlist")
+ km.add("l", "console.command cut.clip ", context="flowlist")
+ km.add("L", "console.command view.load ", context="flowlist")
km.add("m", "flow.mark.toggle @focus", context="flowlist")
km.add("r", "replay.client @focus", context="flowlist")
km.add("S", "console.command 'replay.server '")
diff --git a/mitmproxy/types/multidict.py b/mitmproxy/types/multidict.py
index c4f42580..bd9766a3 100644
--- a/mitmproxy/types/multidict.py
+++ b/mitmproxy/types/multidict.py
@@ -155,22 +155,6 @@ class _MultiDict(MutableMapping, metaclass=ABCMeta):
else:
return super().items()
- def collect(self):
- """
- Returns a list of (key, value) tuples, where values are either
- singular if there is only one matching item for a key, or a list
- if there are more than one. The order of the keys matches the order
- in the underlying fields list.
- """
- coll = []
- for key in self:
- values = self.get_all(key)
- if len(values) == 1:
- coll.append([key, values[0]])
- else:
- coll.append([key, values])
- return coll
-
class MultiDict(_MultiDict, serializable.Serializable):
def __init__(self, fields=()):
diff --git a/mitmproxy/utils/typecheck.py b/mitmproxy/utils/typecheck.py
index c97ff529..a5f27fee 100644
--- a/mitmproxy/utils/typecheck.py
+++ b/mitmproxy/utils/typecheck.py
@@ -1,7 +1,7 @@
import typing
-def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
+def check_command_type(value: typing.Any, typeinfo: typing.Any) -> bool:
"""
Check if the provided value is an instance of typeinfo. Returns True if the
types match, False otherwise. This function supports only those types
@@ -17,7 +17,7 @@ def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
if not isinstance(value, (tuple, list)):
return False
for v in value:
- if not check_command_return_type(v, T):
+ if not check_command_type(v, T):
return False
elif typename.startswith("typing.Union"):
try:
@@ -26,7 +26,7 @@ def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
# Python 3.5.x
types = typeinfo.__union_params__ # type: ignore
for T in types:
- checks = [check_command_return_type(value, T) for T in types]
+ checks = [check_command_type(value, T) for T in types]
if not any(checks):
return False
elif value is None and typeinfo is None:
diff --git a/setup.cfg b/setup.cfg
index d0307bc8..993cad31 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -35,7 +35,6 @@ exclude =
mitmproxy/proxy/server.py
mitmproxy/tools/
mitmproxy/controller.py
- mitmproxy/export.py
mitmproxy/flow.py
mitmproxy/io/compat.py
mitmproxy/master.py
@@ -54,7 +53,6 @@ exclude =
mitmproxy/controller.py
mitmproxy/ctx.py
mitmproxy/exceptions.py
- mitmproxy/export.py
mitmproxy/flow.py
mitmproxy/io/io.py
mitmproxy/io/compat.py
@@ -85,7 +83,6 @@ exclude =
mitmproxy/proxy/root_context.py
mitmproxy/proxy/server.py
mitmproxy/stateobject.py
- mitmproxy/types/multidict.py
mitmproxy/utils/bits.py
pathod/language/actions.py
pathod/language/base.py
diff --git a/test/mitmproxy/addons/test_export.py b/test/mitmproxy/addons/test_export.py
new file mode 100644
index 00000000..5c7c4976
--- /dev/null
+++ b/test/mitmproxy/addons/test_export.py
@@ -0,0 +1,109 @@
+import pytest
+import os
+
+from mitmproxy import exceptions
+from mitmproxy.addons import export # heh
+from mitmproxy.test import tflow
+from mitmproxy.test import tutils
+from mitmproxy.test import taddons
+from unittest import mock
+
+
+@pytest.fixture
+def get_request():
+ return tflow.tflow(
+ req=tutils.treq(
+ method=b'GET',
+ content=b'',
+ path=b"/path?a=foo&a=bar&b=baz"
+ )
+ )
+
+
+@pytest.fixture
+def post_request():
+ return tflow.tflow(
+ req=tutils.treq(
+ method=b'POST',
+ headers=(),
+ content=bytes(range(256))
+ )
+ )
+
+
+@pytest.fixture
+def patch_request():
+ return tflow.tflow(
+ req=tutils.treq(method=b'PATCH', path=b"/path?query=param")
+ )
+
+
+@pytest.fixture
+def tcp_flow():
+ return tflow.ttcpflow()
+
+
+class TestExportCurlCommand:
+ def test_get(self, get_request):
+ result = """curl -H 'header:qvalue' -H 'content-length:7' 'http://address:22/path?a=foo&a=bar&b=baz'"""
+ assert export.curl_command(get_request) == result
+
+ def test_post(self, post_request):
+ result = "curl -X POST 'http://address:22/path' --data-binary '{}'".format(
+ str(bytes(range(256)))[2:-1]
+ )
+ assert export.curl_command(post_request) == result
+
+ def test_patch(self, patch_request):
+ result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address:22/path?query=param' --data-binary 'content'"""
+ assert export.curl_command(patch_request) == result
+
+ def test_tcp(self, tcp_flow):
+ with pytest.raises(exceptions.CommandError):
+ export.curl_command(tcp_flow)
+
+
+class TestRaw:
+ def test_get(self, get_request):
+ assert b"header: qvalue" in export.raw(get_request)
+
+ def test_tcp(self, tcp_flow):
+ with pytest.raises(exceptions.CommandError):
+ export.raw(tcp_flow)
+
+
+def qr(f):
+ with open(f, "rb") as fp:
+ return fp.read()
+
+
+def test_export(tmpdir):
+ f = str(tmpdir.join("path"))
+ e = export.Export()
+ with taddons.context():
+ assert e.formats() == ["curl", "raw"]
+ with pytest.raises(exceptions.CommandError):
+ e.file("nonexistent", tflow.tflow(resp=True), f)
+
+ e.file("raw", tflow.tflow(resp=True), f)
+ assert qr(f)
+ os.unlink(f)
+
+ e.file("curl", tflow.tflow(resp=True), f)
+ assert qr(f)
+ os.unlink(f)
+
+
+def test_clip(tmpdir):
+ e = export.Export()
+ with taddons.context():
+ with pytest.raises(exceptions.CommandError):
+ e.clip("nonexistent", tflow.tflow(resp=True))
+
+ with mock.patch('pyperclip.copy') as pc:
+ e.clip("raw", tflow.tflow(resp=True))
+ assert pc.called
+
+ with mock.patch('pyperclip.copy') as pc:
+ e.clip("curl", tflow.tflow(resp=True))
+ assert pc.called
diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py
index 979f0aa1..eddcb04c 100644
--- a/test/mitmproxy/addons/test_view.py
+++ b/test/mitmproxy/addons/test_view.py
@@ -5,6 +5,7 @@ from mitmproxy.test import tflow
from mitmproxy.addons import view
from mitmproxy import flowfilter
from mitmproxy import exceptions
+from mitmproxy import io
from mitmproxy.test import taddons
@@ -130,10 +131,28 @@ def test_filter():
assert len(v) == 4
-def test_load():
+def tdump(path, flows):
+ w = io.FlowWriter(open(path, "wb"))
+ for i in flows:
+ w.add(i)
+
+
+def test_load(tmpdir):
+ path = str(tmpdir.join("path"))
v = view.View()
with taddons.context() as tctx:
tctx.master.addons.add(v)
+ tdump(
+ path,
+ [
+ tflow.tflow(resp=True),
+ tflow.tflow(resp=True)
+ ]
+ )
+ v.load_file(path)
+ assert len(v) == 2
+ v.load_file(path)
+ assert len(v) == 4
def test_resolve():
diff --git a/test/mitmproxy/data/test_flow_export/locust_get.py b/test/mitmproxy/data/test_flow_export/locust_get.py
deleted file mode 100644
index 632d5d53..00000000
--- a/test/mitmproxy/data/test_flow_export/locust_get.py
+++ /dev/null
@@ -1,35 +0,0 @@
-from locust import HttpLocust, TaskSet, task
-
-class UserBehavior(TaskSet):
- def on_start(self):
- ''' on_start is called when a Locust start before any task is scheduled '''
- self.path()
-
- @task()
- def path(self):
- url = self.locust.host + '/path'
-
- headers = {
- 'header': 'qvalue',
- 'content-length': '7',
- }
-
- params = {
- 'a': ['foo', 'bar'],
- 'b': 'baz',
- }
-
- self.response = self.client.request(
- method='GET',
- url=url,
- headers=headers,
- params=params,
- )
-
- ### Additional tasks can go here ###
-
-
-class WebsiteUser(HttpLocust):
- task_set = UserBehavior
- min_wait = 1000
- max_wait = 3000
diff --git a/test/mitmproxy/data/test_flow_export/locust_patch.py b/test/mitmproxy/data/test_flow_export/locust_patch.py
deleted file mode 100644
index f64e0857..00000000
--- a/test/mitmproxy/data/test_flow_export/locust_patch.py
+++ /dev/null
@@ -1,37 +0,0 @@
-from locust import HttpLocust, TaskSet, task
-
-class UserBehavior(TaskSet):
- def on_start(self):
- ''' on_start is called when a Locust start before any task is scheduled '''
- self.path()
-
- @task()
- def path(self):
- url = self.locust.host + '/path'
-
- headers = {
- 'header': 'qvalue',
- 'content-length': '7',
- }
-
- params = {
- 'query': 'param',
- }
-
- data = '''content'''
-
- self.response = self.client.request(
- method='PATCH',
- url=url,
- headers=headers,
- params=params,
- data=data,
- )
-
- ### Additional tasks can go here ###
-
-
-class WebsiteUser(HttpLocust):
- task_set = UserBehavior
- min_wait = 1000
- max_wait = 3000
diff --git a/test/mitmproxy/data/test_flow_export/locust_post.py b/test/mitmproxy/data/test_flow_export/locust_post.py
deleted file mode 100644
index df23476a..00000000
--- a/test/mitmproxy/data/test_flow_export/locust_post.py
+++ /dev/null
@@ -1,26 +0,0 @@
-from locust import HttpLocust, TaskSet, task
-
-class UserBehavior(TaskSet):
- def on_start(self):
- ''' on_start is called when a Locust start before any task is scheduled '''
- self.path()
-
- @task()
- def path(self):
- url = self.locust.host + '/path'
-
- data = '''content'''
-
- self.response = self.client.request(
- method='POST',
- url=url,
- data=data,
- )
-
- ### Additional tasks can go here ###
-
-
-class WebsiteUser(HttpLocust):
- task_set = UserBehavior
- min_wait = 1000
- max_wait = 3000
diff --git a/test/mitmproxy/data/test_flow_export/locust_task_get.py b/test/mitmproxy/data/test_flow_export/locust_task_get.py
deleted file mode 100644
index 03821cd8..00000000
--- a/test/mitmproxy/data/test_flow_export/locust_task_get.py
+++ /dev/null
@@ -1,20 +0,0 @@
- @task()
- def path(self):
- url = self.locust.host + '/path'
-
- headers = {
- 'header': 'qvalue',
- 'content-length': '7',
- }
-
- params = {
- 'a': ['foo', 'bar'],
- 'b': 'baz',
- }
-
- self.response = self.client.request(
- method='GET',
- url=url,
- headers=headers,
- params=params,
- )
diff --git a/test/mitmproxy/data/test_flow_export/locust_task_patch.py b/test/mitmproxy/data/test_flow_export/locust_task_patch.py
deleted file mode 100644
index d425209c..00000000
--- a/test/mitmproxy/data/test_flow_export/locust_task_patch.py
+++ /dev/null
@@ -1,22 +0,0 @@
- @task()
- def path(self):
- url = self.locust.host + '/path'
-
- headers = {
- 'header': 'qvalue',
- 'content-length': '7',
- }
-
- params = {
- 'query': 'param',
- }
-
- data = '''content'''
-
- self.response = self.client.request(
- method='PATCH',
- url=url,
- headers=headers,
- params=params,
- data=data,
- )
diff --git a/test/mitmproxy/data/test_flow_export/locust_task_post.py b/test/mitmproxy/data/test_flow_export/locust_task_post.py
deleted file mode 100644
index a5f307ee..00000000
--- a/test/mitmproxy/data/test_flow_export/locust_task_post.py
+++ /dev/null
@@ -1,11 +0,0 @@
- @task()
- def path(self):
- url = self.locust.host + '/path'
-
- data = '''\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f\xa0\xa1\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xab\xac\xad\xae\xaf\xb0\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xbb\xbc\xbd\xbe\xbf\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc\xcd\xce\xcf\xd0\xd1\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xdb\xdc\xdd\xde\xdf\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee\xef\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff'''
-
- self.response = self.client.request(
- method='POST',
- url=url,
- data=data,
- )
diff --git a/test/mitmproxy/data/test_flow_export/python_get.py b/test/mitmproxy/data/test_flow_export/python_get.py
deleted file mode 100644
index e9ed072a..00000000
--- a/test/mitmproxy/data/test_flow_export/python_get.py
+++ /dev/null
@@ -1,9 +0,0 @@
-import requests
-
-response = requests.get(
- 'http://address:22/path',
- params=[('a', 'foo'), ('a', 'bar'), ('b', 'baz')],
- headers={'header': 'qvalue'}
-)
-
-print(response.text) \ No newline at end of file
diff --git a/test/mitmproxy/data/test_flow_export/python_patch.py b/test/mitmproxy/data/test_flow_export/python_patch.py
deleted file mode 100644
index d83a57b9..00000000
--- a/test/mitmproxy/data/test_flow_export/python_patch.py
+++ /dev/null
@@ -1,10 +0,0 @@
-import requests
-
-response = requests.patch(
- 'http://address:22/path',
- params=[('query', 'param')],
- headers={'header': 'qvalue'},
- data=b'content'
-)
-
-print(response.text) \ No newline at end of file
diff --git a/test/mitmproxy/data/test_flow_export/python_post.py b/test/mitmproxy/data/test_flow_export/python_post.py
deleted file mode 100644
index 42f1af9a..00000000
--- a/test/mitmproxy/data/test_flow_export/python_post.py
+++ /dev/null
@@ -1,17 +0,0 @@
-import requests
-
-response = requests.post(
- 'http://address:22/path',
- data=(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13'
- b'\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\'()*+,-./01234567'
- b'89:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f'
- b'\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f'
- b'\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f'
- b'\xa0\xa1\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xab\xac\xad\xae\xaf'
- b'\xb0\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xbb\xbc\xbd\xbe\xbf'
- b'\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc\xcd\xce\xcf'
- b'\xd0\xd1\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xdb\xdc\xdd\xde\xdf'
- b'\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee\xef'
- b'\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff')
-)
-print(response.text)
diff --git a/test/mitmproxy/data/test_flow_export/python_post_json.py b/test/mitmproxy/data/test_flow_export/python_post_json.py
deleted file mode 100644
index d6ae6357..00000000
--- a/test/mitmproxy/data/test_flow_export/python_post_json.py
+++ /dev/null
@@ -1,9 +0,0 @@
-import requests
-
-response = requests.post(
- 'http://address:22/path',
- headers={'content-type': 'application/json'},
- json={'email': 'example@example.com', 'name': 'example'}
-)
-
-print(response.text) \ No newline at end of file
diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py
index 24d11d37..958328b2 100644
--- a/test/mitmproxy/test_command.py
+++ b/test/mitmproxy/test_command.py
@@ -1,9 +1,6 @@
import typing
from mitmproxy import command
from mitmproxy import flow
-from mitmproxy import master
-from mitmproxy import options
-from mitmproxy import proxy
from mitmproxy import exceptions
from mitmproxy.test import tflow
from mitmproxy.test import taddons
@@ -19,24 +16,41 @@ class TAddon:
def cmd2(self, foo: str) -> str:
return 99
+ def cmd3(self, foo: int) -> int:
+ return foo
+
def empty(self) -> None:
pass
+ def varargs(self, one: str, *var: typing.Sequence[str]) -> typing.Sequence[str]:
+ return list(var)
+
class TestCommand:
+ def test_varargs(self):
+ with taddons.context() as tctx:
+ cm = command.CommandManager(tctx.master)
+ a = TAddon()
+ c = command.Command(cm, "varargs", a.varargs)
+ assert c.signature_help() == "varargs str *str -> [str]"
+ assert c.call(["one", "two", "three"]) == ["two", "three"]
+ with pytest.raises(exceptions.CommandError):
+ c.call(["one", "two", 3])
+
def test_call(self):
- o = options.Options()
- m = master.Master(o, proxy.DummyServer(o))
- cm = command.CommandManager(m)
+ with taddons.context() as tctx:
+ cm = command.CommandManager(tctx.master)
+ a = TAddon()
+ c = command.Command(cm, "cmd.path", a.cmd1)
+ assert c.call(["foo"]) == "ret foo"
+ assert c.signature_help() == "cmd.path str -> str"
- a = TAddon()
- c = command.Command(cm, "cmd.path", a.cmd1)
- assert c.call(["foo"]) == "ret foo"
- assert c.signature_help() == "cmd.path str -> str"
+ c = command.Command(cm, "cmd.two", a.cmd2)
+ with pytest.raises(exceptions.CommandError):
+ c.call(["foo"])
- c = command.Command(cm, "cmd.two", a.cmd2)
- with pytest.raises(exceptions.CommandError):
- c.call(["foo"])
+ c = command.Command(cm, "cmd.three", a.cmd3)
+ assert c.call(["1"]) == 1
def test_simple():
@@ -70,17 +84,16 @@ def test_typename():
assert command.typename(command.Cuts, True) == "[cuts]"
assert command.typename(flow.Flow, False) == "flow"
+ assert command.typename(typing.Sequence[str], False) == "[str]"
class DummyConsole:
- def load(self, l):
- l.add_command("view.resolve", self.resolve)
- l.add_command("cut", self.cut)
-
+ @command.command("view.resolve")
def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
n = int(spec)
return [tflow.tflow(resp=True)] * n
+ @command.command("cut")
def cut(self, spec: str) -> command.Cuts:
return [["test"]]
@@ -114,6 +127,13 @@ def test_parsearg():
tctx.master.commands, "foo", command.Cuts
) == [["test"]]
+ assert command.parsearg(
+ tctx.master.commands, "foo", typing.Sequence[str]
+ ) == ["foo"]
+ assert command.parsearg(
+ tctx.master.commands, "foo, bar", typing.Sequence[str]
+ ) == ["foo", "bar"]
+
class TDec:
@command.command("cmd1")
diff --git a/test/mitmproxy/test_export.py b/test/mitmproxy/test_export.py
deleted file mode 100644
index b789e6b5..00000000
--- a/test/mitmproxy/test_export.py
+++ /dev/null
@@ -1,133 +0,0 @@
-import re
-
-import pytest
-
-from mitmproxy import export # heh
-from mitmproxy.net.http import Headers
-from mitmproxy.test import tflow
-from mitmproxy.test import tutils
-
-
-def clean_blanks(s):
- return re.sub(r"^\s+", "", s, flags=re.MULTILINE)
-
-
-def python_equals(testdata, text):
- """
- Compare two bits of Python code, disregarding non-significant differences
- like whitespace on blank lines and trailing space.
- """
- d = open(tutils.test_data.path(testdata)).read()
- assert clean_blanks(text).rstrip() == clean_blanks(d).rstrip()
-
-
-@pytest.fixture
-def get_request():
- return tflow.tflow(
- req=tutils.treq(
- method=b'GET',
- content=b'',
- path=b"/path?a=foo&a=bar&b=baz"
- )
- )
-
-
-@pytest.fixture
-def post_request():
- return tflow.tflow(
- req=tutils.treq(
- method=b'POST',
- headers=(),
- content=bytes(range(256))
- )
- )
-
-
-@pytest.fixture
-def patch_request():
- return tflow.tflow(
- req=tutils.treq(method=b'PATCH', path=b"/path?query=param")
- )
-
-
-class TExport:
- def test_get(self, get_request):
- raise NotImplementedError()
-
- def test_post(self, post_request):
- raise NotImplementedError()
-
- def test_patch(self, patch_request):
- raise NotImplementedError()
-
-
-class TestExportCurlCommand(TExport):
- def test_get(self, get_request):
- result = """curl -H 'header:qvalue' -H 'content-length:7' 'http://address:22/path?a=foo&a=bar&b=baz'"""
- assert export.curl_command(get_request) == result
-
- def test_post(self, post_request):
- result = "curl -X POST 'http://address:22/path' --data-binary '{}'".format(
- str(bytes(range(256)))[2:-1]
- )
- assert export.curl_command(post_request) == result
-
- def test_patch(self, patch_request):
- result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address:22/path?query=param' --data-binary 'content'"""
- assert export.curl_command(patch_request) == result
-
-
-class TestExportPythonCode(TExport):
- def test_get(self, get_request):
- python_equals("mitmproxy/data/test_flow_export/python_get.py",
- export.python_code(get_request))
-
- def test_post(self, post_request):
- python_equals("mitmproxy/data/test_flow_export/python_post.py",
- export.python_code(post_request))
-
- def test_post_json(self, post_request):
- post_request.request.content = b'{"name": "example", "email": "example@example.com"}'
- post_request.request.headers = Headers(content_type="application/json")
- python_equals("mitmproxy/data/test_flow_export/python_post_json.py",
- export.python_code(post_request))
-
- def test_patch(self, patch_request):
- python_equals("mitmproxy/data/test_flow_export/python_patch.py",
- export.python_code(patch_request))
-
-
-class TestExportLocustCode(TExport):
- def test_get(self, get_request):
- python_equals("mitmproxy/data/test_flow_export/locust_get.py",
- export.locust_code(get_request))
-
- def test_post(self, post_request):
- post_request.request.content = b'content'
- post_request.request.headers.clear()
- python_equals("mitmproxy/data/test_flow_export/locust_post.py",
- export.locust_code(post_request))
-
- def test_patch(self, patch_request):
- python_equals("mitmproxy/data/test_flow_export/locust_patch.py",
- export.locust_code(patch_request))
-
-
-class TestExportLocustTask(TExport):
- def test_get(self, get_request):
- python_equals("mitmproxy/data/test_flow_export/locust_task_get.py",
- export.locust_task(get_request))
-
- def test_post(self, post_request):
- python_equals("mitmproxy/data/test_flow_export/locust_task_post.py",
- export.locust_task(post_request))
-
- def test_patch(self, patch_request):
- python_equals("mitmproxy/data/test_flow_export/locust_task_patch.py",
- export.locust_task(patch_request))
-
-
-class TestURL:
- def test_url(self):
- flow = tflow.tflow()
- assert export.url(flow) == "http://address:22/path"
diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py
index 17f70d37..fe33070e 100644
--- a/test/mitmproxy/utils/test_typecheck.py
+++ b/test/mitmproxy/utils/test_typecheck.py
@@ -88,25 +88,26 @@ def test_check_any():
typecheck.check_option_type("foo", None, typing.Any)
-def test_check_command_return_type():
- assert(typecheck.check_command_return_type("foo", str))
- assert(typecheck.check_command_return_type(["foo"], typing.Sequence[str]))
- assert(typecheck.check_command_return_type(None, None))
- assert(not typecheck.check_command_return_type(["foo"], typing.Sequence[int]))
- assert(not typecheck.check_command_return_type("foo", typing.Sequence[int]))
- assert(typecheck.check_command_return_type([["foo", b"bar"]], command.Cuts))
- assert(not typecheck.check_command_return_type(["foo", b"bar"], command.Cuts))
- assert(not typecheck.check_command_return_type([["foo", 22]], command.Cuts))
+def test_check_command_type():
+ assert(typecheck.check_command_type("foo", str))
+ assert(typecheck.check_command_type(["foo"], typing.Sequence[str]))
+ assert(not typecheck.check_command_type(["foo", 1], typing.Sequence[str]))
+ assert(typecheck.check_command_type(None, None))
+ assert(not typecheck.check_command_type(["foo"], typing.Sequence[int]))
+ assert(not typecheck.check_command_type("foo", typing.Sequence[int]))
+ assert(typecheck.check_command_type([["foo", b"bar"]], command.Cuts))
+ assert(not typecheck.check_command_type(["foo", b"bar"], command.Cuts))
+ assert(not typecheck.check_command_type([["foo", 22]], command.Cuts))
# Python 3.5 only defines __parameters__
m = mock.Mock()
m.__str__ = lambda self: "typing.Sequence"
m.__parameters__ = (int,)
- typecheck.check_command_return_type([10], m)
+ typecheck.check_command_type([10], m)
# Python 3.5 only defines __union_params__
m = mock.Mock()
m.__str__ = lambda self: "typing.Union"
m.__union_params__ = (int,)
- assert not typecheck.check_command_return_type([22], m)
+ assert not typecheck.check_command_type([22], m)