NSDT工具推荐: Three.js AI纹理开发包 - YOLO合成数据生成器 - GLTF/GLB在线编辑 - 3D模型格式在线转换 - 可编程3D场景编辑器 - REVIT导出3D模型插件 - 3D模型语义搜索引擎 - AI模型在线查看 - Three.js虚拟轴心开发包 - 3D模型在线减面 - STL模型在线切割 - 3D道路快速建模
尽管时间序列数据可以存储在 MySQL 或 PostgreSQL 数据库中,但这并不是特别有效。如果你想要存储来自数千个不同传感器、服务器、容器或设备的每分钟变化的数据(每年超过 50 万个数据点!),将不可避免地遇到可扩展性问题。在使用关系数据库时,对这些数据进行查询或执行聚合也会导致性能问题。
另一方面,时间序列数据库 (TSDB) 经过优化以存储时间序列数据点。这在以下情况下特别有用:
- 分析股票价格的财务趋势。
- 销售预测。
- 监控 API 或 Web 服务的日志和指标。
- 出于安全目的监控来自汽车或飞机的传感器数据。
- 跟踪智能电网等物联网设备的用电量。
- 在比赛期间跟踪运动员的生命体征和表现。
InfluxDB创建了一个开源时间序列数据库,使开发人员可以更轻松地处理时间序列数据。本文将向您展示如何使用 Python 设置 InfluxDB,以及使用 Yahoo Finance API 获取的股票数据。
可以在此 仓库中访问本教程中编写的所有代码。
1、为什么使用 InfluxDB?
InfluxDB 带有一个预先构建的仪表板,你可以在其中分析时间序列数据,而无需太多基础工作。我们不要忘记它的性能优于 Elasticsearch 和 Cassandra。
InfluxDB有一个可以在本地运行的免费开源版本,还有一个支持 AWS、GCP 和 Azure 等主要云服务的云版本。
2、使用 Python 设置 InfluxDB
在开始之前,请确保你的计算机上安装了Python 3.6 或更高版本。此外还需要一个虚拟环境。本文使用venv,但你也可以使用 conda、pipenv 或 pyenv。
最后,关于Flux 查询的一些经验。
本指南使用模块influxdb-client-python 与 InfluxDB 交互。该库仅支持 InfluxDB 2.x 和 InfluxDB 1.8+,并且需要 Python 3.6 或更高版本。
可以了,让我们开始安装和连接客户端库。
如果你的计算机上安装了 Docker,可以使用以下命令简单地运行 InfluxDB 的 Docker Image:
1 2 | bash docker run --name influxdb -p 8086:8086 influxdb:2.1.0 |
如果没有 Docker,请在此处下载适用于你的操作系统的软件 并进行安装。如果你在 Mac 上运行 InfluxDB,可以使用 Homebrew 来安装它:
1 2 | bash brew install influxdb |
如果正在运行 Docker 映像,则可以直接转到localhost 8086。但是,如果是下载并安装了该软件,则需要在命令行中输入以下内容:
1 2 | bash influxd |
浏览器访问localhost 8086
,应该在 屏幕上看到:
单击 **Get Started**,将重定向到以下页面:
对于本教程,选择 **Quick Start** 并在此页面上输入你的信息:
也可以稍后创建组织和存储桶,但现在,只需为每个字段选择一个简单的名称。
注册后,你应该会在仪表板页面上找到自己。单击 **Load your data** 然后选择 **Python** 客户端库。
现在应该看到以下屏幕:
在 **Token** 下,应该已经列出了一个token。但是,如果你愿意,可以为本教程生成一个新的token。单击 **Generate Token** 并选择 **All Access Token** 因为你将在本教程的后面部分更新和删除数据。
请注意,此时 InfluxDB 会发出警告,但现在可以忽略它。
现在,必须设置一个 Python 虚拟环境。为教程创建一个新文件夹:
1 2 | bash mkdir influxDB-Tutorial |
然后将你的目录更改为新文件夹:
1 2 | bash cd influxDB-Tutorial |
创建虚拟环境:
1 2 | bash python3 -m venv venv |
激活它。
1 2 | bash source venv/bin/activate |
最后,安装 InfluxDB 的客户端库:
1 2 | bash pip install influxdb-client |
创建一个名为 的新文件__init.py__
,然后返回 InfluxDB UI:
选择适当的token和存储桶,然后复制 **Initialize the Client** 下的代码片段并将其粘贴到你的 Python 文件中。如果更改令牌/存储桶选择,代码片段将自动更新。
接下来,运行Python 文件:
1 2 | bash python3 __init__.py |
如果终端没有显示错误消息,则已成功连接到 InfluxDB。
要遵循最佳实践,可以将凭据存储在 .env 文件中。创建一个名为.env
并存储以下信息的文件:
1 2 3 4 | bash TOKEN = 'YOUR TOKEN' ORG = 'YOUR ORG NAME' BUCKET = 'YOUR BUCKET NAME' |
然后安装python-dotenv
模块以读取 .env 变量:
1 2 | bash pip3 install python-dotenv |
最后,更新你的 Python 文件以从 .env 文件加载数据:
from datetime import datetime | |
from dotenv import load_dotenv, main | |
import os | |
from influxdb_client import InfluxDBClient, Point, WritePrecision | |
from influxdb_client.client.write_api import SYNCHRONOUS | |
load_dotenv() | |
# You can generate a Token from the "Tokens Tab" in the UI | |
token = os.getenv('TOKEN') | |
org = os.getenv('ORG') | |
bucket = os.getenv('BUCKET') | |
client = InfluxDBClient(url="http://localhost:8086", token=token) |
url
请注意,如果你使用的是 InfluxDB Cloud 帐户,则需要更改参数。URL 将取决于你选择的云区域。可以在此处的文档中找到云 URL。
本教程稍后将需要导入 DateTime 模块和 InfluxDB 库的行。在开始时将所有导入语句放在一起是一个好习惯。但是,如果愿意,也可以在必要时导入它们。
或者,可以将你的凭据存储在具有扩展名.ini
或者.toml
的文件中,使用from_config_file
函数连接到 InfluxDB。
3、使用 influxdb-client-python 进行 CRUD 操作
本文使用 Python 中的yfinance 模块来收集一些历史股票数据。使用以下命令安装它:
1 2 | bash pip install yfinance |
可以使用以下代码片段来获取数据:
1 2 3 4 | python import yfinance as yf data = yf.download("MSFT", start="2021-01-01", end="2021-10-30") print(data.to_csv()) |
确保将文件名参数传递给to_csv
方法;这将在本地存储 CSV,以便稍后读取数据。
或者,可以从GitHub存储库获取 CSV 文件。
接下来,创建一个类并将 CRUD 操作添加为其方法:
class InfluxClient: | |
def __init__(self,token,org,bucket): | |
self._org=org | |
self._bucket = bucket | |
self._client = InfluxDBClient(url="http://localhost:8086", token=token) |
如果使用 InfluxDB 的云实例,需要将 URL 参数替换为适当的云区域。
要创建该类的实例,请使用以下命令:
1 2 | python IC = InfluxClient(token,org,bucket) |
4、写入数据
InfluxDBClient 有一个名为的方法write_api
,用于将数据写入数据库。下面是这个方法的代码片段:
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS | |
def write_data(self,data,write_option=SYNCHRONOUS): | |
write_api = self._client.write_api(write_option) | |
write_api.write(self._bucket, self._org , data,write_precision='s') |
InfluxDBClient 支持异步和同步写入,可以根据需要指定写入类型。有关异步写入的更多信息,请参阅“如何在influxdb-client
中使用 Asyncio ”。
data
参数可以写成三种不同的方式,如下图:
4.1 线路协议字符串
1 2 3 | python # Data Write Method 1 IC.write_data(["MSFT,stock=MSFT Open=62.79,High=63.84,Low=62.13"]) |
请注意,字符串必须遵循特定格式:
1 | measurementName,tagKey=tagValue fieldKey1="fieldValue1",fieldKey2=fieldValue2 timestamp |
tagValue 和第一个 fieldKey 之间有一个空格,最后一个 fieldValue 和 timeStamp 之间有另一个空格。解析时,这些空格用作分隔符;因此,必须按照上面显示的方式对其进行格式化。另请注意,在这种情况下,我假设第一个字段值fieldValue1
是一个字符串,fieldValue2
而是一个数字。因此,fieldValue1
应该出现在引号中。
另请注意,时间戳是可选的。如果没有提供时间戳,InfluxDB 使用其主机的系统时间(UTC)。可以在此处阅读有关线路协议的更多信息。
4.2 数据点结构
1 2 3 4 5 6 7 8 9 10 11 12 | python # Data Write Method 2 IC.write_data( [ Point('MSFT') .tag("stock","MSFT") .field("Open",62.79) .field("High",63.38) .field("Low",62.13) .time(int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp())) ], ) |
如果不想处理 Line Protocol String 中的格式,可以使用 Point() 类。这可确保你的数据正确序列化为线路协议。
4.3 字典样式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | python # Data Write Method 3 IC.write_data([ { "measurement": "MSFT", "tags": {"stock": "MSFT"}, "fields": { "Open": 62.79, "High": 63.38, "Low": 62.13, }, "time": int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp()) }, { "measurement": "MSFT_DATE", "tags": {"stock": "MSFT"}, "fields": { "Open": 62.79, "High": 63.38, "Low": 62.13, }, } ],write_option=ASYNCHRONOUS) |
在此方法中,将传递两个数据点并将写入选项设置为ASYNCHRONOUS
. 这是 Python 友好的,因为数据作为字典传递。
写入数据的所有不同方式都合并在以下要点中:
# Data Write Method 1 | |
IC.write_data(["MSFT_2021-11-07_Line_Protocol,stock=MSFT Open=62.79,High=63.84,Low=62.13"]) | |
# Data Write Method 2 | |
IC.write_data( | |
[ | |
Point('MSFT_2021-11-07_Point_Class') | |
.tag("stock","MSFT") | |
.field("Open",65) | |
.field("High",63.38) | |
.field("Low",62.13) | |
.time(int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp())) | |
], | |
) | |
# Data Write Method 3 | |
IC.write_data([ | |
{ | |
"measurement": "MSFT_2021-11-07_Dictionary_Method", | |
"tags": {"stock": "MSFT"}, | |
"fields": { | |
"Open": 66, | |
"High": 63.38, | |
"Low": 62.13, | |
}, | |
"time": int(datetime.strptime('2021-11-07','%Y-%m-%d').timestamp()) | |
}, | |
{ | |
"measurement": "MSFT_DATE", | |
"tags": {"stock": "MSFT"}, | |
"fields": { | |
"Open": 67, | |
"High": 63.38, | |
"Low": 62.13, | |
}, | |
} | |
],write_option=ASYNCHRONOUS) |
接下来,插入 MSFT 股票和 AAPL 股票的所有数据。由于数据存储在 CSV 文件中,因此可以使用第一种方法 — 行协议字符串 — 来写入数据:
import csv | |
MSFT_file = open('Data/MSFT.csv') | |
csvreader = csv.reader(MSFT_file) | |
header = next(csvreader) | |
rows = [] | |
for row in csvreader: | |
date,open,high,low = row[0],row[1],row[2],row[3] | |
line_protocol_string = '' | |
line_protocol_string+=f'MSFT_{date},' | |
line_protocol_string+=f'stock=MSFT ' | |
line_protocol_string+=f'Open={open},High={high},Low={low} ' | |
line_protocol_string+=str(int(datetime.strptime(date,'%Y-%m-%d').timestamp())) | |
rows.append(line_protocol_string) | |
IC.write_data(rows) |
可以通过将文件路径和字符串从 MSFT 更改为 AAPL 来插入 AAPL 股票的数据:
1 2 3 | python3 AAPL_file = open('Data/AAPL.csv') csvreader = csv.reader(AAPL_file) |
5、读取数据
InfluxDBClient
还有一个方法叫做query_api
,可以用来读取数据。可以将查询用于各种目的,例如根据特定日期过滤数据、聚合时间范围内的数据、查找时间范围内的最高/最低值等等。它们类似于在 SQL 中使用的查询。从 InfluxDB 读取数据时,您需要使用查询。
以下代码用于我们类的 read 方法:
def query_data(self,query): | |
query_api = self._client.query_api() | |
result = query_api.query(org=self._org, query=query) | |
results = [] | |
for table in result: | |
for record in table.records: | |
results.append((record.get_value(), record.get_field())) | |
print(results) | |
return results |
在这里,它接受一个查询,然后执行它。查询的返回值是与你的查询匹配的 Flux 对象的集合。Flux 对象有以下方法:
1 2 3 4 | .get_measurement() .get_field() .values.get(“<your tags>”) .get_time() |
下面显示了两个查询示例,它们演示了该query_data
功能的实际作用。第一个查询返回自 2021 年 10 月 1 日以来 MSFT 股票的高值,第二个查询返回 2021 年 10 月 29 日 MSFT 股票的高值。
''' | |
Return the High Value for MSFT stock for since 1st October,2021 | |
''' | |
query1 = 'from(bucket: "TestBucket")\ | |
|> range(start: 1633124983)\ | |
|> filter(fn: (r) => r._field == "High")\ | |
|> filter(fn: (r) => r.stock == "MSFT")' | |
IC.query_data(query1) | |
''' | |
Return the High Value for the MSFT stock on 2021-10-29 | |
''' | |
query2 = 'from(bucket: "TestBucket")\ | |
|> range(start: 1633124983)\ | |
|> filter(fn: (r) => r._field == "High")\ | |
|> filter(fn: (r) => r._measurement == "MSFT_2021-10-29")' | |
IC.query_data(query2) |
确保根据需要更改查询开头的存储桶名称。就我而言,我的存储桶名称是 *TestBucket*。
6、更新数据
与写入和查询 API 不同,InfluxDB 没有更新 API。下面的陈述取自他们关于如何处理重复数据点的文档。
对于具有相同测量名称、标签集和时间戳的点,InfluxDB 创建新旧字段集的并集。对于任何匹配的字段键,InfluxDB 使用新点的字段值
要更新数据点,您需要拥有名称、标签集和时间戳,并且只需执行写入操作。
7、删除数据
可以使用 delete_api
删除数据。下面是一些演示如何删除数据的代码:
def delete_data(self,measurement): | |
delete_api = self._client.delete_api() | |
start = "1970-01-01T00:00:00Z" | |
stop = "2021-10-30T00:00:00Z" | |
delete_api.delete(start, stop, f'_measurement="{measurement}"', bucket=self._bucket, org=self._org) |
删除功能需要数据点的测量值。以下代码显示了删除函数的一个简单用例:
''' | |
Delete Data Point with measurement = 2021-10-29 | |
''' | |
IC.delete_data("MSFT_2021-10-29") | |
''' | |
Return the High Value for the MSFT stock on 2021-10-29 | |
''' | |
query2 = 'from(bucket: "TestBucket")\ | |
|> range(start: 1633124983)\ | |
|> filter(fn: (r) => r._field == "High")\ | |
|> filter(fn: (r) => r._measurement == "MSFT_2021-10-29")' | |
IC.query_data(query2) |
InfluxDB 的文档包括一个编写数据的最佳实践列表。还有一些数据布局和架构设计的最佳实践,我们应该遵循这些实践以获得最佳结果。
8、时间序列数据库的一些实际用例
本文研究了一个使用 TSDB 存储股票价值的简单用例,因此你可以分析历史股票价格并预测未来价值。但是,也可以使用物联网设备、销售数据和任何其他随时间变化的数据系列。
其他一些实际用例包括:
9、结论
希望本指南能够帮助你设置自己的 InfluxDB 实例。我们学习了如何使用 InfluxDB 的 Python 客户端库构建一个简单的应用程序来执行 CRUD 操作,但是如果想仔细查看任何内容,可以在此处找到包含整个源代码的 repo。
查看InfluxDB 的开源 TSDB。它有十种编程语言的客户端库,包括 Python、C++ 和 JavaScript,它还有很多内置的可视化工具,所以你可以准确地看到你的数据在做什么。
原文链接:Getting Started with Python and InfluxDB
BimAnt翻译整理,转载请标明出处