import threading
import time
import uuid
import weakref

try:
    import Queue  # Python 2 name
except ImportError:
    # Python 3 renamed the module; keep the old name bound so the rest
    # of the file can keep saying ``Queue.Queue()``.
    import queue as Queue


## coroutine decorator
def coroutine(handler):
    """Mark ``handler`` as a coroutine-style message handler.

    Sets ``handler._is_coroutine = True`` and returns the same handler,
    so the function is usable with ``@coroutine`` decorator syntax
    (without the return, the decorated name would be rebound to None).
    """
    handler._is_coroutine = True
    return handler


## ------
## Actors
## ------

class Actor(object):
    """Base class for everything that can receive messages.

    Attributes:
      message_handlers: dict mapping a message class to the callable
        that handles messages of exactly that class
      uuid: identifier generated with ``uuid.uuid4()``
    """
    def __init__(self):
        self.message_handlers = {}
        self.uuid = uuid.uuid4()

    def _find_handler(self, message):
        """Return the handler registered for ``message``'s class, or None.

        The lookup uses ``message.__class__`` directly, so handlers
        registered for a base message class are not found for subclasses.
        """
        return self.message_handlers.get(message.__class__)


class Room(Actor):
    """An actor that holds a list of ``children`` and answers
    RoomUserQuery messages about them."""
    def __init__(self):
        Actor.__init__(self)
        self.message_handlers.update(
            {RoomUserQuery: self.handle_userquery})
        self.children = []

    def handle_userquery(self, message):
        """Respond with the children accepted by ``message.mapfunc``.

        When ``message.mapfunc`` is falsy the response is the
        ``children`` list itself (not a copy).
        """
        if message.mapfunc:
            message.respond(
                self,
                [child for child in self.children
                 if message.mapfunc(child)])
        else:
            message.respond(self, self.children)


class Being(Actor):
    """Common base class for Robot and Player; adds nothing to Actor."""
    pass


class Robot(Being):
    """A being with an ``is_rogue`` flag that answers IsRogue messages."""
    def __init__(self):
        Being.__init__(self)
        # self.handle_is_rogue is looked up on the instance, so a
        # subclass override is what actually gets registered here.
        self.message_handlers.update(
            {IsRogue: self.handle_is_rogue})
        self.is_rogue = False

    def handle_is_rogue(self, message):
        """Respond to ``message`` with the current ``is_rogue`` value."""
        message.respond(self, self.is_rogue)


class FlakyRobot(Robot):
    """A robot whose handlers are generators that yield PrintText
    messages and sleep before finishing."""
    def __init__(self):
        Robot.__init__(self)
        # IsRogue is already mapped to this class's handle_is_rogue by
        # Robot.__init__ (bound-method lookup), so only FlakeOut is new.
        self.message_handlers.update(
            {FlakeOut: self.handle_flake_out})

    def handle_flake_out(self, message):
        """Generator handler: print, sleep five seconds, print again.

        Each ``yield`` hands a PrintText message (addressed to
        GLOBAL_PRINTER) back to whoever is driving the generator.
        """
        yield PrintText(
            self, [GLOBAL_PRINTER],
            "BZZT!  Flakin' out over here!")
        time.sleep(5)
        yield PrintText(
            self, [GLOBAL_PRINTER],
            "Whew!  I was freaking out for a sec there")

    def handle_is_rogue(self, message):
        """Generator handler: complain if rogue, then respond.

        Because the body contains ``yield`` this is always a generator,
        even when ``is_rogue`` is False; in that case it responds and
        finishes on the first step without yielding anything.
        """
        if self.is_rogue:
            yield PrintText(
                self, [GLOBAL_PRINTER],
                "GAHHH!!!! I've been spotted!!! *gzzt*")
            time.sleep(3)
        message.respond(self, self.is_rogue)


class Player(Being):
    """A being that handles RogueScan by querying the room and robots."""
    def __init__(self):
        Being.__init__(self)
        self.message_handlers.update(
            {RogueScan: self.handle_rogue_scanner})

    def handle_rogue_scanner(self, message):
        """Generator handler that builds and prints a rogue report.

        Each ``yield`` is expected to be resumed with a dict of
        responses keyed by the responding actor (that is how the values
        sent back in are indexed and iterated below).
        """
        # find out about all the robots in the room
        uquery_response = yield RoomUserQuery(
            self, [ROOM],
            lambda x: isinstance(x, Robot))
        robots = uquery_response[ROOM]

        # okay, we know about our robots.. let's find out which ones
        # of them are rogues
        rogue_response = yield IsRogue(self, robots)

        # compile a rogue report!
        report = ["You ran the scanner on the robots and found:"]
        # .items() rather than .iteritems() so this runs on Python 2 and 3
        for robot, isrogue in rogue_response.items():
            if isrogue:
                report.append(
                    "  Robot #%s IS a rogue!" % robot.uuid)
            else:
                report.append(
                    "  Robot #%s IS NOT rogue." % robot.uuid)

        yield PrintText(
            self, [GLOBAL_PRINTER],
            '\n'.join(report))


class TextPrinter(Actor):
    """An actor that prints the text of PrintText messages."""
    def __init__(self):
        Actor.__init__(self)
        self.message_handlers.update(
            {PrintText: self.handle_print_text})

    def handle_print_text(self, message):
        """Print ``message.text_message`` to standard output."""
        # Parenthesised single argument: same output under the Python 2
        # print statement and the Python 3 print function.
        print(message.text_message)


## ------
## Messages
## ------

class Message(object):
    def __init__(self, source, recipients):
        """
        Args:
          source: source actor
          recipients: should always be a *list* of actors

        (Reserved) Attributes:
          source: same as arg
          recipients: same as arg
          _responses: responses, keyed by the responding actor
          _parent_coroutine: a coroutine that will start up again with
            information from _responses once all _pending_recipients
            are gone
          _pending_recipients: recipients left to process before this
            process can finish and return to its parent
          _lock: guards _pending_recipients so that exactly one caller
            observes the set becoming empty
        """
        self.source = source
        self.recipients = recipients

        self._responses = {}
        self._parent_coroutine = None
        self._pending_recipients = set(recipients)
        self._lock = threading.Lock()

    def respond(self, actor, response):
        """Record ``response`` as ``actor``'s answer to this message."""
        self._responses[actor] = response

    def remove_recipient_maybe_queue_parent(self, recipient, queue):
        """Mark ``recipient`` as done; resume the parent when all are done.

        Args:
          recipient: actor to remove from the pending set
          queue: object with a ``put`` method; receives a CoroutineTask
            carrying ``_responses`` once no recipients are pending and a
            parent coroutine is set

        Raises:
          KeyError: if ``recipient`` is not currently pending
        """
        # Removal and the emptiness check must happen atomically:
        # otherwise two worker threads finishing the last two recipients
        # could both see an empty set and resume the parent twice.
        with self._lock:
            self._pending_recipients.remove(recipient)
            all_done = not self._pending_recipients

        if all_done and self._parent_coroutine:
            queue.put(
                CoroutineTask(self._responses, self._parent_coroutine))


# some messages

class IsRogue(Message):
    """Asks the recipients whether they are rogue."""
    pass


class RogueScan(Message):
    """Asks the recipient to run a rogue scan."""
    pass


class FlakeOut(Message):
    """Tells the recipients to flake out."""
    pass


class PrintText(Message):
    """Message carrying a ``text_message`` string to be printed."""
    def __init__(self, source, recipients, text_message):
        Message.__init__(self, source, recipients)
        self.text_message = text_message


class RoomUserQuery(Message):
    """Message carrying an optional ``mapfunc`` predicate."""
    def __init__(self, source, recipients, mapfunc=None):
        Message.__init__(self, source, recipients)
        self.mapfunc = mapfunc


## -----
## Scheduling and etc?
## -----

class Task(object):
    """Base class for units of work placed on the master's queue."""
    pass


class MessageTask(Task):
    """Deliver ``message`` to one ``recipient``."""
    def __init__(self, message, recipient):
        self.message = message
        self.recipient = recipient


class CoroutineTask(Task):
    """Resume ``coroutine`` by sending it ``responses``."""
    def __init__(self, responses, coroutine):
        self.responses = responses
        self.coroutine = coroutine


class MessageProcessor(threading.Thread):
    """Worker thread that consumes tasks from ``master.queue`` forever."""
    def __init__(self, master):
        threading.Thread.__init__(self)
        self.master = master

    def run(self):
        """Pull tasks off the master's queue and dispatch them by type."""
        while True:
            task = self.master.queue.get()
            try:
                if isinstance(task, MessageTask):
                    self.send_message(task.message, task.recipient)
                else:
                    self.continue_coroutine(
                        task.responses, task.coroutine)
            finally:
                # Always balance the get(): if a handler raises and
                # task_done() is skipped, queue.join() never returns.
                self.master.queue.task_done()

    def send_message(self, message, recipient):
        """Run ``recipient``'s handler for ``message``.

        A handler that returns a generator is treated as a coroutine:
        it is advanced once, and the message it yields is queued with
        the generator recorded as its parent.  Any other return value
        means the recipient is finished with ``message`` right away.
        """
        # Local import so this block does not depend on a change to the
        # file's top-of-file import list.
        import types

        handler = recipient._find_handler(message)
        if not handler:
            # Nothing to run, but the recipient still has to be ticked
            # off, or a parent coroutine waiting on this message would
            # never be resumed.
            message.remove_recipient_maybe_queue_parent(
                recipient, self.master.queue)
            return

        result = handler(message)

        if not isinstance(result, types.GeneratorType):
            # if it's not a coroutine, we can consider ourselves done here
            message.remove_recipient_maybe_queue_parent(
                recipient, self.master.queue)
            return

        try:
            new_message = next(result)
        except StopIteration:
            # The generator finished without yielding anything (e.g. a
            # handler whose yield sits behind a condition that was false).
            message.remove_recipient_maybe_queue_parent(
                recipient, self.master.queue)
            return

        # Record the bookkeeping before queueing, so whichever worker
        # later finishes the coroutine can find its original message.
        new_message._parent_coroutine = result
        self.master.coroutine_map[result] = (recipient, message)
        self.master.queue_message(new_message)

    def continue_coroutine(self, responses, coroutine):
        """Resume ``coroutine`` with ``responses``.

        If it yields another message, that message is queued with the
        coroutine as its parent.  If it finishes, the original message
        it was handling is looked up in ``master.coroutine_map`` and the
        recipient is marked as done on it.
        """
        try:
            message = coroutine.send(responses)
        except StopIteration:
            # yay, this coroutine is done with, suckas.
            recipient, old_message = self.master.coroutine_map.pop(coroutine)
            old_message.remove_recipient_maybe_queue_parent(
                recipient, self.master.queue)
            return

        message._parent_coroutine = coroutine
        self.master.queue_message(message)


class XuddMaster(object):
    """Owns the task queue, the actor registry and the worker threads."""
    def __init__(self):
        # actors are held weakly, keyed by their uuid
        self.actors = weakref.WeakValueDictionary()
        self.queue = Queue.Queue()

        # we need to keep track of the original messages coroutines
        # were mapped to so we can run
        # remove_recipient_maybe_queue_parent on them when the coroutine
        # finishes
        #
        # this is a dictionary of
        #   coroutine_map[coroutine] = (recipient, message)
        # where message is the original message we were blocking on
        self.coroutine_map = {}

        self.__processors = []

    def add_actor(self, actor):
        """Register ``actor`` under its ``uuid``."""
        self.actors[actor.uuid] = actor

    def queue_message(self, message):
        """Queue one MessageTask per recipient of ``message``.

        A message with no recipients produces no tasks, so nothing would
        ever resume a coroutine waiting on it; in that case the parent
        coroutine (if any) is resumed immediately with the message's
        (empty) responses.
        """
        if not message.recipients:
            if message._parent_coroutine:
                self.queue.put(
                    CoroutineTask(
                        message._responses, message._parent_coroutine))
            return

        for recipient in message.recipients:
            self.queue.put(MessageTask(message, recipient))

    def add_processors(self, numprocessors):
        """Start ``numprocessors`` daemon MessageProcessor threads."""
        for _ in range(numprocessors):
            processor = MessageProcessor(self)
            self.__processors.append(processor)
            # attribute form of setDaemon(True)
            processor.daemon = True
            processor.start()

    def join(self):
        """Block until every queued task has been marked done."""
        self.queue.join()


## -----
## Simulation!
## -----

xudd_master = XuddMaster()

ROOM = Room()
PLAYER = Player()
GLOBAL_PRINTER = TextPrinter()

ROBOT1 = Robot()
ROBOT2 = Robot()
ROBOT3 = FlakyRobot()
ROBOT4 = FlakyRobot()

ROBOT2.is_rogue = True
ROBOT4.is_rogue = True

## add our player and NPCs to the room
ROOM.children.extend(
    [PLAYER, ROBOT1, ROBOT2, ROBOT3, ROBOT4])

## Add all the actors
# explicit loop: map() is lazy on Python 3 and would register nothing
for _actor in [ROOM, PLAYER, GLOBAL_PRINTER,
               ROBOT1, ROBOT2, ROBOT3, ROBOT4]:
    xudd_master.add_actor(_actor)

## Add the processors.. we'll use 5
xudd_master.add_processors(5)

## Create the initial messages we want to send

# only the last two robots should respond to this one
xudd_master.queue_message(FlakeOut(None, [ROBOT1, ROBOT2, ROBOT3, ROBOT4]))
xudd_master.queue_message(RogueScan(None, [PLAYER]))
xudd_master.queue_message(FlakeOut(None, [ROBOT1, ROBOT2, ROBOT3, ROBOT4]))

xudd_master.join()