commit 19900291f60646f14b808d0788a81d5a2e9bccc6 Author: Michael Smith Date: Sat Oct 27 18:03:26 2018 +0200 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..894a44c --- /dev/null +++ b/.gitignore @@ -0,0 +1,104 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..9425bb5 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 Michael Smith + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..f8fd4dd --- /dev/null +++ b/README.md @@ -0,0 +1,15 @@ +# py-cayenne-lpp-decoder +Cayenne Low Power Payload decoder written in Python. +See [Cayenne Low Power Payload Documentation](https://mydevices.com/cayenne/docs/lora/#lora-cayenne-low-power-payload) for more information. + +The decoder expects a base64 encoded payload string, commonly used in LoRaWAN implementations. + +## Usage + +```python +>>> import cayennelpp +>>> print(cayennelpp.lppdecode('A2cBEAVnAP8=')) +[{'channel': 3, 'type': 'Temperature Sensor', 'value': 27.2}, {'channel': 5, 'type': 'Temperature Sensor', +'value': 25.5}] + +``` diff --git a/cayennelpp.py b/cayennelpp.py new file mode 100644 index 0000000..f8d269a --- /dev/null +++ b/cayennelpp.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- +import base64 +import binascii + + +def one_per_bit_unsigned(data): + return int.from_bytes(data, byteorder='big', signed=False) + + +def point_zero_one_per_bit_signed(data): + return int.from_bytes(data, byteorder='big', signed=True) / 100 + + +def temperature(data): + return int.from_bytes(data, byteorder='big', signed=True) / 10 + + +def humidity(data): + return int.from_bytes(data, byteorder='big', signed=False) / 2 + + +def accelerometer(data): + x = int.from_bytes(data[:2], byteorder='big', signed=True) / 1000 + y = int.from_bytes(data[2:4], byteorder='big', signed=True) / 1000 + z = int.from_bytes(data[4:6], byteorder='big', signed=True) / 1000 + + return {'x': x, 'y': y, 'z': z} + + +def barometer(data): + return int.from_bytes(data, byteorder='big', signed=False) / 10 + + +def gyrometer(data): + x = int.from_bytes(data[:2], byteorder='big', signed=True) / 100 + y = int.from_bytes(data[2:4], byteorder='big', signed=True) / 100 + z = int.from_bytes(data[4:6], byteorder='big', signed=True) / 100 + + return {'x': x, 'y': y, 'z': z} + + +def gps_location(data): + lat = int.from_bytes(data[:3], byteorder='big', signed=True) / 10000 + lon = int.from_bytes(data[3:6], byteorder='big', signed=True) / 10000 + alt = int.from_bytes(data[6:], byteorder='big', signed=True) / 100 + + return {'lat': lat, 'lon': lon, 'alt': alt} + + +DATA_TYPES = { + b'\x00': { + "name": "Digital Input", + "size": 1, + "decoder": one_per_bit_unsigned + }, + b'\x01': { + "name": "Digital Output", + "size": 1, + "decoder": one_per_bit_unsigned + }, + b'\x02': { + "name": "Analog Input", + "size": 2, + "decoder": point_zero_one_per_bit_signed + }, + b'\x03': { + "name": "Analog Output", + "size": 2, + "decoder": point_zero_one_per_bit_signed + }, + b'\x65': { + "name": "Illuminance Sensor", + "size": 2, + "decoder": one_per_bit_unsigned + }, + b'\x66': { + "name": "Presence Sensor", + "size": 1, + "decoder": one_per_bit_unsigned + }, + b'\x67': { + "name": "Temperature Sensor", + "size": 2, + "decoder": temperature + }, + b'\x68': { + "name": "Humidity Sensor", + "size": 1, + "decoder": humidity + }, + b'\x71': { + "name": "Accelerometer", + "size": 6, + "decoder": accelerometer + }, + b'\x73': { + "name": "Barometer", + "size": 2, + "decoder": barometer + }, + b'\x86': { + "name": "Gyrometer", + "size": 6, + "decoder": gyrometer + }, + b'\x88': { + "name": "GPS Location", + "size": 9, + "decoder": gps_location + }, +} + + +def lppdecode(base64_string): + cursor = 0 + result = [] + payload = base64.b64decode(base64_string) + + while cursor < len(payload): + data_channel = int.from_bytes(payload[cursor:cursor + 1], + byteorder='big') + cursor += 1 + data_type = payload[cursor:cursor + 1] + if data_type in DATA_TYPES: + cursor += 1 + name = DATA_TYPES[data_type]['name'] + size = DATA_TYPES[data_type]['size'] + decoder = DATA_TYPES[data_type]['decoder'] + result.append( + {"channel": data_channel, + "type": name, + "value": decoder(payload[cursor:cursor + size])}) + cursor += size + else: + # Unknown LPP data type + result = None + break + + return result diff --git a/test_decoder.py b/test_decoder.py new file mode 100644 index 0000000..01c1d6e --- /dev/null +++ b/test_decoder.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +import unittest +import cayennelpp + + +class DecoderTestCase(unittest.TestCase): + """Unit tests for Cayenne LPP decoder.""" + + def test_two_temperature_sensors(self): + result = cayennelpp.lppdecode('A2cBEAVnAP8=') + self.assertEqual(result, + [{'channel': 3, + 'type': 'Temperature Sensor', + 'value': 27.2}, + {'channel': 5, + 'type': 'Temperature Sensor', + 'value': 25.5}]) + + def test_temperature_and_acceleration_sensors(self): + result = cayennelpp.lppdecode('AWf/1wZxBNL7LgAA') + self.assertEqual(result, + [{'channel': 1, + 'type': 'Temperature Sensor', + 'value': -4.1}, + {'channel': 6, + 'type': 'Accelerometer', + 'value': {'x': 1.234, 'y': -1.234, 'z': 0.0}}]) + + def test_decode_gps(self): + result = cayennelpp.lppdecode('AYgGdl/ylgoAA+g=') + self.assertEqual(result, + [{'channel': 1, + 'type': 'GPS Location', + 'value': {'lat': 42.3519, + 'lon': -87.9094, + 'alt': 10.0}}]) + + def test_decode_all_previous_tests_combined(self): + result = cayennelpp.lppdecode( + 'AWcBEAVnAP8CZ//XA3EE0vsuAAAEiAZ2X/KWCgAD6A==') + self.assertEqual(result, + [{'channel': 1, + 'type': 'Temperature Sensor', + 'value': 27.2}, + {'channel': 5, + 'type': 'Temperature Sensor', + 'value': 25.5}, + {'channel': 2, + 'type': 'Temperature Sensor', + 'value': -4.1}, + {'channel': 3, + 'type': 'Accelerometer', + 'value': {'x': 1.234, 'y': -1.234, 'z': 0.0}}, + {'channel': 4, + 'type': 'GPS Location', + 'value': {'lat': 42.3519, + 'lon': -87.9094, + 'alt': 10.0}}]) + + def test_decode_unknown_lpp_data_type(self): + result = cayennelpp.lppdecode('AQQBEA==') + self.assertEqual(result, None) + + +if __name__ == '__main__': + unittest.main()