aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mitmproxy/protocol/http2.py58
-rw-r--r--pathod/language/actions.py9
-rw-r--r--setup.py2
-rw-r--r--test/pathod/test_language_actions.py20
4 files changed, 61 insertions, 28 deletions
diff --git a/mitmproxy/protocol/http2.py b/mitmproxy/protocol/http2.py
index 39512c8f..9247e657 100644
--- a/mitmproxy/protocol/http2.py
+++ b/mitmproxy/protocol/http2.py
@@ -73,7 +73,7 @@ class SafeH2Connection(connection.H2Connection):
frame_chunk = chunk[position:position + max_outbound_frame_size]
if self.local_flow_control_window(stream_id) < len(frame_chunk):
self.lock.release()
- time.sleep(0)
+ time.sleep(0.1)
continue
self.send_data(stream_id, frame_chunk)
self.conn.send(self.data_to_send())
@@ -165,9 +165,21 @@ class Http2Layer(base.Layer):
new_settings = dict([(id, cs.new_value) for (id, cs) in six.iteritems(event.changed_settings)])
other_conn.h2.safe_update_settings(new_settings)
elif isinstance(event, events.ConnectionTerminated):
- # Do not immediately terminate the other connection.
- # Some streams might be still sending data to the client.
- return False
+ if event.error_code == h2.errors.NO_ERROR:
+ # Do not immediately terminate the other connection.
+ # Some streams might be still sending data to the client.
+ return False
+ else:
+ # Something terrible has happened - kill everything!
+ self.client_conn.h2.close_connection(
+ error_code=event.error_code,
+ last_stream_id=event.last_stream_id,
+ additional_data=event.additional_data
+ )
+ self.client_conn.send(self.client_conn.h2.data_to_send())
+ self._kill_all_streams()
+ return False
+
elif isinstance(event, events.PushedStreamReceived):
# pushed stream ids should be unique and not dependent on race conditions
# only the parent stream id must be looked up first
@@ -177,7 +189,6 @@ class Http2Layer(base.Layer):
self.client_conn.send(self.client_conn.h2.data_to_send())
headers = netlib.http.Headers([[str(k), str(v)] for k, v in event.headers])
- headers['x-mitmproxy-pushed'] = 'true'
self.streams[event.pushed_stream_id] = Http2SingleStreamLayer(self, event.pushed_stream_id, headers)
self.streams[event.pushed_stream_id].timestamp_start = time.time()
self.streams[event.pushed_stream_id].pushed = True
@@ -352,8 +363,22 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, threading.Thread):
raise NotImplementedError()
def send_request(self, message):
+ if not hasattr(self.server_conn, 'h2'):
+ raise exceptions.Http2ProtocolException("Zombie Stream")
+
+ while True:
+ self.server_conn.h2.lock.acquire()
+ max_streams = self.server_conn.h2.remote_settings.max_concurrent_streams
+ if self.server_conn.h2.open_outbound_streams + 1 >= max_streams:
+ # wait until we get a free slot for a new outgoing stream
+ self.server_conn.h2.lock.release()
+ time.sleep(0.1)
+ else:
+ break
+
if self.pushed:
# nothing to do here
+ self.server_conn.h2.lock.release()
return
with self.server_conn.h2.lock:
@@ -364,16 +389,19 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, threading.Thread):
self.server_stream_id = self.server_conn.h2.get_next_available_stream_id()
self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id
- headers = message.headers.copy()
- headers.insert(0, ":path", message.path)
- headers.insert(0, ":method", message.method)
- headers.insert(0, ":scheme", message.scheme)
+ headers = message.headers.copy()
+ headers.insert(0, ":path", message.path)
+ headers.insert(0, ":method", message.method)
+ headers.insert(0, ":scheme", message.scheme)
+ self.server_stream_id = self.server_conn.h2.get_next_available_stream_id()
+ self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id
+ self.server_conn.h2.safe_send_headers(
+ self.is_zombie,
+ self.server_stream_id,
+ headers,
+ )
+ self.server_conn.h2.lock.release()
- self.server_conn.h2.safe_send_headers(
- self.is_zombie,
- self.server_stream_id,
- headers
- )
self.server_conn.h2.safe_send_body(
self.is_zombie,
self.server_stream_id,
@@ -408,7 +436,7 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, threading.Thread):
if self.response_data_finished.is_set():
while self.response_data_queue.qsize() > 0:
yield self.response_data_queue.get()
- return
+ break
if self.zombie: # pragma: no cover
raise exceptions.Http2ProtocolException("Zombie Stream")
diff --git a/pathod/language/actions.py b/pathod/language/actions.py
index 34a9bafb..e85affac 100644
--- a/pathod/language/actions.py
+++ b/pathod/language/actions.py
@@ -1,12 +1,14 @@
import abc
import copy
import random
+from functools import total_ordering
import pyparsing as pp
from . import base
+@total_ordering
class _Action(base.Token):
"""
@@ -31,8 +33,11 @@ class _Action(base.Token):
c.offset = l + 1
return c
- def __cmp__(self, other):
- return cmp(self.offset, other.offset)
+ def __lt__(self, other):
+ return self.offset < other.offset
+
+ def __eq__(self, other):
+ return self.offset == other.offset
def __repr__(self):
return self.spec()
diff --git a/setup.py b/setup.py
index e68be13a..2e7f0a0b 100644
--- a/setup.py
+++ b/setup.py
@@ -65,7 +65,7 @@ setup(
"certifi>=2015.11.20.1", # no semver here - this should always be on the last release!
"configargparse>=0.10, <0.11",
"construct>=2.5.2, <2.6",
- "cryptography>=1.3, <1.4",
+ "cryptography>=1.3, <1.5",
"Flask>=0.10.1, <0.11",
"h2>=2.3.1, <3",
"html2text>=2016.1.8, <=2016.4.2",
diff --git a/test/pathod/test_language_actions.py b/test/pathod/test_language_actions.py
index 81d2155d..f12d8105 100644
--- a/test/pathod/test_language_actions.py
+++ b/test/pathod/test_language_actions.py
@@ -1,11 +1,11 @@
-from six.moves import cStringIO as StringIO
+from six import BytesIO
from pathod.language import actions
from pathod import language
def parse_request(s):
- return language.parse_pathoc(s).next()
+ return next(language.parse_pathoc(s))
def test_unique_name():
@@ -16,9 +16,9 @@ def test_unique_name():
class TestDisconnects:
def test_parse_pathod(self):
- a = language.parse_pathod("400:d0").next().actions[0]
+ a = next(language.parse_pathod("400:d0")).actions[0]
assert a.spec() == "d0"
- a = language.parse_pathod("400:dr").next().actions[0]
+ a = next(language.parse_pathod("400:dr")).actions[0]
assert a.spec() == "dr"
def test_at(self):
@@ -42,18 +42,18 @@ class TestDisconnects:
class TestInject:
def test_parse_pathod(self):
- a = language.parse_pathod("400:ir,@100").next().actions[0]
+ a = next(language.parse_pathod("400:ir,@100")).actions[0]
assert a.offset == "r"
assert a.value.datatype == "bytes"
assert a.value.usize == 100
- a = language.parse_pathod("400:ia,@100").next().actions[0]
+ a = next(language.parse_pathod("400:ia,@100")).actions[0]
assert a.offset == "a"
def test_at(self):
e = actions.InjectAt.expr()
v = e.parseString("i0,'foo'")[0]
- assert v.value.val == "foo"
+ assert v.value.val == b"foo"
assert v.offset == 0
assert isinstance(v, actions.InjectAt)
@@ -61,8 +61,8 @@ class TestInject:
assert v.offset == "r"
def test_serve(self):
- s = StringIO()
- r = language.parse_pathod("400:i0,'foo'").next()
+ s = BytesIO()
+ r = next(language.parse_pathod("400:i0,'foo'"))
assert language.serve(r, s, {})
def test_spec(self):
@@ -96,7 +96,7 @@ class TestPauses:
assert v.offset == "a"
def test_request(self):
- r = language.parse_pathod('400:p10,10').next()
+ r = next(language.parse_pathod('400:p10,10'))
assert r.actions[0].spec() == "p10,10"
def test_spec(self):