panda's tech note

Advent Calendar 2021:自作メッセージングプロトコル

Day 8: ID管理ネットワークの構築(2)

7日目でネットワークプログラミングのリハビリをしたので、ID管理ネットワークを構築するためのプログラムを実装していきます。

サービス発見(Service Discovery)

P2Pネットワークでまず問題になるのはP2Pネットワークに接続するための情報を取得する Service Discovery です。この部分まで非中央集権的に実装しようとすると大変なので、今回はランデブーポイントをあらかじめ定義し、何らかの方法で(Webでもよい)接続ノードと共有するようにします。

今回はIPアドレスとポート番号の組み合わせのリストを idn/rendezvous.txt ファイルに置いておき、そこに記載されているランデブーポイントからP2Pネットワークに接続するようにします。以下のようなコードでファイル内のピアのIPアドレス・ポート番号のタプルをリストに変換して使うこととします。

with open('idn/rendezvous.txt', 'r') as f:
    peers = []
    for ln in f:
        ln = ln.strip()
        ipaddr, port = ln.split()
        peers.append((ipaddr, port))

TCPサーバへの接続

昨日は telnet でTCPサーバに接続しましたが、ここではPythonから接続する方法をおさらいします。単純に以下のように、TCPソケット(SOCK_STREAM)を作成して、IPアドレス(今回は127.0.0.1)とポート番号(今回は10914)に対して connect() するだけです。C言語では getaddrinfo() により sockaddr 構造体を解決する必要がありますが、Pythonでは connect() にIPアドレスとポート番号のタプルを渡すだけで接続できます。

import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('127.0.0.1', 10914))

IPv6は?という質問が飛んできそうですが、今回はIPv4で実装させてください。IPv6化はどこかで考えます。P2PでNAT対応とIPv6対応はどちらも大変なのですが、どこかで考える必要はあるとは思っています。

ピアのリストに対して、接続を試みるコードを以下のように実装します。接続したソケットの処理はここではまだ実装していません。

for p in peers:
    try:
        # Establish a connection to a peer
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((p[0], int(p[1])))
        # Start a peer thread
        # To be implemented
    except:
        # In case of encountering an error
        pass

PeerThreadクラスとPeerクラス

昨日は ClientThread クラスを実装しました。P2Pではクライアントもサーバも同等の働きをするため、 PeerThread クラスとして実装します。それに合わせて、クライアント機能とサーバ機能を合わせた機能を実装する Peer クラスも実装します。

Peer クラスは今日はまだ機能の実装はしないため、単純に他のピアと接続済みのソケット、IPアドレス、ポート番号をインスタンス変数として持つ、以下のようなクラスとして実装します。明日以降、ここに機能(メソッド)を実装していきます。

class Peer():
    def __init__(self, sock, ipaddr, port):
        self.sock = sock
        self.ipaddr = ipaddr
        self.port = port
        pass

PeerThread クラスは昨日実装した ClientThread クラスとほぼ同じです。異なる点はクライアントのソケットを渡していたのを Peer クラスのインスタンスを用いるように変更した点です。run() メソッドは echo back のままにしていますが、明日からはここで読み取ったデータに基づき、Peer クラスに実装するメソッドを呼び出すように実装していく予定です。

class PeerThread(threading.Thread):
    def __init__(self, peer):
        threading.Thread.__init__(self)
        self.peer = peer
        self.running = True
    def run(self):
        while self.running:
            data = self.peer.sock.recv(4096)
            self.peer.sock.send(data)
            if len(data) == 0:
                self.running = False

ピアの管理

メインスレッドでピアやスレッドを管理するため、以下のようなピアを管理するクラスを実装しました。内容としては、昨日の main() 関数内の実装と同じため、説明は割愛します。

class PeerManager():
    threads = {}
    def __init__(self):
        pass
    def add_new_peer(self, peer):
        th = PeerThread(peer)
        self.threads[th.ident] = th
        th.start()
    def clean_threads(self):
        # Join dead threads
        for th in threading.enumerate():
            if th == threading.main_thread():
                continue
            if not th.is_alive():
                del self.threads[th.ident]
                th.join()

実装

上記で説明したTCPエコーサーバーを [github:drpnd/advmsg:idn/agent.py] に実装しました。このプログラムを実行すると、idn/rendezvous.txt ファイルで指定したピアとの接続を試み、TCPポート番号10914番で接続を受け付け、ここに送られたデータをそのままクライアントに送り返すエコーバック機能を実行します。今回は、接続元からは何も送信しないので大丈夫ですが、接続元から何か送信する場合、無限にエコーバックし合うようになってしまうため注意してください。

まとめと明日の予定

今日は昨日実装したTCPサーバを拡張して、P2Pネットワークの実装準備をしました。明日はこのプログラムにP2Pネットワークの機能を追加していきます。