panda's tech note

Advent Calendar 2020: ソフトウェア無線

注意LimeSDR などのソフトウェア無線は技術基準適合証明(通称,技適)を受けていないため,これらの機器を日本国内で無線機として利用することは電波法により禁じられています。無線機として使用するためには,実験試験局免許を取得するか電波暗室・シールドボックスなどの設備を使用する必要があります。または,アンテナの代わりにケーブルとアッテネータを用い有線接続をし電波を発しないようにすることで,無線通信ではなく有線通信とはなりますが実験することができます(この場合も電波が漏れないように注意してください)。このページを参照される方は,実験される国や地域の法令などを遵守するようにご注意ください。また,実験等はご自身の責任でお願いします。

Day 20: 無線インターフェイスの定義

今日は双方向通信のために無線インターフェイスを定義して,各関数・メソッドを実装します。

アドレスの指定

双方向通信に際して自分のアドレスと通信先のアドレスを指定するために,argparse で以下のオプションを指定できるようにします。

parser.add_argument('--my-address', type=int, default='1')
parser.add_argument('--remote-address', type=int, default='2')

--my-address が自端末のアドレスで,--remote-address が通信先のアドレスを意味しています。

無線コネクションクラス

自端末と通信先の端末の間でコネクションを生成します。今回は再送や認証等のステートを持たないので,以下のように自端末の無線インターフェイスのインスタンス iface(後述)と通信先のアドレス target を引数とするコンストラクタを持つクラスを実装します。

"""
SdrConnection
"""
class SdrConnection:
    """
    Constructor
    """
    def __init__(self, iface, target):
        self.iface = iface
        self.target = target
        self.seqno = 0

    """
    Send data
    """
    def send(self, data):
        dst = bitstring.BitArray(int=self.target, length=32)
        self.seqno += 1
        return self.iface.xmit(dst, self.seqno, data)

無線インターフェイスクラス

SDR機器を無線インターフェイスとして扱うために SdrInterface クラスを実装します。コンストラクタは SoapySDR.Device インスタンス sdr と自端末の32ビットのアドレス(整数) address を引数としてインスタンスを生成します。コンストラクタでは,送受信ストリームを生成します。また,このクラスは,無線インターフェイス上でコネクションを生成するために newConnection() メソッドを実装します。コネクションを削除するメソッドは今回は省略しました。

これ以外にパケットを送受信するための xmit() および recv() メソッドを実装します。xmit() メソッドは宛先アドレス,シーケンス番号,データからフレームを生成し,SDRから送信します。recv() メソッドは SoapySDR.Device.readStream() を呼び出して信号のサンプル列を取り出して,プリアンブル等が正しく検出されれば,フレームを解析し,対応するコネクションとデータを返します。このメソッドは正しいフレームが受信されなかった場合,直ちに結果を返すノンブロッキング(中の SoapySDR.Device.readStream() はブロッキング)で実装しているので,ビジーループでこのメソッドを呼び出すことを想定しています。

SdrInterface クラスは以下のように実装しました。

"""
SdrInterface
"""
class SdrInterface:
    """
    Constructor
    """
    def __init__(self, sdr, address):
        self.sdr = sdr
        self.address = address
        self.connections = {}
        # Initialize the tx/rx streams
        self.txStream = sdr.setupStream(SoapySDR.SOAPY_SDR_TX, SoapySDR.SOAPY_SDR_CF32, [0])
        self.rxStream = sdr.setupStream(SoapySDR.SOAPY_SDR_RX, SoapySDR.SOAPY_SDR_CF32, [0])
        # Activate the streams
        sdr.activateStream(self.txStream)
        sdr.activateStream(self.rxStream)

    """
    Deactivate
    """
    def deactivate(self):
        # Deactivate and close the stream
        self.sdr.deactivateStream(self.rxStream)
        self.sdr.closeStream(self.rxStream)
        return True

    """
    Create a new connection
    """
    def newConnection(self, target):
        if target in self.connections:
            # Connection already exists
            return False
        conn = SdrConnection(self, target)
        self.connections[target] = conn
        return conn

    """
    Transmit a packet
    """
    def xmit(self, dst, seqno, data):
        # Source address
        src = bitstring.BitArray(int=self.iface.address, length=32)
        # Postamble
        postamble = np.ones(128, dtype=np.complex64)
        # Build the datalink layer frame
        frame = build_datalink(dst, src, seqno, bitstring.BitArray(data))
        # Build the physical layer protocol header
        phy = build_phy(frame.size)
        # Combine the physical layer header and the data-link frame
        symbols = np.concatenate([phy, frame, postamble])

        # Get samples from symbols
        samples = np.repeat(symbols, SAMPLES_PER_SYMBOL)

        mtu = self.sdr.getStreamMTU(self.txStream)
        sent = 0
        while sent < len(samples):
            chunk = samples[sent:sent+mtu]
            status = self.sdr.writeStream(self.txStream, [chunk], chunk.size, timeoutUs=1000000)
            if status.ret != chunk.size:
                sys.stderr.write("Failed to transmit all samples in writeStream(): {}\n".format(status.ret))
                return False
            sent += status.ret

        return True

    """
    Receive a packet (blocking)
    """
    def recv(self):
        # Prepare a receive buffer
        rxBuffer = np.zeros(RECEIVE_BUFFER_SIZE, np.complex64)
        # Receive samples
        status = self.sdr.readStream(self.rxStream, [rxBuffer], rxBuffer.size)
        if status.ret != rxBuffer.size:
            sys.stderr.write("Failed to receive samples in readStream(): {}\n".format(status.ret))
            return False
        # Detect the edge offset
        edgeOffset = detectEdge(rxBuffer, SAMPLES_PER_SYMBOL)
        if not edgeOffset:
            # No edge detected
            return False
        if edgeOffset + 4 >= SAMPLES_PER_SYMBOL:
            edgeOffset -= SAMPLES_PER_SYMBOL
        # Decode symbols from the center of a set of samples
        symbols = rxBuffer[range(edgeOffset + 4, rxBuffer.size, SAMPLES_PER_SYMBOL)]
        # Detect the preamble
        preamblePosition = detectPreamble(symbols)
        if not preamblePosition:
            # Preamble not detected
            return False
        # Receive the samples while the signal is valid
        packetSymbols = np.copy(symbols[preamblePosition:])
        while True:
            status = self.sdr.readStream(self.rxStream, [rxBuffer], rxBuffer.size)
            if status.ret != rxBuffer.size:
                sys.stderr.write("Failed to receive samples in readStream(): {}\n".format(status.ret))
                return False
            symbols = rxBuffer[range(edgeOffset + 4, rxBuffer.size, SAMPLES_PER_SYMBOL)]
            packetSymbols = np.concatenate([packetSymbols, symbols])
            if np.sum(np.abs(symbols) > RECEIVE_SIGNAL_THRESHOLD) != symbols.size:
                break
        data = demodulate(packetSymbols)
        # Parse the protocol data
        protocol = MyProtocol()
        if not protocol.parse(data):
            return False
        # Check if the destination is my interface
        if self.address != protocol.destination:
            return False
        # Check if the connection is established
        if protocol.source not in self.connections:
            return False
        conn = self.connections[protocol.source]
        return {"conn": conn, "payload": protocol.payload}

無線インターフェイスとコネクションの使い方

以下のようにインターフェイスとコネクションのインスタンスを作成して使用します。

# Initialize an SDR interface
iface = SdrInterface(sdr, args.my_address)

# Initialize a new connection
conn = iface.newConnection(args.remote_address)
while True:
    ret = iface.recv()
    if ret:
        print(ret)

今日のまとめと明日の予定

今日は無線インターフェイスクラスとコネクションクラスを実装しました。明日はこれらを使って2台の間でインタラクティブな通信を実装しようと思います。