使用Python编写脚本将MQTT数据转存至InfluxDB
2018-02-26 22:47
549 查看
前言
之前使用Rabbitmq部署了一个简单的MQTT服务器,暂未做用户隔离,也部署了InfluxDB时序数据库,但是并不能直接通过配置将MQTT服务器的数据转存至时序数据库中,于是我决定自己写脚本实现下.准备
打开shell使用pip install influxdb安装InluxDB所需模块
pip install paho-mqtt安装Rabbmq所需模块
源码
# coding=utf-8 import json import random import threading import os import paho.mqtt.client as mqtt import time from influxdb import InfluxDBClient from my_lib.code_handle.code_handle import auto_code class Mqtt_handle: topic_sub='$dp' topic_pub='$info' counts = 0 clientID = '' for i in range(0, 2): clientID = clientID.join(str(random.uniform(0, 1))) mqtt_client = mqtt.Client(clientID) DB_client = InfluxDBClient('118.89.106.236', 8086, '', '', 'mydb') # 初始化 def __init__(self, host, port): self._host = host self._port = port self.mqtt_client.on_connect = self._on_connect # 设置连接上服务器回调函数 self.mqtt_client.on_message = self._on_message # 设置接收到服务器消息回调函数 def connect(self, username=None, password=None): self.mqtt_client.username_pw_set(username, password) self.mqtt_client.connect(self._host, self._port, 60) # 连接服务器,端口为1883,维持心跳为60秒 def publish(self, data): self.mqtt_client.publish(self.topic_pub, data) def loop(self, timeout=None): thread = threading.Thread(target=self._loop, args=(timeout,)) thread.start() def _loop(self, timeout=None): if not timeout: self.mqtt_client.loop_forever() else: self.mqtt_client.loop(timeout) def _on_connect(self, client, userdata, flags, rc): local_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) with open('./run.log', 'a+')as f: f.write('@Run ' + local_time + ' Connected with result code :' + str(rc)) client.subscribe(self.topic_sub) def _on_message(self, client, userdata, msg): # 从服务器接受到消息后回调此函数 data_json = auto_code(str(msg.payload)) if self._is_json(data_json): data_list = [json.loads(data_json)] #如果符合InfluxDB格式就转存至数据库 if 'measurement' in data_list[0] and 'tags' in data_list[0] and 'fields' in data_list[0]: try: DB_client.write_points(data_list) self.counts += 1 local_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) #//**********//记录一个上传日志 with open('./upload.log', 'a+')as f: f.write('Success,counts:' + str(self.counts) + ' Time:' + local_time + '\n') except Exception as e: with open('./upload.log', 'a+')as f: f.write(e.message) f.write('\nTopic:' + auto_code(str(msg.topic)) + " Msg:" + data_json + '\n\n') #//**********// #如果接受到停止指令就停止程序并记录一个停止日志 elif data_list[0].has_key('cmd') and data_list[0]['cmd'] == 'exit': print '\n@mqtt_handle Exit\n' with open('./run.log', 'a')as f: local_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) f.write( '@Stop ' + local_time + ' Topic:' + auto_code(str(msg.topic)) + " Msg:" + data_json + '\n') os._exit(0)#停止程序 #解析JSON前先判断数据 是否是JSON格式,避免程序崩溃 def _is_json(self, data): try: json.loads(data) except ValueError: return False return True if __name__ == '__main__': local_host = '127.0.0.1' DB_client = InfluxDBClient(local_host, 8086, '', '', 'mydb') # 初始化 mqtt_client = Mqtt_handle(local_host, 1883) mqtt_client.connect('influxdb', 'influxdb') mqtt_client.loop()
写一个简单的Shell运行一下脚本:
#!/bin/bash nohup python /home/ubuntu/app/py/mqttDB/mqtt_handle.py &
相关文章推荐
- 使用python编写数据检索脚本
- 使用 Python 为 KVM 编写脚本,第 1 部分: libvirt
- 使用python编写批量卸载手机中安装的android应用脚本
- 使用python编写脚本获取手机当前应用的信息
- 使用Python编写一个在Linux下实现截图分享的脚本的教程
- 使用python编写android截屏脚本双击运行即可
- 使用 Python 编写 KVM 脚本,第 2 部分: 添加 GUI 来使用 libvirt 和 Python 管理 KVM
- 使用Python3编写抓取网页和只抓网页图片的脚本
- 扩展Python__在python脚本中使用其他语言(c/c++/java/c#)编写的模块
- 使用 Python 为 KVM 编写脚本,第 1 部分: libvirt
- Delphi中使用python脚本读取Excel数据
- Delphi中使用python脚本读取Excel数据
- 使用Python3编写抓取网页和只抓网页图片的脚本
- 使用 Python 为 KVM 编写脚本,第 1 部分: libvirt
- [zz]使用 Python 为 KVM 编写脚本,第 1 部分: libvirt
- 使用python编写android截屏脚本双击运行即可
- 使用Python编写提取日志中的中文的脚本的方法
- 使用python编写脚本获取手机当前应用apk的信息
- 使用Python编写提取日志中的中文的脚本的方法
- 使用python编写批量卸载手机中安装的android应用脚本