diff options
-rw-r--r-- | mitmproxy/protocol/http2.py | 58 | ||||
-rw-r--r-- | pathod/language/actions.py | 9 | ||||
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | test/pathod/test_language_actions.py | 20 |
4 files changed, 61 insertions, 28 deletions
diff --git a/mitmproxy/protocol/http2.py b/mitmproxy/protocol/http2.py index 39512c8f..9247e657 100644 --- a/mitmproxy/protocol/http2.py +++ b/mitmproxy/protocol/http2.py @@ -73,7 +73,7 @@ class SafeH2Connection(connection.H2Connection): frame_chunk = chunk[position:position + max_outbound_frame_size] if self.local_flow_control_window(stream_id) < len(frame_chunk): self.lock.release() - time.sleep(0) + time.sleep(0.1) continue self.send_data(stream_id, frame_chunk) self.conn.send(self.data_to_send()) @@ -165,9 +165,21 @@ class Http2Layer(base.Layer): new_settings = dict([(id, cs.new_value) for (id, cs) in six.iteritems(event.changed_settings)]) other_conn.h2.safe_update_settings(new_settings) elif isinstance(event, events.ConnectionTerminated): - # Do not immediately terminate the other connection. - # Some streams might be still sending data to the client. - return False + if event.error_code == h2.errors.NO_ERROR: + # Do not immediately terminate the other connection. + # Some streams might be still sending data to the client. + return False + else: + # Something terrible has happened - kill everything! + self.client_conn.h2.close_connection( + error_code=event.error_code, + last_stream_id=event.last_stream_id, + additional_data=event.additional_data + ) + self.client_conn.send(self.client_conn.h2.data_to_send()) + self._kill_all_streams() + return False + elif isinstance(event, events.PushedStreamReceived): # pushed stream ids should be unique and not dependent on race conditions # only the parent stream id must be looked up first @@ -177,7 +189,6 @@ class Http2Layer(base.Layer): self.client_conn.send(self.client_conn.h2.data_to_send()) headers = netlib.http.Headers([[str(k), str(v)] for k, v in event.headers]) - headers['x-mitmproxy-pushed'] = 'true' self.streams[event.pushed_stream_id] = Http2SingleStreamLayer(self, event.pushed_stream_id, headers) self.streams[event.pushed_stream_id].timestamp_start = time.time() self.streams[event.pushed_stream_id].pushed = True @@ -352,8 +363,22 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, threading.Thread): raise NotImplementedError() def send_request(self, message): + if not hasattr(self.server_conn, 'h2'): + raise exceptions.Http2ProtocolException("Zombie Stream") + + while True: + self.server_conn.h2.lock.acquire() + max_streams = self.server_conn.h2.remote_settings.max_concurrent_streams + if self.server_conn.h2.open_outbound_streams + 1 >= max_streams: + # wait until we get a free slot for a new outgoing stream + self.server_conn.h2.lock.release() + time.sleep(0.1) + else: + break + if self.pushed: # nothing to do here + self.server_conn.h2.lock.release() return with self.server_conn.h2.lock: @@ -364,16 +389,19 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, threading.Thread): self.server_stream_id = self.server_conn.h2.get_next_available_stream_id() self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id - headers = message.headers.copy() - headers.insert(0, ":path", message.path) - headers.insert(0, ":method", message.method) - headers.insert(0, ":scheme", message.scheme) + headers = message.headers.copy() + headers.insert(0, ":path", message.path) + headers.insert(0, ":method", message.method) + headers.insert(0, ":scheme", message.scheme) + self.server_stream_id = self.server_conn.h2.get_next_available_stream_id() + self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id + self.server_conn.h2.safe_send_headers( + self.is_zombie, + self.server_stream_id, + headers, + ) + self.server_conn.h2.lock.release() - self.server_conn.h2.safe_send_headers( - self.is_zombie, - self.server_stream_id, - headers - ) self.server_conn.h2.safe_send_body( self.is_zombie, self.server_stream_id, @@ -408,7 +436,7 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, threading.Thread): if self.response_data_finished.is_set(): while self.response_data_queue.qsize() > 0: yield self.response_data_queue.get() - return + break if self.zombie: # pragma: no cover raise exceptions.Http2ProtocolException("Zombie Stream") diff --git a/pathod/language/actions.py b/pathod/language/actions.py index 34a9bafb..e85affac 100644 --- a/pathod/language/actions.py +++ b/pathod/language/actions.py @@ -1,12 +1,14 @@ import abc import copy import random +from functools import total_ordering import pyparsing as pp from . import base +@total_ordering class _Action(base.Token): """ @@ -31,8 +33,11 @@ class _Action(base.Token): c.offset = l + 1 return c - def __cmp__(self, other): - return cmp(self.offset, other.offset) + def __lt__(self, other): + return self.offset < other.offset + + def __eq__(self, other): + return self.offset == other.offset def __repr__(self): return self.spec() @@ -65,7 +65,7 @@ setup( "certifi>=2015.11.20.1", # no semver here - this should always be on the last release! "configargparse>=0.10, <0.11", "construct>=2.5.2, <2.6", - "cryptography>=1.3, <1.4", + "cryptography>=1.3, <1.5", "Flask>=0.10.1, <0.11", "h2>=2.3.1, <3", "html2text>=2016.1.8, <=2016.4.2", diff --git a/test/pathod/test_language_actions.py b/test/pathod/test_language_actions.py index 81d2155d..f12d8105 100644 --- a/test/pathod/test_language_actions.py +++ b/test/pathod/test_language_actions.py @@ -1,11 +1,11 @@ -from six.moves import cStringIO as StringIO +from six import BytesIO from pathod.language import actions from pathod import language def parse_request(s): - return language.parse_pathoc(s).next() + return next(language.parse_pathoc(s)) def test_unique_name(): @@ -16,9 +16,9 @@ def test_unique_name(): class TestDisconnects: def test_parse_pathod(self): - a = language.parse_pathod("400:d0").next().actions[0] + a = next(language.parse_pathod("400:d0")).actions[0] assert a.spec() == "d0" - a = language.parse_pathod("400:dr").next().actions[0] + a = next(language.parse_pathod("400:dr")).actions[0] assert a.spec() == "dr" def test_at(self): @@ -42,18 +42,18 @@ class TestDisconnects: class TestInject: def test_parse_pathod(self): - a = language.parse_pathod("400:ir,@100").next().actions[0] + a = next(language.parse_pathod("400:ir,@100")).actions[0] assert a.offset == "r" assert a.value.datatype == "bytes" assert a.value.usize == 100 - a = language.parse_pathod("400:ia,@100").next().actions[0] + a = next(language.parse_pathod("400:ia,@100")).actions[0] assert a.offset == "a" def test_at(self): e = actions.InjectAt.expr() v = e.parseString("i0,'foo'")[0] - assert v.value.val == "foo" + assert v.value.val == b"foo" assert v.offset == 0 assert isinstance(v, actions.InjectAt) @@ -61,8 +61,8 @@ class TestInject: assert v.offset == "r" def test_serve(self): - s = StringIO() - r = language.parse_pathod("400:i0,'foo'").next() + s = BytesIO() + r = next(language.parse_pathod("400:i0,'foo'")) assert language.serve(r, s, {}) def test_spec(self): @@ -96,7 +96,7 @@ class TestPauses: assert v.offset == "a" def test_request(self): - r = language.parse_pathod('400:p10,10').next() + r = next(language.parse_pathod('400:p10,10')) assert r.actions[0].spec() == "p10,10" def test_spec(self): |