You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

60 lines
1.3 KiB

  1. import json
  2. import os
  3. import sys
  4. from paho.mqtt.client import Client
  5. from sqlalchemy import Column, DateTime, Integer, String, create_engine
  6. from sqlalchemy.ext.declarative import declarative_base
  7. from sqlalchemy.orm import sessionmaker
  8. DATABASE_URL = os.getenv('DATABASE_URL')
  9. if DATABASE_URL is None:
  10. print('Please set DATABASE_URL')
  11. sys.exit(1)
  12. MQTT_SERVER = os.getenv('MQTT_SERVER') or 'localhost'
  13. MQTT_PORT = os.getenv('MQTT_PORT') or '1883'
  14. mqtt = Client('rtlamr-postgres')
  15. engine = create_engine(DATABASE_URL)
  16. Base = declarative_base()
  17. Session = sessionmaker(bind=engine)
  18. session = Session()
  19. class Consumption(Base):
  20. __tablename__ = 'consumption'
  21. id = Column(Integer, primary_key=True)
  22. timestamp = Column(DateTime, index=True)
  23. metertype = Column(String(4))
  24. meterid = Column(String(32))
  25. consumption = Column(Integer)
  26. def on_message(c, obj, msg):
  27. payload = json.loads(msg.payload)
  28. session.add(Consumption(
  29. timestamp=payload['time'],
  30. metertype=msg.topic.split('/')[1],
  31. meterid=msg.topic.split('/')[2],
  32. consumption=payload['consumption']
  33. ))
  34. session.commit()
  35. if __name__ == '__main__':
  36. Base.metadata.create_all(engine)
  37. mqtt.on_message = on_message
  38. mqtt.connect(MQTT_SERVER, int(MQTT_PORT), 60)
  39. mqtt.subscribe('meter/#')
  40. mqtt.loop_forever()