Advent Calendar 2020: ソフトウェア無線
注意:LimeSDR などのソフトウェア無線は技術基準適合証明(通称,技適)を受けていないため,これらの機器を日本国内で無線機として利用することは電波法により禁じられています。無線機として使用するためには,実験試験局免許を取得するか電波暗室・シールドボックスなどの設備を使用する必要があります。または,アンテナの代わりにケーブルとアッテネータを用い有線接続をし電波を発しないようにすることで,無線通信ではなく有線通信とはなりますが実験することができます(この場合も電波が漏れないように注意してください)。このページを参照される方は,実験される国や地域の法令などを遵守するようにご注意ください。また,実験等はご自身の責任でお願いします。
Day 22: インタラクティブ通信
以下のようにSDR処理ようのスレッドとキーボード入力用のスレッドを用意します。
# Messages
messages = []
"""
Call the main routine
"""
if __name__ == "__main__":
# Parse the arguments
args = parser.parse_args()
# Prepare a semaphore
semaphore = threading.Semaphore(1)
t1 = threading.Thread(target=main, args=(args, semaphore,))
t2 = threading.Thread(target=workerKeyInput, args=(semaphore, ))
t1.start()
t2.start()
キーボード入力スレッドのワーカーは以下の通り実装しました。
"""
Key input worker
"""
def workerKeyInput(semaphore):
while True:
s = sys.stdin.readline()
with semaphore:
global messages
messages.append(s)
SDR処理用のスレッドは以下のように実装しました。
"""
Main routine for SDR
"""
def main(args, semaphore):
# Create an SDR device instance
try:
sdr = SoapySDR.Device(dict(driver="lime"))
except:
sys.stderr.write("Failed to create an SDR device instance.\n")
return False
if not sdr:
sys.stderr.write("Could not find any SDR devices.\n")
return False
# Setup the Tx channel 0
sdr.setSampleRate(SoapySDR.SOAPY_SDR_TX, 0, SAMPLE_RATE)
sdr.setBandwidth(SoapySDR.SOAPY_SDR_TX, 0, args.bandwidth)
sdr.setAntenna(SoapySDR.SOAPY_SDR_TX, 0, args.tx_antenna)
sdr.setGain(SoapySDR.SOAPY_SDR_TX, 0, "PAD", args.tx_gain)
sdr.setFrequency(SoapySDR.SOAPY_SDR_TX, 0, args.rf)
# Setup the Rx channel 0
sdr.setSampleRate(SoapySDR.SOAPY_SDR_RX, 0, SAMPLE_RATE)
sdr.setBandwidth(SoapySDR.SOAPY_SDR_RX, 0, args.bandwidth)
sdr.setAntenna(SoapySDR.SOAPY_SDR_RX, 0, args.rx_antenna)
sdr.setGain(SoapySDR.SOAPY_SDR_RX, 0, "LNA", args.rx_gain)
sdr.setFrequency(SoapySDR.SOAPY_SDR_RX, 0, args.rf)
# Initialize an SDR interface
iface = SdrInterface(sdr, args.my_address)
# Initialize a new connection
conn = iface.newConnection(args.remote_address)
if not conn:
return False
while True:
with semaphore:
global messages
if messages:
msg = messages.pop(0)
msg = msg.strip()
print("Sending: {}".format(msg))
conn.send(bytes(msg, "utf8"))
ret = iface.recv()
if ret:
print("Received: {}".format(ret["payload"]))
return True
これでインタラクティブな送受信が実装できました。
今日のまとめと明日の予定
今日はこれまで実装してきたSDRの処理をマルチスレッド化し,キーボード入力と組み合わせてインタラクティブな送受信を実装しました。明日はこれを使って実験をしようと思います。