您的位置:首页 > 编程语言 > Python开发

使用Python编写脚本将MQTT数据转存至InfluxDB

2018-02-26 22:47 549 查看

前言

之前使用Rabbitmq部署了一个简单的MQTT服务器,暂未做用户隔离,也部署了InfluxDB时序数据库,但是并不能直接通过配置将MQTT服务器的数据转存至时序数据库中,于是我决定自己写脚本实现下.

准备

打开shell使用

pip install influxdb
安装InluxDB所需模块

pip install paho-mqtt
安装Rabbmq所需模块

源码

# coding=utf-8
import json
import random
import threading
import os
import paho.mqtt.client as mqtt
import time
from influxdb import InfluxDBClient
from my_lib.code_handle.code_handle import auto_code

class Mqtt_handle:
topic_sub='$dp'
topic_pub='$info'
counts = 0
clientID = ''
for i in range(0, 2):
clientID = clientID.join(str(random.uniform(0, 1)))
mqtt_client = mqtt.Client(clientID)
DB_client = InfluxDBClient('118.89.106.236', 8086, '', '', 'mydb')  # 初始化

def __init__(self, host, port):
self._host = host
self._port = port
self.mqtt_client.on_connect = self._on_connect  # 设置连接上服务器回调函数
self.mqtt_client.on_message = self._on_message  # 设置接收到服务器消息回调函数

def connect(self, username=None, password=None):
self.mqtt_client.username_pw_set(username, password)
self.mqtt_client.connect(self._host, self._port, 60)  # 连接服务器,端口为1883,维持心跳为60秒

def publish(self, data):
self.mqtt_client.publish(self.topic_pub, data)

def loop(self, timeout=None):
thread = threading.Thread(target=self._loop, args=(timeout,))
thread.start()

def _loop(self, timeout=None):
if not timeout:
self.mqtt_client.loop_forever()
else:
self.mqtt_client.loop(timeout)

def _on_connect(self, client, userdata, flags, rc):
local_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
with open('./run.log', 'a+')as f:
f.write('@Run ' + local_time + ' Connected with result code :' + str(rc))
client.subscribe(self.topic_sub)

def _on_message(self, client, userdata, msg):  # 从服务器接受到消息后回调此函数
data_json = auto_code(str(msg.payload))
if self._is_json(data_json):
data_list = [json.loads(data_json)]
#如果符合InfluxDB格式就转存至数据库
if 'measurement' in data_list[0] and 'tags' in data_list[0] and 'fields' in data_list[0]:
try:
DB_client.write_points(data_list)
self.counts += 1
local_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
#//**********//记录一个上传日志
with open('./upload.log', 'a+')as f:
f.write('Success,counts:' + str(self.counts) + ' Time:' + local_time + '\n')
except Exception as e:
with open('./upload.log', 'a+')as f:
f.write(e.message)
f.write('\nTopic:' + auto_code(str(msg.topic)) + " Msg:" + data_json + '\n\n')
#//**********//
#如果接受到停止指令就停止程序并记录一个停止日志
elif data_list[0].has_key('cmd') and data_list[0]['cmd'] == 'exit':
print '\n@mqtt_handle Exit\n'
with open('./run.log', 'a')as f:
local_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
f.write(
'@Stop ' + local_time + ' Topic:' + auto_code(str(msg.topic)) + " Msg:" + data_json + '\n')
os._exit(0)#停止程序
#解析JSON前先判断数据 是否是JSON格式,避免程序崩溃
def _is_json(self, data):
try:
json.loads(data)
except ValueError:
return False
return True

if __name__ == '__main__':
local_host = '127.0.0.1'
DB_client = InfluxDBClient(local_host, 8086, '', '', 'mydb')  # 初始化
mqtt_client = Mqtt_handle(local_host, 1883)
mqtt_client.connect('influxdb', 'influxdb')
mqtt_client.loop()


写一个简单的Shell运行一下脚本:

#!/bin/bash
nohup python /home/ubuntu/app/py/mqttDB/mqtt_handle.py &
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: