Advent Calendar 2021:自作メッセージングプロトコル
Day 7: ID管理ネットワークの構築(1)
2日目で設計したとおり、ID管理ネットワークは非中央集権的なP2Pネットワークを採用します。まずは、ネットワークプログラミングの基礎的なところからやっていきます。
TCPサーバの実装
Pythonでソケットプログラミングをするのが数年ぶりなので、リハビリがてら、まずはエコーバックする簡単なTCPサーバを実装します。
以下のコードでは、0.0.0.0
(すべてのアドレス) のポート番号10914番で接続を待ち受け、接続を確立後は接続相手からのデータを受信し、それをそのまま接続相手に送り返します。デフォルトのソケット設定ではブロッキングI/Oであるため、受信データは1バイト以上となりますが、相手が接続を切断した場合などには受信データのサイズが0になるため、この場合にはデータ処理を終了し、TCPサーバのソケットも閉じます。
import socket
bind_address = "0.0.0.0"
bind_port = 10914
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((bind_address, bind_port))
sock.listen(1)
running = True
csock, caddr = sock.accept()
while running:
data = csock.recv(4096)
csock.send(data)
if len(data) == 0:
running = False
sock.close()
マルチスレッドによる複数クライアントからの同時接続
上記のコードでは、クライアント1台のみが接続可能であり、その処理の終了とともにプログラムを終了させていました。これではサーバ機能として役に立たないため、マルチスレッド機能を使い、複数クライアントからの同時接続を実装します。
まずは、クライアント接続を処理するためのスレッドクラスを threading.Thread
を継承して以下の通り実装します。内容は単純で、コンストラクタ __init__()
でスレッドの初期化とクライアントソケットをインスタンスにセットし、上記の1クライアントの処理を run()
メソッドに実装しています。
import threading
class ClientThread(threading.Thread):
def __init__(self, csock):
threading.Thread.__init__(self)
self.csock = csock
self.running = True
def run(self):
while self.running:
data = self.csock.recv(4096)
self.csock.send(data)
if len(data) == 0:
self.running = False
このスレッドクラスを以下のように sock.accept()
でクライアント接続を受け付けた後に初期化し、スレッド処理を開始します。それにより、クライアントからのデータを受信する recv()
メソッドがブロッキングI/Oですが、その間も以下の while
内の処理を継続することができます。これにより、複数クライアントからの同時接続を処理できるようになります。
# Open a socket and listen connections on it
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((bind_address, bind_port))
sock.listen(10)
while True:
# Accept a client and start a thread
csock, caddr = sock.accept()
th = ClientThread(csock)
th.start()
# Join dead threads
for th in threading.enumerate():
if th == threading.main_thread():
continue
if not th.is_alive():
th.join()
# Close the socket
sock.close()
ただし、このままだと作成したスレッドがいつまでもプログラム内でリソースを食い続けるので、 Alive
でないスレッドは join()
によりリソースの解放処理を行います。
Pythonの threading
モジュールでは、作成したスレッドリストを threading.enumerate()
で取得できます。ただし、このスレッドリストにはメインスレッドも含まれていることに注意が必要です(メインスレッドは Alive
状態なので上記のコードでは問題ないと思います)。上記のコードでは、このスレッドリストで Alive
でないスレッドを join()
しています。
実装
上記で説明したマルチスレッドによる複数クライアントからの同時接続に対応したTCPエコーサーバーを [github:drpnd/advmsg:idn/agent.py] に実装しました。このプログラムを実行すると、TCPポート番号10914番で接続を受け付け、ここに送られたデータをそのままクライアントに送り返します。
telnet
で以下のように echo back を確認しました。少しわかりにくいですが、1行目の this is a test message.\n
が入力で、その下の行がサーバから受信した echo back です。
$ telnet -4 localhost 10914
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
this is a test message.
this is a test message.
^]
telnet> ^D
Connection closed.
まとめと明日の予定
リハビリがてら、PythonでTCPサーバの実装をしました。明日はこのプログラムを元にP2Pネットワークの構築をしていきます。