Monday, August 17, 2009

How to Pass an Opened Socket (or File Descriptor) to a Forked Process in Python/Windows

Client Side

The client side program creates 5 threads, and each thread communicates with the server.

import socket, threading
import ctypes

# Number of client threads that main() starts.
NUMBER_OF_CLIENT_THREADS = 5
# Address of the server each client thread connects to.
HOST = 'localhost'
PORT = 7777
# Size in bytes of the buffer each client thread receives replies into.
BUF_SIZE = 100

class Client(threading.Thread):
    """Thread that opens one TCP connection to the server and exchanges two messages.

    For each message the thread sends the text, reads one reply of at most
    ``BUF_SIZE`` bytes into a ctypes buffer, and prints it.
    """

    def run(self):
        """Connect to (HOST, PORT), send a greeting and a farewell, then disconnect."""
        buf = ctypes.create_string_buffer(BUF_SIZE)
        messages = (
            'Hello, I am {0}'.format(self.getName()),
            'Goodbye Server!',
        )

        # open a connection to the server
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            client.connect((HOST, PORT))

            for message in messages:
                # sendall() keeps writing until the whole message is out;
                # a plain send() may transmit only part of it.
                client.sendall(message)

                # Clear the buffer so that bytes left over from the previous
                # reply cannot show up in this one.
                buf.value = '\0' * BUF_SIZE
                client.recv_into(buf, BUF_SIZE)
                print('\tServer: {0}'.format(buf.value))
        finally:
            # Release the socket even if connecting or the exchange failed.
            client.close()


def main():
    """Start NUMBER_OF_CLIENT_THREADS Client threads and wait for all of them."""
    workers = []

    for index in range(NUMBER_OF_CLIENT_THREADS):
        worker = Client()
        worker.setName("thread-" + repr(index))
        worker.start()
        workers.append(worker)

    # Block until every client thread has finished its conversation.
    for worker in workers:
        worker.join()

# Run the client only when this file is executed as a script.
if __name__ == "__main__":
    main()

Server Side

The server spawns a new child process for each connection, and each child process then communicates with the client thread that opened that connection.
import socket, signal, os, multiprocessing, time
from ctypes import *

#
# Structures needed to duplicate a socket:
#
# Handles to the native libraries that are called through ctypes.
kernel32 = windll.kernel32  # not referenced in the code shown here
Ws2_32 = windll.Ws2_32  # Winsock: WSASocketA, WSADuplicateSocketA, recv, send
msvc = cdll.msvcrt  # C runtime; its _close() is called by the worker

class GUID(Structure):
    """ctypes layout of a GUID; used as the ProviderId field of WSAPROTOCOL_INFO."""

    _fields_ = [
        ("Data1", c_ulong),
        ("Data2", c_ushort),
        ("Data3", c_ushort),
        ("Data4", c_char * 8),
    ]

class WSAPROTOCOLCHAIN(Structure):
    """ctypes layout of the protocol chain embedded in WSAPROTOCOL_INFO."""

    _fields_ = [
        ("ChainLen", c_int),
        ("ChainEntries", c_ulong * 7),
    ]

class WSAPROTOCOL_INFO(Structure):
    """ctypes layout of the protocol-info structure used to share a socket.

    The parent fills an instance with WSADuplicateSocketA and the worker
    passes it to WSASocketA to obtain its own handle for the same socket.
    """

    _fields_ = [
        ("dwServiceFlags1", c_ulong),
        ("dwServiceFlags2", c_ulong),
        ("dwServiceFlags3", c_ulong),
        ("dwServiceFlags4", c_ulong),
        ("dwProviderFlags", c_ulong),
        ("ProviderId", GUID),
        ("dwCatalogEntryId", c_ulong),
        ("ProtocolChain", WSAPROTOCOLCHAIN),
        ("iVersion", c_int),
        ("iAddressFamily", c_int),
        ("iMaxSockAddr", c_int),
        ("iMinSockAddr", c_int),
        ("iSocketType", c_int),
        ("iProtocol", c_int),
        ("iProtocolMaxOffset", c_int),
        ("iNetworkByteOrder", c_int),
        ("iSecurityScheme", c_int),
        ("dwMessageSize", c_ulong),
        ("dwProviderReserved", c_ulong),
        ("szProtocol", c_char * 256),
    ]

# Size in bytes of the buffer each worker process uses for recv() and send().
BUF_SIZE = 100

def WorkerProcess(conn_child):
    """Serve one client over a socket handed down by the parent process.

    Handshake over ``conn_child`` (this process's end of a multiprocessing
    pipe):

    1. send this process's pid to the parent;
    2. receive the WSAPROTOCOL_INFO structure the parent prepared for it;
    3. send back 0 once the socket handle has been created, or 1 on failure.

    After a successful handshake, every message received from the client is
    printed and answered with "OK" until the client disconnects or a socket
    call fails.

    Raises SystemExit(1) if the socket cannot be created from the protocol
    info; otherwise returns normally so the process exits with status 0.
    """
    # Value WSASocket returns on failure, i.e. (SOCKET)(~0), as an unsigned int.
    invalid_socket = c_uint(~0).value

    buf = create_string_buffer(BUF_SIZE)

    # communicate with the parent process to duplicate the opened socket with client
    conn_child.send(os.getpid())
    wsock_prot_info = conn_child.recv()

    # (AF_INET, SOCK_STREAM, IPPROTO_TCP, protocol info, group, flags)
    sock_no = Ws2_32.WSASocketA(2, 1, 6, byref(wsock_prot_info), 0, 0)

    # ctypes hands the result back as a plain signed int, and a Python int
    # never compares equal to a c_uint instance, so compare unsigned values.
    if c_uint(sock_no).value == invalid_socket:
        print("Error in duplicating the socket")
        conn_child.send(1)
        raise SystemExit(1)

    conn_child.send(0)  # everything has gone OK!

    # Now we have obtained the socket, go on to communicate with the client
    try:
        while True:
            buf.value = '\0' * len(buf)
            ret = Ws2_32.recv(sock_no, buf, len(buf), 0)
            if ret == 0 or ret == -1:  # connection closed or SOCKET_ERROR
                break
            print('Client:' + buf.value)

            # Zero the whole buffer first: all len(buf) bytes are sent, so
            # everything after "OK" must be NUL padding.
            buf.value = '\0' * len(buf)
            buf.value = "OK"
            ret = Ws2_32.send(sock_no, buf, len(buf), 0)
            if ret == -1:  # SOCKET_ERROR
                print("Error in sending data")
                break
    finally:
        # A Winsock handle must be released with closesocket(); the C
        # runtime's _close() expects a CRT file descriptor, not a SOCKET.
        Ws2_32.closesocket(sock_no)

    print("Worker Process Exiting...")

def _hand_off_connection(conn):
    """Start a worker process and pass the accepted socket ``conn`` to it.

    The parent's pipe end and its handle for ``conn`` are always closed
    before returning, whether or not the hand-off succeeded.
    """
    # Fresh structure per connection so no data from an earlier hand-off remains.
    wsock_prot_info = WSAPROTOCOL_INFO()

    # 1. open a pipe to realize comm. between the parent and the worker process
    conn_parent, conn_child = multiprocessing.Pipe(True)
    try:
        # 2. start a worker process and hand one side of the pipe to it
        new_worker = multiprocessing.Process(target=WorkerProcess, args=(conn_child,))
        try:
            new_worker.start()
        finally:
            # Drop the parent's copy of the worker's pipe end: it would leak
            # one handle per connection, and while it stays open recv() below
            # blocks forever instead of raising EOFError if the worker dies.
            conn_child.close()

        # 3. Child sends back its process id
        cpid = conn_parent.recv()
        print("New Child Process with pid {0} to handle the new client".format(cpid))

        # 4. duplicate the opened socket and obtain the protocol info structure
        if Ws2_32.WSADuplicateSocketA(conn.fileno(), c_ulong(cpid), byref(wsock_prot_info)) != 0:
            print("Error in duplicating the socket")

        # 5. Send this structure to the child, it'll use this info to create
        #    its own handle. It is sent even after a failed duplication
        #    because the child is blocked waiting for it.
        conn_parent.send(wsock_prot_info)

        # 6. Get the confirmation from the child
        ret = conn_parent.recv()
        if ret != 0:
            print("Error in handing in the socket to the child process")
    except (EOFError, IOError) as exc:
        # The worker went away mid-handshake; give up on this client only.
        print("Lost contact with the worker process: {0}".format(exc))
    finally:
        # 7. Close connection with the client (child will handle that) and the pipe.
        conn_parent.close()
        conn.close()


def main():
    """Accept TCP connections on port 7777 and hand each one to a new worker process.

    Runs until interrupted; the listening socket is closed on the way out.
    """
    HOST = ''
    PORT = 7777

    # 1. Announce start-up
    print("Server is starting...")

    # 2. Set up the server:
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.bind((HOST, PORT))
        server.listen(5)

        # 3. Listen for client connections:
        while True:
            conn, addr = server.accept()
            print('New connection with {0}'.format(addr))

            # WE WANT TO PASS THE OPENED SOCKET TO THE NEWLY STARTED PROCESS:
            _hand_off_connection(conn)
    finally:
        server.close()

# Run the server only when this file is executed as a script; the guard also
# keeps it from starting again when another process imports this module.
if __name__ == "__main__":
    main()