Newer
Older
#!/usr/bin/env python3
'''
=========================================================================================
CS408 Environmental Monitoring Independent of Existing Infrastructure
Copyright (C) 2021 Callum Inglis
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
Contact: Callum.Inglis.2018(at)uni.strath.ac.uk
=========================================================================================
Libraries Used
https://github.com/raspberrypi-tw/lora-sx1276
=========================================================================================
'''
# Usage: python Pi_Receiver.py -f 433 -b BW125 -s 7
import sys
sys.path.insert(0, '../')

import json
import random
from time import sleep, time

import requests # pip3 install requests

from SX127x.LoRa import *
from SX127x.LoRaArgumentParser import LoRaArgumentParser
from SX127x.board_config import BOARD
# Verbosity level: values > 0 print the API response after each POST,
# values > 1 also write RSSI / modem status to stdout from the receiver loop.
DEBUG = 1
# Base URL of the HTTP API; endpoint paths such as '/sensor/reading' are appended to it.
API_URL = "http://environmental-monitoring.int.4oh4.co/api"
# Board-level setup from SX127x.board_config, executed once at import time and
# paired with BOARD.teardown() at the end of the script
# (presumably GPIO/SPI initialisation - confirm against the library).
BOARD.setup()
# Command-line parser; the parsed options are applied to the receiver through
# parser.parse_args(...) further down.
parser = LoRaArgumentParser("Continous LoRa receiver.")
class SensorResponse(object):
    """One sensor message: its metadata plus the readings it carried."""

    def __init__(self, sensorMetadata, data):
        """Wrap the two raw payload sections in their typed counterparts.

        Args:
            sensorMetadata: mapping unpacked as keyword arguments into
                ``SensorMetadata``.
            data: mapping unpacked as keyword arguments into
                ``SensorReading``.

        Raises:
            TypeError: if either mapping has missing or unexpected keys.
        """
        self.sensorMetadata = SensorMetadata(**sensorMetadata)
        self.sensorReading = SensorReading(**data)

    # Python Object to JSON Object
    def ToJson(self):
        """Return this object and every nested object as a JSON string.

        Nested objects are serialised through their ``__dict__``; keys are
        sorted and the output is indented by four spaces.
        """
        return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)

    def sendToApi(self, timeout=10):
        """POST this reading, as JSON, to ``API_URL + '/sensor/reading'``.

        Args:
            timeout: seconds to wait for the API before giving up. Without a
                timeout ``requests`` waits indefinitely, which would stall
                the gateway whenever the API is unreachable.

        Returns:
            None. The HTTP response is only printed when ``DEBUG > 0``.

        Raises:
            requests.exceptions.RequestException: on connection failure or
                timeout; not handled here so the caller can decide what to do.
        """
        headers = {'Content-Type': 'Application/json'}
        # NOTE(review): verify=False turns off TLS certificate verification.
        # It has no effect while API_URL is plain http, but must be removed
        # once the API is served over https with a proper certificate.
        response = requests.post(
            API_URL + '/sensor/reading',
            data=self.ToJson(),
            headers=headers,
            verify=False,  # Using self signed cert for now, sort later!
            timeout=timeout,
        )
        if DEBUG > 0:
            print(response)
        return
class SensorMetadata(object):
    """Identifying information sent alongside every sensor message."""

    def __init__(self, uid, messageID, samplePeriod):
        """Store the sender's metadata.

        Args:
            uid: identifier of the sending sensor (read back when printing
                the reading and when addressing the acknowledgement).
            messageID: identifier of this message (echoed in the
                acknowledgement).
            samplePeriod: sampling period reported by the sensor.
        """
        # uid and messageID must be kept: other parts of this file read
        # sensorMetadata.uid and sensorMetadata.messageID.
        self.uid = uid
        self.messageID = messageID
        self.samplePeriod = samplePeriod


class SensorReading(object):
    """The readings of one message, grouped per sensor module."""

    def __init__(self, ppm, sht, co2):
        """Wrap each raw section in its typed counterpart.

        Args:
            ppm: mapping unpacked as keyword arguments into ``PPM``.
            sht: mapping unpacked as keyword arguments into ``SHT``.
            co2: mapping unpacked as keyword arguments into ``Co2``.
        """
        self.ppm = PPM(**ppm)
        self.sht = SHT(**sht)
        self.co2 = Co2(**co2)


class PPM(object):
    """Readings stored under the ``p10``, ``p25`` and ``p100`` fields."""

    def __init__(self, p10, p25, p100):
        self.p10 = p10
        self.p25 = p25
        self.p100 = p100


class SHT(object):
    """Temperature and humidity reading."""

    def __init__(self, temperature, humidity):
        self.temperature = temperature
        self.humidity = humidity


class Co2(object):
    """Placeholder for the CO2 reading, which is not implemented yet."""

    def __init__(self, tmp):
        # The incoming value is deliberately ignored for now.
        self.tmp = "Coming Soon!"
class Reply():
    """Message sent from the gateway back to a sensor in reply to a message."""

    # Class-level default, kept so that ``Reply.ackStatus`` still resolves.
    ackStatus = False

    def __init__(self, remoteSensorID, replyMsgID):
        """Create a reply addressed to one sensor message.

        Args:
            remoteSensorID: identifier of the sensor being answered.
            replyMsgID: identifier of the message being answered.
        """
        # Five-digit identifier for this reply (99999 itself is excluded).
        self.messageID = random.randrange(10000, 99999)
        self.gatewayUid = getserial()
        self.remoteSensorID = remoteSensorID
        self.replyMsgID = replyMsgID
        # ToJson() serialises the instance __dict__, so the status has to be
        # an instance attribute; as a class attribute only, it was missing
        # from the JSON until setAckStatus() had been called.
        self.ackStatus = False

    def setAckStatus(self, ackStatus):
        """Record whether the answered message is being acknowledged."""
        self.ackStatus = ackStatus

    def ToJson(self):
        """Return the instance attributes as a sorted, indented JSON string."""
        return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)
class LoRaReceiver(LoRa):
    """Gateway radio: receives sensor packets and transmits replies.

    The ``on_*`` methods override the handlers of the SX127x ``LoRa`` base
    class (presumably invoked by the library when the matching radio
    interrupt fires - confirm against the library).

    NOTE(review): the copy of this class under review had lost its
    indentation and some lines, and contained stray numeric lines. The
    ``start`` method header, the pause inside its loop and the
    ``handleData`` call in ``on_rx_done`` were restored following the SX127x
    library's continuous-receiver example and the rest of this file; please
    confirm them against the original source.
    """

    def __init__(self, verbose=False):
        """Put the radio to sleep and reset all six DIO mappings to 0."""
        super(LoRaReceiver, self).__init__(verbose)
        self._id = "Base-01"
        self.set_mode(MODE.SLEEP)
        self.set_dio_mapping([0] * 6)

    def on_rx_done(self):
        """Read the received payload, re-arm reception, then process it."""
        print("\n[+] Rx Done")
        self.clear_irq_flags(RxDone=1)
        payload = self.read_payload(nocheck=True)
        # Parse our data here!
        data = ''.join([chr(c) for c in payload])
        self.reset_ptr_rx()
        self.set_mode(MODE.RXCONT)
        # Processing happens after reception has been re-armed: handling a
        # message may transmit an acknowledgement, and switching back to
        # RXCONT afterwards would cut that transmission short.
        handleData(data)

    def on_tx_done(self):
        """Return the radio to continuous receive after a transmission."""
        print("\nTxDone")
        # Clear the TxDone flag as well; a stale flag would assert DIO0 as
        # soon as it is mapped to TxDone for the next transmission.
        self.clear_irq_flags(TxDone=1)
        # set RX
        self.set_dio_mapping([0,0,0,0,0,0]) # RX
        sleep(1)
        self.reset_ptr_rx()
        self.set_mode(MODE.RXCONT)
        self.clear_irq_flags(RxDone=1)

    def on_cad_done(self):
        """Print the IRQ flags for a CadDone event."""
        print("\non_CadDone")
        print(self.get_irq_flags())

    def on_rx_timeout(self):
        """Print the IRQ flags for an RxTimeout event."""
        print("\non_RxTimeout")
        print(self.get_irq_flags())

    def on_valid_header(self):
        """Print the IRQ flags for a ValidHeader event."""
        print("\non_ValidHeader")
        print(self.get_irq_flags())

    def on_payload_crc_error(self):
        """Print the IRQ flags for a PayloadCrcError event."""
        print("\non_PayloadCrcError")
        print(self.get_irq_flags())

    def on_fhss_change_channel(self):
        """Print the IRQ flags for an FhssChangeChannel event."""
        print("\non_FhssChangeChannel")
        print(self.get_irq_flags())

    def start(self):
        """Enter continuous receive mode and loop until interrupted.

        Never returns normally; the script stops it with Ctrl-C.
        """
        self.reset_ptr_rx()
        self.set_mode(MODE.RXCONT)
        while True:
            # Pause between polls so the loop does not spin at full CPU.
            sleep(.5)
            # NOTE(review): this guard is reproduced as found. Lines may be
            # missing around it in the reviewed copy - confirm whether status
            # polling was meant to be skipped, rather than performed, while
            # the radio is transmitting.
            if (self.get_mode() == MODE.TX):
                rssi_value = self.get_rssi_value()
                status = self.get_modem_status()
                if DEBUG > 1:
                    sys.stdout.flush()
                    sys.stdout.write("\r%d %d %d" % (rssi_value, status['rx_ongoing'], status['modem_clear']))

    def transmit(self, _payload):
        """Load ``_payload`` (a list of byte values) and start transmitting."""
        self.tx_counter = 0
        self.write_payload(_payload)
        # Route TxDone to DIO0 so on_tx_done() runs and switches the radio
        # back to receive; with the receive mapping (all zeros) the end of
        # the transmission would go unnoticed.
        self.set_dio_mapping([1, 0, 0, 0, 0, 0])
        self.set_mode(MODE.TX)
# Source: https://www.raspberrypi-spy.co.uk/2012/09/getting-your-raspberry-pi-serial-number-using-python/
def getserial(cpuinfo_path='/proc/cpuinfo'):
    """Return the CPU serial number listed in a cpuinfo file.

    Args:
        cpuinfo_path: file to read; defaults to ``/proc/cpuinfo``.

    Returns:
        Characters 10-25 of the last line that starts with ``Serial``,
        ``"0000000000000000"`` when the file has no such line, or
        ``"ERROR000000000"`` when the file cannot be read.
    """
    # Extract serial from cpuinfo file
    cpuserial = "0000000000000000"
    try:
        # The context manager closes the file even if reading fails part-way.
        with open(cpuinfo_path, 'r') as f:
            for line in f:
                if line[0:6] == 'Serial':
                    cpuserial = line[10:26]
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError: undecodable content.
        cpuserial = "ERROR000000000"
    return cpuserial
# Parse LoRa response, validate, save / transmit
def handleData(data):
    """Decode one received message, acknowledge it and print its contents.

    Args:
        data: the received payload as a JSON string.

    Returns:
        None. Payloads that are not valid JSON, or that do not have the
        structure ``SensorResponse`` expects, are reported and ignored.
    """
    try:
        parsed = json.loads(data)
        # A payload with missing or unexpected keys raises TypeError here;
        # it is treated like any other unparseable message.
        p = SensorResponse(**parsed)
    except (ValueError, TypeError):
        print("[-] Unable to Parse response, ignoring") # TODO Error handling, log increased error rates etc
        return

    # TODO Transmit ACK
    ackMsg(p)

    # TODO Validate response is valid and non-corrupt

    # Process response
    print("Sensor ID: %s" % (p.sensorMetadata.uid))
    print("\tPPM 10: %s" % p.sensorReading.ppm.p10)
    #SensorReading = SensorModule(parsed)
    #sensorData = sensorData(parsed['sensorData'])

    # TODO Send To API
# Upon successfully receiving a message, send back an ack
def ackMsg(sensorResponse):
    """Transmit a positive acknowledgement for a received sensor message.

    Args:
        sensorResponse: the decoded message; its ``sensorMetadata.uid`` and
            ``sensorMetadata.messageID`` address the reply.

    Returns:
        None. The reply JSON is printed and then handed to the receiver's
        ``transmit`` method.
    """
    # NOTE(review): ``packer`` is not defined or imported in the visible part
    # of this file - confirm it is provided elsewhere.
    metadata = sensorResponse.sensorMetadata
    reply = Reply(metadata.uid, metadata.messageID)
    reply.setAckStatus(True)
    print(reply.ToJson())

    _, packed = packer.Pack_Str(reply.ToJson())
    frame = []
    for byte in packed:
        frame.append(int(hex(byte), 0))
    loraReceiver.transmit(frame)
# Setup Receiver
loraReceiver = LoRaReceiver(verbose=False)
# Apply the command-line options (see the usage line at the top of the file)
# to the receiver.
args = parser.parse_args(loraReceiver)
loraReceiver.set_mode(MODE.STDBY)
loraReceiver.set_pa_config(pa_select=1)
#loraReceiver.set_rx_crc(True)
#loraReceiver.set_coding_rate(CODING_RATE.CR4_6)
#loraReceiver.set_pa_config(max_power=0, output_power=0)
#loraReceiver.set_lna_gain(GAIN.G1)
#loraReceiver.set_implicit_header_mode(False)
#loraReceiver.set_low_data_rate_optim(True)
#loraReceiver.set_pa_ramp(PA_RAMP.RAMP_50_us)
#loraReceiver.set_agc_auto_on(True)

# Go Go Go!
print("[+] Receiver & API Gateway")
assert(loraReceiver.get_agc_auto_on() == 1)

# NOTE(review): the reviewed copy had ``except``/``finally`` clauses with no
# ``try``. The receive loop entry point is assumed to be
# ``LoRaReceiver.start()``, as in the SX127x library's continuous-receiver
# example - confirm against the class definition.
try:
    loraReceiver.start()
except KeyboardInterrupt:
    sys.stdout.flush()
    print("")
    sys.stderr.write("KeyboardInterrupt\n")
finally:
    # Always leave the radio asleep and release the board, however the loop ended.
    sys.stdout.flush()
    print("")
    loraReceiver.set_mode(MODE.SLEEP)
    BOARD.teardown()