panda's tech note

Advent Calendar 2021:自作メッセージングプロトコル

Day 7: ID管理ネットワークの構築(1)

2日目で設計したとおり、ID管理ネットワークは非中央集権的なP2Pネットワークを採用します。まずは、ネットワークプログラミングの基礎的なところからやっていきます。

TCPサーバの実装

Pythonでソケットプログラミングをするのが数年ぶりなので、リハビリがてら、まずはエコーバックする簡単なTCPサーバを実装します。

以下のコードでは、0.0.0.0 (すべてのアドレス) のポート番号10914番で接続を待ち受け、接続を確立後は接続相手からのデータを受信し、それをそのまま接続相手に送り返します。デフォルトのソケット設定ではブロッキングI/Oであるため、受信データは1バイト以上となりますが、相手が接続を切断した場合などには受信データのサイズが0になるため、この場合にはデータ処理を終了し、TCPサーバのソケットも閉じます。

import socket

bind_address = "0.0.0.0"
bind_port = 10914

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((bind_address, bind_port))
sock.listen(1)
running = True
csock, caddr = sock.accept()
while running:
    data = csock.recv(4096)
    csock.send(data)
    if len(data) == 0:
        running = False

sock.close()

マルチスレッドによる複数クライアントからの同時接続

上記のコードでは、クライアント1台のみが接続可能であり、その処理の終了とともにプログラムを終了させていました。これではサーバ機能として役に立たないため、マルチスレッド機能を使い、複数クライアントからの同時接続を実装します。

まずは、クライアント接続を処理するためのスレッドクラスを threading.Thread を継承して以下の通り実装します。内容は単純で、コンストラクタ __init__() でスレッドの初期化とクライアントソケットをインスタンスにセットし、上記の1クライアントの処理を run() メソッドに実装しています。

import threading
class ClientThread(threading.Thread):
    def __init__(self, csock):
        threading.Thread.__init__(self)
        self.csock = csock
        self.running = True
    def run(self):
        while self.running:
            data = self.csock.recv(4096)
            self.csock.send(data)
            if len(data) == 0:
                self.running = False

このスレッドクラスを以下のように sock.accept() でクライアント接続を受け付けた後に初期化し、スレッド処理を開始します。それにより、クライアントからのデータを受信する recv() メソッドがブロッキングI/Oですが、その間も以下の while 内の処理を継続することができます。これにより、複数クライアントからの同時接続を処理できるようになります。

# Open a socket and listen connections on it
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((bind_address, bind_port))
sock.listen(10)
while True:
    # Accept a client and start a thread
    csock, caddr = sock.accept()
    th = ClientThread(csock)
    th.start()

    # Join dead threads
    for th in threading.enumerate():
        if th == threading.main_thread():
            continue
        if not th.is_alive():
            th.join()
# Close the socket
sock.close()

ただし、このままだと作成したスレッドがいつまでもプログラム内でリソースを食い続けるので、 Alive でないスレッドは join() によりリソースの解放処理を行います。

Pythonの threading モジュールでは、作成したスレッドリストを threading.enumerate() で取得できます。ただし、このスレッドリストにはメインスレッドも含まれていることに注意が必要です(メインスレッドは Alive 状態なので上記のコードでは問題ないと思います)。上記のコードでは、このスレッドリストで Alive でないスレッドを join() しています。

実装

上記で説明したマルチスレッドによる複数クライアントからの同時接続に対応したTCPエコーサーバーを [github:drpnd/advmsg:idn/agent.py] に実装しました。このプログラムを実行すると、TCPポート番号10914番で接続を受け付け、ここに送られたデータをそのままクライアントに送り返します。

telnet で以下のように echo back を確認しました。少しわかりにくいですが、1行目の this is a test message.\n が入力で、その下の行がサーバから受信した echo back です。

$ telnet -4 localhost 10914
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
this is a test message.
this is a test message.
^]
telnet> ^D
Connection closed.

まとめと明日の予定

リハビリがてら、PythonでTCPサーバの実装をしました。明日はこのプログラムを元にP2Pネットワークの構築をしていきます。