Skip to content
Snippets Groups Projects
Verified Commit 5cead1b2 authored by William Waites's avatar William Waites
Browse files

initial implementation of imitation game

parent a76e6c64
No related branches found
No related tags found
No related merge requests found
import socketserver
import argparse
import logging
import hashlib
import time
import random
from queue import Queue
import threading
class Interlocutor(object):
queue = Queue()
def __init__(self, parent, sid):
self.request = parent.request
self.rfile = parent.rfile
self.wfile = parent.wfile
self.sid = sid
self.log = logging.getLogger(str(self))
self.n = 2
self.qs = Queue()
self.rs = Queue()
self.serv_colour = 95
self.peer_colour = 93
def ask(self, question):
self.qs.put(question)
return self.rs.get()
def read(self):
while True:
line = self.rfile.readline().strip().decode("utf-8")
if len(line) > 0:
return line
def write(self, *strings, colour=None):
if colour:
strings = [f"\033[31;{colour}m"] + list(strings) + ["\033[0m"]
for s in strings:
self.wfile.write(s.encode("utf-8"))
class Interrogator(Interlocutor):
def __str__(self):
return f"I({self.sid})"
def connect(self, peer):
self.peer = peer
peer.connect(self)
self.log.info(f"connected to {self.peer}")
def handle(self):
self.log.info("connecting...")
self.write(f"""
Welcome to the Imitation Game. Your role is "interrogator". You
get to ask {self.n} questions of your interlocutor to determine if
they are a human or a machine. At the end of the session you
will be asked which you think they are and why. Good luck!
Please wait to be connected to an interlocutor...""", colour=self.serv_colour)
interlocutor = self.queue.get()
self.connect(interlocutor)
self.write(f" connected.\n\nYou may begin. Please ask a question.\n\n", colour=self.serv_colour)
for i in range(self.n):
question = self.read()
self.log.info(f"Q{i}: {question}")
response = self.peer.ask(question)
self.write("\n", response, "\n\n", colour=self.peer_colour)
self.write(f"""
Thank you. Based on this interaction, do you believe that your
interlocutor is a human? Please answer Yes or No.\n\n""", colour=self.serv_colour)
judgement = self.read()
self.log.info(f"Is a human? {judgement}")
self.write(f"""
Why do you believe this?\n\n""", colour=self.serv_colour)
reason = self.read()
self.log.info(f"Why? {reason}")
self.write(f"""
Thank you. Goodbye.
""", colour=self.serv_colour)
class Human(Interlocutor):
def __init__(self, *av, **kw):
super(Human, self).__init__(*av, **kw)
self.barrier = threading.Barrier(2)
def __str__(self):
return f"H({self.sid})"
def connect(self, peer):
self.barrier.wait()
self.peer = peer
self.log.info(f"connected to {self.peer}")
def handle(self):
self.log.info("connecting...")
self.write(f"""
Welcome to the Imitation Game. Your role is HUMAN. You will be
asked {self.n} questions by an interlocutor who is attempting
to find out if you are a human or a machine. Your task is to
convince them that you are human. Good luck!
Please wait to be connected to an interlocutor...""", colour=self.serv_colour)
self.queue.put(self)
self.barrier.wait()
self.write(f"connected.\n\nIt begins. Please wait for a question.\n", colour=self.serv_colour)
for i in range(self.n):
question = self.qs.get()
self.write("\n", question, "\n\n", colour=self.peer_colour)
response = self.read()
self.log.info(f"R{i}: {response}")
self.rs.put(response)
self.write("""
That is all. Thank you for playing the Imitation Game.
""", colour=self.serv_colour)
class Handler(socketserver.StreamRequestHandler):
"""
The request handler class for our server.
It is instantiated once per connection to the server, and must
override the handle() method to implement communication to the
client.
"""
def handle(self):
raddr = self.request.getpeername()
sid = hashlib.sha3_224("{}:{}:{}".format(time.time(), raddr[0], raddr[1]).encode("utf-8")).hexdigest()[:8]
log = logging.getLogger(sid)
role = random.choice([Interrogator, Human])
h = role(self, sid)
h.handle()
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
def cli():
parser = argparse.ArgumentParser("imitation_server")
parser.add_argument("-b", "--bind", default="0.0.0.0", help="hostname to bind (default localhost)")
parser.add_argument("-p", "--port", default=1950, type=int, help="port to bind (default 1950)")
args = parser.parse_args()
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(name)s %(levelname)s: %(message)s')
random.seed(time.time())
with ThreadedTCPServer((args.bind, args.port), Handler) as server:
server.serve_forever()
#!/usr/bin/env python
from setuptools import setup, find_packages
setup(name='imitation',
version='0.1',
description='Turing Test',
author=['William Waites'],
author_email='william.waites@strath.ac.uk',
keywords=['AI'],
classifiers=[
# How mature is this project? Common values are
# 3 - Alpha
# 4 - Beta
# 5 - Production/Stable
'Development Status :: 3 - Alpha',
# Intended audience
'Intended Audience :: Education',
'Intended Audience :: Science/Research',
# License
#'License :: OSI Approved :: GNU General Public License (GPL)',
# Specify the Python versions you support here. In particular,
# ensure that you indicate whether you support Python 2, Python 3
# or both.
'Programming Language :: Python :: 3',
],
license='GPLv3',
packages=find_packages(),
install_requires=[
],
entry_points={
'console_scripts': [
'imitation = imitation.client:cli',
'imitation_server = imitation.server:cli',
],
},
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment