Advent Calendar 2020: ソフトウェア無線
注意:LimeSDR などのソフトウェア無線は技術基準適合証明(通称,技適)を受けていないため,これらの機器を日本国内で無線機として利用することは電波法により禁じられています。無線機として使用するためには,実験試験局免許を取得するか電波暗室・シールドボックスなどの設備を使用する必要があります。または,アンテナの代わりにケーブルとアッテネータを用い有線接続をし電波を発しないようにすることで,無線通信ではなく有線通信とはなりますが実験することができます(この場合も電波が漏れないように注意してください)。このページを参照される方は,実験される国や地域の法令などを遵守するようにご注意ください。また,実験等はご自身の責任でお願いします。
Day 20: 無線インターフェイスの定義
今日は双方向通信のために無線インターフェイスを定義して,各関数・メソッドを実装します。
アドレスの指定
双方向通信に際して自分のアドレスと通信先のアドレスを指定するために,argparse
で以下のオプションを指定できるようにします。
parser.add_argument('--my-address', type=int, default='1')
parser.add_argument('--remote-address', type=int, default='2')
--my-address
が自端末のアドレスで,--remote-address
が通信先のアドレスを意味しています。
無線コネクションクラス
自端末と通信先の端末の間でコネクションを生成します。今回は再送や認証等のステートを持たないので,以下のように自端末の無線インターフェイスのインスタンス iface
(後述)と通信先のアドレス target
を引数とするコンストラクタを持つクラスを実装します。
"""
SdrConnection
"""
class SdrConnection:
"""
Constructor
"""
def __init__(self, iface, target):
self.iface = iface
self.target = target
self.seqno = 0
"""
Send data
"""
def send(self, data):
dst = bitstring.BitArray(int=self.target, length=32)
self.seqno += 1
return self.iface.xmit(dst, self.seqno, data)
無線インターフェイスクラス
SDR機器を無線インターフェイスとして扱うために SdrInterface
クラスを実装します。コンストラクタは SoapySDR.Device
インスタンス sdr
と自端末の32ビットのアドレス(整数) address
を引数としてインスタンスを生成します。コンストラクタでは,送受信ストリームを生成します。また,このクラスは,無線インターフェイス上でコネクションを生成するために newConnection()
メソッドを実装します。コネクションを削除するメソッドは今回は省略しました。
これ以外にパケットを送受信するための xmit()
および recv()
メソッドを実装します。xmit()
メソッドは宛先アドレス,シーケンス番号,データからフレームを生成し,SDRから送信します。recv()
メソッドは SoapySDR.Device.readStream()
を呼び出して信号のサンプル列を取り出して,プリアンブル等が正しく検出されれば,フレームを解析し,対応するコネクションとデータを返します。このメソッドは正しいフレームが受信されなかった場合,直ちに結果を返すノンブロッキング(中の SoapySDR.Device.readStream()
はブロッキング)で実装しているので,ビジーループでこのメソッドを呼び出すことを想定しています。
SdrInterface
クラスは以下のように実装しました。
"""
SdrInterface
"""
class SdrInterface:
"""
Constructor
"""
def __init__(self, sdr, address):
self.sdr = sdr
self.address = address
self.connections = {}
# Initialize the tx/rx streams
self.txStream = sdr.setupStream(SoapySDR.SOAPY_SDR_TX, SoapySDR.SOAPY_SDR_CF32, [0])
self.rxStream = sdr.setupStream(SoapySDR.SOAPY_SDR_RX, SoapySDR.SOAPY_SDR_CF32, [0])
# Activate the streams
sdr.activateStream(self.txStream)
sdr.activateStream(self.rxStream)
"""
Deactivate
"""
def deactivate(self):
# Deactivate and close the stream
self.sdr.deactivateStream(self.rxStream)
self.sdr.closeStream(self.rxStream)
return True
"""
Create a new connection
"""
def newConnection(self, target):
if target in self.connections:
# Connection already exists
return False
conn = SdrConnection(self, target)
self.connections[target] = conn
return conn
"""
Transmit a packet
"""
def xmit(self, dst, seqno, data):
# Source address
src = bitstring.BitArray(int=self.iface.address, length=32)
# Postamble
postamble = np.ones(128, dtype=np.complex64)
# Build the datalink layer frame
frame = build_datalink(dst, src, seqno, bitstring.BitArray(data))
# Build the physical layer protocol header
phy = build_phy(frame.size)
# Combine the physical layer header and the data-link frame
symbols = np.concatenate([phy, frame, postamble])
# Get samples from symbols
samples = np.repeat(symbols, SAMPLES_PER_SYMBOL)
mtu = self.sdr.getStreamMTU(self.txStream)
sent = 0
while sent < len(samples):
chunk = samples[sent:sent+mtu]
status = self.sdr.writeStream(self.txStream, [chunk], chunk.size, timeoutUs=1000000)
if status.ret != chunk.size:
sys.stderr.write("Failed to transmit all samples in writeStream(): {}\n".format(status.ret))
return False
sent += status.ret
return True
"""
Receive a packet (blocking)
"""
def recv(self):
# Prepare a receive buffer
rxBuffer = np.zeros(RECEIVE_BUFFER_SIZE, np.complex64)
# Receive samples
status = self.sdr.readStream(self.rxStream, [rxBuffer], rxBuffer.size)
if status.ret != rxBuffer.size:
sys.stderr.write("Failed to receive samples in readStream(): {}\n".format(status.ret))
return False
# Detect the edge offset
edgeOffset = detectEdge(rxBuffer, SAMPLES_PER_SYMBOL)
if not edgeOffset:
# No edge detected
return False
if edgeOffset + 4 >= SAMPLES_PER_SYMBOL:
edgeOffset -= SAMPLES_PER_SYMBOL
# Decode symbols from the center of a set of samples
symbols = rxBuffer[range(edgeOffset + 4, rxBuffer.size, SAMPLES_PER_SYMBOL)]
# Detect the preamble
preamblePosition = detectPreamble(symbols)
if not preamblePosition:
# Preamble not detected
return False
# Receive the samples while the signal is valid
packetSymbols = np.copy(symbols[preamblePosition:])
while True:
status = self.sdr.readStream(self.rxStream, [rxBuffer], rxBuffer.size)
if status.ret != rxBuffer.size:
sys.stderr.write("Failed to receive samples in readStream(): {}\n".format(status.ret))
return False
symbols = rxBuffer[range(edgeOffset + 4, rxBuffer.size, SAMPLES_PER_SYMBOL)]
packetSymbols = np.concatenate([packetSymbols, symbols])
if np.sum(np.abs(symbols) > RECEIVE_SIGNAL_THRESHOLD) != symbols.size:
break
data = demodulate(packetSymbols)
# Parse the protocol data
protocol = MyProtocol()
if not protocol.parse(data):
return False
# Check if the destination is my interface
if self.address != protocol.destination:
return False
# Check if the connection is established
if protocol.source not in self.connections:
return False
conn = self.connections[protocol.source]
return {"conn": conn, "payload": protocol.payload}
無線インターフェイスとコネクションの使い方
以下のようにインターフェイスとコネクションのインスタンスを作成して使用します。
# Initialize an SDR interface
iface = SdrInterface(sdr, args.my_address)
# Initialize a new connection
conn = iface.newConnection(args.remote_address)
while True:
ret = iface.recv()
if ret:
print(ret)
今日のまとめと明日の予定
今日は無線インターフェイスクラスとコネクションクラスを実装しました。明日はこれらを使って2台の間でインタラクティブな通信を実装しようと思います。