Skip to content
Snippets Groups Projects
Commit 2b29648e authored by Callum Inglis's avatar Callum Inglis
Browse files

Implement RabbitMQ broker.

Tweak timings in Sensor Transmission.
parent 1c52398f
No related branches found
No related tags found
2 merge requests!17Implement RabbitMQ broker; API Modifications following breaking changes;,!12CO2 Sensor, Refactoring, CAD Files, Update Config from API
......@@ -89,7 +89,7 @@ boolean clearToSend(int listenDuration) {
* @return RX Payload in JSON, null on timeout reached or error
*/
StaticJsonDocument<1024> listenForAndConsumeMessage(int listenDuration) {
int listenUntil = now() + listenDuration;
int listenUntil = now() + (listenDuration * 2);
StaticJsonDocument<1024> json;
// Listen until timeout expires
......@@ -139,7 +139,7 @@ StaticJsonDocument<1024> listenForAndConsumeMessage(int listenDuration) {
boolean listenForTxPayloadAccept(int listenDuration, int messageID) {
if (DEBUG) { Serial.println("[.] Transmit - Payload - Listening for Ack"); }
StaticJsonDocument<1024> json = listenForAndConsumeMessage(listenDuration);
StaticJsonDocument<1024> json = listenForAndConsumeMessage(listenDuration * 4);
// Timeout, likely TX Payload was not recieved or ack got lost.
if (json.isNull()) {
......@@ -306,4 +306,4 @@ boolean transmitData(DynamicJsonDocument payload) {
if (DEBUG) { Serial.println("Packet Sent Succesfully\n---------------+++++-------------------"); }
return true;
}
\ No newline at end of file
}
......@@ -35,6 +35,7 @@
"""
import sys
from time import sleep, time
import datetime
import json
import requests # pip3 install requests
from SX127x.LoRa import *
......@@ -42,13 +43,24 @@ from SX127x.LoRaArgumentParser import LoRaArgumentParser
from SX127x.board_config import BOARD
import SX127x.packer as packer
import secrets
from multiprocessing import Process
import logging
import pika # pip3 install pika
sys.path.insert(0, '../')
# SYSTEM CONFIG
DEBUG = 2
API_URL = "https://emiei.4oh4.co/api"
API_URL = "https://" + secrets.API_HTTP_BASIC_AUTH + "emiei.4oh4.co/api"
MAX_TX_RESERVATION_TIME = 2 # Seconds
LOG_FILE = "Pi_Receiver.log"
# RabbitMQ Config
MQ_HOST = 'localhost'
MQ_USER = secrets.MQ_USER
MQ_PASS = secrets.MQ_PASS
MQ_EXCHANGE = 'EMIEI'
MQ_QUEUE_OUT = 'EMIEI-API-OUT'
# Configure Pi for LoRa
BOARD.setup()
......@@ -98,18 +110,32 @@ class SensorResponse(object):
def to_json(self):
return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True)
"""
Publish Message to RMQ.
Consumer will pick up message and handle a reliable transmission to the API.
"""
def send_to_api(self):
headers = {'Content-Type': 'Application/json'}
response = requests.post(API_URL + '/sensor/reading', data=self.to_json(), headers=headers)
# If using self-signed cert turn off signature validation
# response = requests.post(API_URL + '/sensor/reading', data=self.ToJson(), headers=headers, verify=False) #
if DEBUG > 1:
print(self.to_json())
print(response) # TODO Ensure 200 Response!
return
connection = pika.BlockingConnection(pika.ConnectionParameters(host=MQ_HOST,credentials=pika.PlainCredentials(MQ_USER, MQ_PASS)))
channel = connection.channel()
channel.exchange_declare(exchange=MQ_EXCHANGE, exchange_type='direct', durable=True, auto_delete=False)
channel.queue_declare(queue=MQ_QUEUE_OUT, durable=True, exclusive=False, auto_delete=False)
channel.queue_bind(queue=MQ_QUEUE_OUT, exchange=MQ_EXCHANGE, routing_key=MQ_QUEUE_OUT)
queueItem = {
"timestamp" : datetime.datetime.today().strftime('%Y-%m-%d_%H-%M-%S'),
"URL" : API_URL + "/sensor/reading",
"DATA" : self.to_json(),
"HEADERS" : {"Content-Type": "Application/json"}
}
channel.basic_publish(
exchange=MQ_EXCHANGE,
routing_key=MQ_QUEUE_OUT,
body=json.dumps(queueItem)
)
## TODO Ensure this worked, return true/false as appropriate
return True
class GatewayMetadata(object):
"""JSON Object received from sensor when they transmit a "TX Payload". Component within SensorResponse(object)
......@@ -326,6 +352,48 @@ class RxHelloQueue:
self.sensorID = sensor_id
self.reservedUntil = time() + reservation_duration + 1
class MqConsumer:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=MQ_HOST,credentials=pika.PlainCredentials(MQ_USER, MQ_PASS)))
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=MQ_EXCHANGE, exchange_type='direct', durable=True, auto_delete=False)
self.channel.queue_declare(queue=MQ_QUEUE_OUT, durable=True, exclusive=False, auto_delete=False)
self.channel.queue_bind(queue=MQ_QUEUE_OUT, exchange=MQ_EXCHANGE, routing_key=MQ_QUEUE_OUT)
self.channel.basic_qos(prefetch_count=5)
self.channel.basic_consume(MQ_QUEUE_OUT, self.callback, auto_ack=False)
def run(self):
logging.info('[MQ Consumer] [i] Waiting for messages')
self.channel.start_consuming()
# Consume message & try to send to API. Require 200 response
def callback(self, ch, method, properties, body):
content = json.loads(body.decode())
data = json.loads(content['DATA'])
logging.info(data['sensorMetadata'])
logging.info(content)
try:
# If using self-signed cert turn off signature validation
# response = requests.post(API_URL + '/sensor/reading', data=self.ToJson(), headers=headers, verify=False) #
response = requests.post(content['URL'], data=content['DATA'], headers=content['HEADERS'])
except Exception as e:
logging.info("[MQ Consumer] [-] API Error (%s) During Transmission" % e)
self.channel.basic_reject(delivery_tag=method.delivery_tag) # Re-Queue
return
# API Submissions Succesful
if response.status_code == 200:
logging.info("[MQ Consumer] [+] Successfully transmitted messageID %s (%s) from sensor %s to API" % (data['sensorMetadata']['messageID'], content['timestamp'], data['sensorMetadata']['uid']))
self.channel.basic_ack(delivery_tag=method.delivery_tag) # Ack from queue
return
# Unsucesful
logging.info("[MQ Consumer] [-] API Error (%s) when transmitting messageID %s (%s) from sensor %s to API" % (response.status_code, content['DATA']['sensorMetadata']['messageID'], content['timestamp'], content['DATA']['sensorMetadata']['uid']))
self.channel.basic_reject(delivery_tag=method.delivery_tag) # Re-Queue
return
def get_gateway_serial():
"""Extract serial number from cpu info file
......@@ -416,16 +484,12 @@ def handle_sensor_response_packet(data):
if DEBUG > 1:
print(" Received Payload: %s" % p.to_json())
# TODO Transmit ACK
print(" Sending Sensor Response Ack")
ack_message(p)
print("[+] Sensor Response Ack Sent")
# TODO Process response? Rabbit MQ?
# Send Data To API
print(" Sending Data to API")
p.send_to_api()
if p.send_to_api():
print(" Sending Sensor Response Ack")
ack_message(p) # TODO Transmit ACK bak to sensor
print("[+] Sensor Response Ack Sent")
print("[+] Data Sent to API")
sleep(0.2)
......@@ -456,31 +520,49 @@ def handle_incoming_lora_packet(rx_hello_queue, data):
if handle_sensor_response_packet(data):
return
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, filename=LOG_FILE, filemode="a+", format="%(asctime)-15s %(levelname)-8s %(message)s")
logging.getLogger("pika").setLevel(logging.WARNING) # Reduce logging level
# Setup Receiver
loraReceiver = LoRaReceiver(verbose=False)
args = parser.parse_args(loraReceiver)
# Setup Receiver
loraReceiver = LoRaReceiver(verbose=False)
args = parser.parse_args(loraReceiver)
loraReceiver.set_mode(MODE.STDBY)
loraReceiver.set_pa_config(pa_select=PA_SELECT.PA_BOOST)
loraReceiver.set_pa_config(max_power=300, output_power=500) # Default max_power = 10.8dB, output_power = 4.2dB
loraReceiver.set_mode(MODE.STDBY)
loraReceiver.set_pa_config(pa_select=PA_SELECT.PA_BOOST)
loraReceiver.set_pa_config(max_power=300, output_power=500) # Default max_power = 10.8dB, output_power = 4.2dB
print("[+] Receiver & API Gateway")
print("[i] Power Levels:", loraReceiver.get_pa_config(convert_dBm=True))
print("[+] Receiver & API Gateway")
print("[i] Power Levels:", loraReceiver.get_pa_config(convert_dBm=True))
loraReceiver.set_agc_auto_on(1)
assert (loraReceiver.get_agc_auto_on() == 1)
loraReceiver.set_agc_auto_on(1)
assert (loraReceiver.get_agc_auto_on() == 1)
try:
loraReceiver.receive()
# Start Consumer
subscriber_list = []
subscriber_list.append(MqConsumer())
except KeyboardInterrupt:
sys.stdout.flush()
print("")
sys.stderr.write("KeyboardInterrupt\n")
process_list = []
for sub in subscriber_list:
process = Process(target=sub.run)
process.start()
process_list.append(process)
finally:
sys.stdout.flush()
print("")
loraReceiver.set_mode(MODE.SLEEP)
BOARD.teardown()
try:
# TODO Start Consumer here
loraReceiver.receive()
except KeyboardInterrupt:
sys.stdout.flush()
print("")
sys.stderr.write("KeyboardInterrupt\n")
finally:
sys.stdout.flush()
print("")
loraReceiver.set_mode(MODE.SLEEP)
BOARD.teardown()
# wait for finish
for process in process_list:
process.join()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment