Skip to content
Snippets Groups Projects
Pi_Receiver.py 5.3 KiB
Newer Older
#!/usr/bin/env python3

'''
    =========================================================================================
 
        CS408 Environmental Monitoring Independent of Existing Infrastructure
        Copyright (C) 2021 Callum Inglis

        This program is free software; you can redistribute it and/or modify
        it under the terms of the GNU General Public License as published by
        the Free Software Foundation; either version 2 of the License, or
        (at your option) any later version.

        This program is distributed in the hope that it will be useful,
        but WITHOUT ANY WARRANTY; without even the implied warranty of
        MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
        GNU General Public License for more details.

        You should have received a copy of the GNU General Public License along
        with this program; if not, write to the Free Software Foundation, Inc.,
        51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

        Contact: Callum.Inglis.2018(at)uni.strath.ac.uk

    =========================================================================================

    Libraries Used
        https://github.com/raspberrypi-tw/lora-sx1276

    =========================================================================================
'''

# Usage: python Pi_Receiver.py -f 433 -b BW125 -s 7

import sys 
from time import sleep
sys.path.insert(0, '../')     
from SX127x.LoRa import *
from SX127x.LoRaArgumentParser import LoRaArgumentParser
from SX127x.board_config import BOARD

BOARD.setup()

parser = LoRaArgumentParser("Continous LoRa receiver.")

class SensorReading(object):
    def __init__(self, sensor, data):
        self.sensor = SensorModule(**sensor)
        self.readings = SensorData(**data)

class SensorModule(object):
    def __init__(self, ID, samplePeriod):
        self.sensorID = ID
        self.samplePeriod = samplePeriod

class SensorData(object):
    def __init__(self, ppm, sht, co2):
        self.ppm = SensorPPM(**ppm)
        self.sht = SensorSHT(**sht)
        self.co2 = SensorCo2(**co2)

class SensorPPM(object):
    def __init__(self, p10, p25, p100):
        self.p10 = p10
        self.p25 = p25
        self.p100 = p100

class SensorSHT(object):
    def __init__(self, temperature, humidity):
        self.temperature = temperature
        self.humidity = humidity

class SensorCo2(object):
    def __init__(self, tmp):
        self.tmp = "Coming Soon!"
    
class LoRaRcvCont(LoRa):
    def __init__(self, verbose=False):
        super(LoRaRcvCont, self).__init__(verbose)
        self.set_mode(MODE.SLEEP)
        self.set_dio_mapping([0] * 6)

    def on_rx_done(self):
        if DEBUG:
            print("\n[+] Rx Done")

        self.clear_irq_flags(RxDone=1)
        payload = self.read_payload(nocheck=True)
        data = ''.join([chr(c) for c in payload])
        #print(bytes(payload).decode())
        self.set_mode(MODE.SLEEP)
        self.reset_ptr_rx()
        self.set_mode(MODE.RXCONT)

    def on_tx_done(self):
        print("\nTxDone")
        print(self.get_irq_flags())

    def on_cad_done(self):
        print("\non_CadDone")
        print(self.get_irq_flags())

    def on_rx_timeout(self):
        print("\non_RxTimeout")
        print(self.get_irq_flags())

    def on_valid_header(self):
        print("\non_ValidHeader")
        print(self.get_irq_flags())

    def on_payload_crc_error(self):
        print("\non_PayloadCrcError")
        print(self.get_irq_flags())

    def on_fhss_change_channel(self):
        print("\non_FhssChangeChannel")
        print(self.get_irq_flags())

    def start(self):
        self.reset_ptr_rx()
        self.set_mode(MODE.RXCONT)
        while True:
            sleep(.5)
            rssi_value = self.get_rssi_value()
            status = self.get_modem_status()

            if DEBUG:
                sys.stdout.flush()
                sys.stdout.write("\r%d %d %d" % (rssi_value, status['rx_ongoing'], status['modem_clear']))

# Parse LoRa response, validate, save / transmit
def handleData(data):
    try:
        parsed = json.loads(data)
        p = SensorReading(**parsed)
    except:
        print("[-] Unable to Parse response, ignoring") # TODO Error handling, log increased error rates etc
        return

    # TODO Validate response is valid and non-corrupt

    # Process response
    print("Sensor ID: %s" % (p.sensor.sensorID))
    print("\tPPM 10: %s" % p.readings.ppm.p10)
    #SensorReading = SensorModule(parsed)
    #sensorData = sensorData(parsed['sensorData'])

    # TODO Send To API
    

lora = LoRaRcvCont(verbose=False)
args = parser.parse_args(lora)

lora.set_mode(MODE.STDBY)
lora.set_pa_config(pa_select=1)
#lora.set_rx_crc(True)
#lora.set_coding_rate(CODING_RATE.CR4_6)
#lora.set_pa_config(max_power=0, output_power=0)
#lora.set_lna_gain(GAIN.G1)
#lora.set_implicit_header_mode(False)
#lora.set_low_data_rate_optim(True)
#lora.set_pa_ramp(PA_RAMP.RAMP_50_us)
#lora.set_agc_auto_on(True)

print("[+] Receiver & API Gateway")
assert(lora.get_agc_auto_on() == 1)

try:
    lora.start()
except KeyboardInterrupt:
    sys.stdout.flush()
    print("")
    sys.stderr.write("KeyboardInterrupt\n")
finally:
    sys.stdout.flush()
    print("")
    lora.set_mode(MODE.SLEEP)
    print(lora)
    BOARD.teardown()