diff options
author | Aldo Cortesi <aldo@nullcube.com> | 2013-02-17 12:42:48 +1300 |
---|---|---|
committer | Aldo Cortesi <aldo@nullcube.com> | 2013-02-17 12:42:48 +1300 |
commit | aaf892e3afc682b2dc2a166a96872420e50092cd (patch) | |
tree | 5de224011071aab6f8fb026e7ac56685b80c1f7a /libmproxy/controller.py | |
parent | 1ccb2c5dea9530682aae83d489f1738d9286fa4e (diff) | |
download | mitmproxy-aaf892e3afc682b2dc2a166a96872420e50092cd.tar.gz mitmproxy-aaf892e3afc682b2dc2a166a96872420e50092cd.tar.bz2 mitmproxy-aaf892e3afc682b2dc2a166a96872420e50092cd.zip |
Significantly refactor the master/slave message passing interface.
Diffstat (limited to 'libmproxy/controller.py')
-rw-r--r-- | libmproxy/controller.py | 85 |
1 files changed, 62 insertions, 23 deletions
diff --git a/libmproxy/controller.py b/libmproxy/controller.py index f38d1edb..c36bb9df 100644 --- a/libmproxy/controller.py +++ b/libmproxy/controller.py @@ -17,37 +17,73 @@ import Queue, threading should_exit = False -class Msg: + +class DummyReply: + """ + A reply object that does nothing. Useful when we need an object to seem + like it has a channel, and during testing. + """ def __init__(self): + self.acked = False + + def __call__(self, msg=False): + self.acked = True + + +class Reply: + """ + Messages sent through a channel are decorated with a "reply" attribute. + This object is used to respond to the message through the return + channel. + """ + def __init__(self, obj): + self.obj = obj self.q = Queue.Queue() self.acked = False - def _ack(self, data=False): + def __call__(self, msg=False): if not self.acked: self.acked = True - if data is None: - self.q.put(data) + if msg is None: + self.q.put(msg) else: - self.q.put(data or self) + self.q.put(msg or self.obj) - def _send(self, masterq): - self.acked = False - try: - masterq.put(self, timeout=3) - while not should_exit: # pragma: no cover - try: - g = self.q.get(timeout=0.5) - except Queue.Empty: - continue - return g - except (Queue.Empty, Queue.Full): # pragma: no cover - return None + +class Channel: + def __init__(self, q): + self.q = q + + def ask(self, m): + """ + Send a message to the master, and wait for a response. + """ + m.reply = Reply(m) + self.q.put(m) + while not should_exit: + try: + # The timeout is here so we can handle a should_exit event. + g = m.reply.q.get(timeout=0.5) + except Queue.Empty: + continue + return g + + def tell(self, m): + """ + Send a message to the master, and keep going. + """ + m.reply = None + self.q.put(m) class Slave(threading.Thread): - def __init__(self, masterq, server): - self.masterq, self.server = masterq, server - self.server.set_mqueue(masterq) + """ + Slaves get a channel end-point through which they can send messages to + the master. + """ + def __init__(self, channel, server): + self.channel, self.server = channel, server + self.server.set_channel(channel) threading.Thread.__init__(self) def run(self): @@ -55,6 +91,9 @@ class Slave(threading.Thread): class Master: + """ + Masters get and respond to messages from slaves. + """ def __init__(self, server): """ server may be None if no server is needed. @@ -81,18 +120,18 @@ class Master: def run(self): global should_exit should_exit = False - self.server.start_slave(Slave, self.masterq) + self.server.start_slave(Slave, Channel(self.masterq)) while not should_exit: self.tick(self.masterq) self.shutdown() - def handle(self, msg): # pragma: no cover + def handle(self, msg): c = "handle_" + msg.__class__.__name__.lower() m = getattr(self, c, None) if m: m(msg) else: - msg._ack() + msg.reply() def shutdown(self): global should_exit |