influxDB 时序数据库安装 & flux语法 & restful接口 & nodjsAPI

扫测资讯 2024-11-15 00:07   42 0

安装

Install InfluxDB | InfluxDB OSS v2 Documentation

Debian和Ubuntu用户可以用apt-get包管理来安装最新版本的InfluxDB。

对于Ubuntu用户,可以用下面的命令添加InfluxDB的仓库,添加之后即可apt-get 安装influxdb2

wget -q https://repos.influxdata.com/influxdata-archive_compat.key
echo '393e8779c89ac8d958f81f942f9ad7fb82a25e133faddaf92e15b16e6ac9ce4c influxdata-archive_compat.key' | sha256sum -c && cat influxdata-archive_compat.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg > /dev/null
echo 'deb [signed-by=/etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg] https://repos.influxdata.com/debian stable main' | sudo tee /etc/apt/sources.list.d/influxdata.list
sudo apt-get update && sudo apt-get install influxdb2

通过yum/apt-get安装的软件基本都可以用systemctl命令 (也可自行配置到systemctl中)

systemctl start influxdb  //如果你的系统不支持Systemd:service influxdb2 start

influxd version

生成API Token:

访问http://localhost:8086/

用户名ecmaster 密码 xxxxxxx      org:ecmaster      bucket:ecmaster

默认的ecmaster bucket存在一些系统参数,故新建一个ecsmart bucket存储桶。

进入系统后创建bucket(存储桶是存储时间序列数据):ecsmart

进入API Token,复制token 替换项目文件backend/setting.js中的token

influxdb配置文件默认在: /etc/influxdb/config.toml (修改配置文件位置: export INFLUXD_CONFIG_PATH=/path/to/custom/config/directory)
query-max-memory-bytes = 10737418240 //设置总查询内存最大为10G 默认是无限制

在InfluxDB中实现多用户隔离,通常是通过组织(Org)和存储桶(Bucket)的组合来处理的。

使用组织(Org)可以实现对用户的逻辑隔离,每个用户可以被分配到一个具体的组织,并且不同组织之间的数据是隔离的。这样可以确保不同用户之间的数据相互隔离,各自在独立的组织中进行管理和访问。

使用存储桶(Bucket)可以进一步细分和隔离不同类型或不同权限要求的数据。每个组织可以创建多个存储桶,用于逻辑上组织和隔离不同类型的数据。可以为每个用户创建独立的存储桶,确保他们的数据在存储桶层面上进行隔离和管控。

综合来说,组织用于逻辑上隔离不同用户,每个用户在独立的组织中进行管理;存储桶用于进一步隔离不同类型或不同权限要求的数据,确保数据在存储桶层面上进行隔离和管理。通过组织和存储桶的组合使用,可以实现更细粒度、更灵活的多用户隔离和数据管理。

基本概念

时序数据库特点是写多读少 相对于传统数据库 新增和查询多,基本没有更新和删除 。常用的一种使用场景: 监控数据统计 。每毫秒记录一下电脑内存的使用情况,然后就可以根据统计的数据,利用图形化界面制作内存使用情况的折线图;可以理解为 按时间记录一些数据(常用的监控数据、埋点统计数据等) ,然后制作图表做统计。

数据的修改和删除:

1.InfluxDB 不支持对已有的数据进行直接修改。相反,它采用覆盖写入(overwrite)的方式来实现近似的修改效果。使用INSERT语句将修改后的数据重新写入,并确保时间戳与原始数据相同。

2-1.数据保存策略 retention policy 可指定数据保留时间,超过指定时间,就删除这部分数据。

2-2.可以使用 DELETE 语句来删除指定时间范围内的数据。DELETE FROM measurement_name WHERE time > '2023-06-28' AND time < '2023-06-30'

支持类sql查询语句, 但推荐使用 2 . 0 的flux语句

1. organization: InfluxDB组织是一组用户的工作区。所有仪表板、任务、存储桶和用户都属于一个组织。

2. bucket :数据桶(即 database 数据库 + Retention P olic y 保留策略),所有 InfluxDB 数据都存储在一个存储桶中。

2-1. retention policy :存储策略,用于设置数据保留的时间、集群中存放副本数量以及shard group覆盖的时间范围。 每个数据库刚开始会自动创建一个默认的存储策略 autogen,数据保留时间为永久 ,副本数量为1,shard group持续时间为7天,之后用户可以自己设置。InfluxDB 会定期清除过期的数据。

3. measurement: 类似mysql中表

4. column : tag(带索引的,非必须)、field(不带索引)、timestemp(唯一主键)

5. Point 表里的一行数据 InfluxDB 中单条插入语句的数据结构 。由 同一 时间戳(time) 、数据(field) 集合 和标签(tags) 集合 组成。

INSERT 系统监控 ,device=CPU,variable=数量 系统监控.CPU.数量=2

insert measurement,tag=value,tag=value field=value,field=value

_ measurement _time _start _end _field _value为内置字段。

6. Series series 相当于是 InfluxDB 中一些数据的集合 ,在 同个 bucket 中,retention policy、measurement、tag完全相同的数据同属于一个 series ,同个series的数据在物理上会按照时间顺序排列存储在一起。

series的key为measurement + 所有 tags 的序列化字符串,这个key在之后会经常用到

下面这些概念了解即可:

7. Shard shard是在 tsm 存储引擎之上的一个概念。在 InfluxDB 中按照数据的时间戳所在的范围,会去创建不同的 shard( 比如RP保留时间为 24 小时 每个shard就保存 1 小时的数据 ),每一个 shard 都有自己的 cache、wal、tsm file 以及 compactor,这样做的目的就是为了可以通过时间来快速定位到要查询数据的相关资源,加速查询的过程,并且也让之后的批量删除数据的操作变得非常简单且高效。

每个shard有且只有一个shard group。 单个shard group中可能存在多个shard。 每个shard包含特定的series集合。

7-1. Shard duration 决定了每个shard group的时间跨度。具体由retention policy的 SHARD DURATION决定

7-2. Shard group shard group按照time、retention policy进行组织。每个包含数据的retention policy至少有一个关联的shard group。给定的shard group包含时间区间内所有shard数据。 每个shard group跨越的间隔是shard duration。

8. Continuous Query

CQ 是预先配置好的一些查询命令, 定期自动执行这些命令并将查询结果写入指定的 measurement 中,这个功能主要用于数据聚合

注意

在influxdb中,字段必须存在。因为字段是没有索引的。如果使用字段作为查询条件,会扫描符合查询条件的所有字段值,性能不及tag。 类比一下,fields相当于SQL的没有索引的列。tags是可选的,但是强烈建议你用上它,因为tag是有索引的,tags相当于SQL中的有索引的列。

tag 只能为字符串类型

field 类型无限制

不支持join

存储引擎

存储引擎(TSM)

InfluxDB存储引擎看起来非常类似于LSM树。它主要由cache、wal、tsm file、compactor组成。

Cache

cache是当前存储在WAL中的所有数据的内存副本。point由metric,tag和唯一字段组成。每个字段保持其自己的时间有序范围。cache中的数据未被压缩。对存储引擎的查询将合并Cache和TSM中的数据。

当influxDB启动时,会遍历所有的wal文件,重新构造cache,这样即使系统出现故障,也不会导致数据的丢失。

Wal(Write Ahead Log)

wal的内容与cache相同,其作用就是为了持久化数据,当系统崩溃后可以通过wal文件恢复还没有写入到tsm文件中的数据。

由于数据是被顺序插入到wal文件中,所以写入效率非常高。但是如果写入的数据没有按照时间顺序排列,而是以杂乱无章的方式写入,数据将会根据时间路由到不同的shard中,每一个shard都有自己的wal文件,这样就不再是完全的顺序写入,对性能会有一定影响。官方社区有说后续会进行优化,只使用一个 wal,而不是为每一个shard 创建 wal。

wal单个文件达到一定大小后会进行分片,创建一个新的wal分片文件用于写入数据。

TSM file

TSM文件是内存映射的只读文件的集合。 这些文件的结构看起来与LevelDB或其他LSM树变体中的SSTable非常相似。

一个TSM文件由4个区域组成:header,blocks,index,footer

Compactor

compactor是一个持续的过程,优化存储,提升查询性能,具体做以下几件事:

将已关闭的WAL文件转换为TSM文件并删除已关闭的WAL文件

将较小的TSM文件合并为较大的文件以提高压缩率

删除series data

写入最新的数据,确保TSM文件中point的唯一性。

influxdb-cli & restful接口

mac安装:

brew install influxdb-cli

influx config create --config-name onboarding \
    --host-url "http://10.1.1.115:8086" \
    --token "zTQfDoeRo6C92S_6qjIhvq4KlDp9o-a8wgTMOCBY5Qb-DYVL4WIKr533QtbLgebwLmxtnYscFrR_9fCEEbsvmw==" \
    --active
influx config list  
influx config delete onboarding

org相关操作:

influx org list 查询 默认limit只显示20个。

influx org list --name pppp_ecmaster //超过20个指定name可查询到

influx org delete -i 07668febbebcbbbd //删除org -i 指定id

bucket相关操作

influx bucket list -o undefined_ecmaster //查看指定org的bucket

influx bucket delete -i 6d10e05825be0228 -o undefined_ecmaster

API Token相关操作

influx auth list -o ecmaster //查看指定org的APIToken

user相关操作

influx user list  查询 默认limit只显示20个

influx user list --name pppp_ecmaster //超过20个 指定name才可查询到

influx user delete -i 0ba14fd3b1efe000 //删除用户 指定id

读写数据

influx write --bucket undefined_ecmaster -o undefined_ecmaster  --url https://influx-testdata.s3.amazonaws.com/air-sensor-data-annotated.csv //往bucket中写入数据

influx query -o undefined_ecmaster 'from(bucket:"undefined_ecmaster") |> range(start:-1d,stop:-1ms)' //读取数据

curl方式调用

查询org
curl --request GET "10.1.1.115:8086/api/v2/orgs" \

--header "Authorization: Token zTQfDoeRo6C92S_6qjIhvq4KlDp9o-a8wgTMOCBY5Qb-DYVL4WIKr533QtbLgebwLmxtnYscFrR_9fCEEbsvmw=="

新增 org

curl --request POST "http://10.1.1.115:8086/api/v2/orgs" \

--header "Authorization: Token zTQfDoeRo6C92S_6qjIhvq4KlDp9o-a8wgTMOCBY5Qb-DYVL4WIKr533QtbLgebwLmxtnYscFrR_9fCEEbsvmw==" \

--header "Accept: application/json" \

--header "Content-Type: application/json" \

--data '{"name": "personal_ecmaster1" }'

删除org

curl --request DELETE "10.1.1.115:8086/api/v2/orgs/f70722a047489640" \

--header "Authorization: Token zTQfDoeRo6C92S_6qjIhvq4KlDp9o-a8wgTMOCBY5Qb-DYVL4WIKr533QtbLgebwLmxtnYscFrR_9fCEEbsvmw=="

文档:https://docs.influxdata.com/influxdb/v2/api/#operation/DeleteAuthorizationsID

nodejs 操作influxdb

const { OrgsAPI, BucketsAPI, UsersAPI, AuthorizationsAPI } = require( '@influxdata/influxdb-client-apis');
const { InfluxDB } = require('@influxdata/influxdb-client');

创建 org user APIToken
const url = `http://${influxHost}:${influxPort}`;
const token = 'zTQfDoeRo6C92S_6qjIhvq4KlDp9o-a8wgTMOCBY5Qb-DYVL4WIKr533QtbLgebwLmxtnYscFrR_9fCEEbsvmw==';
const influxDB = new InfluxDB({url, token});

const name = `ecmaster`; //orgName

// org create
const orgsAPI = new OrgsAPI(influxDB);
const organization = await orgsAPI.postOrgs({body: {name}});
const orgID = organization.id;

//bucket create
const bucketsAPI = new BucketsAPI(influxDB);
const bucket = await bucketsAPI.postBuckets({body: {orgID, name }});

// user create
const usersAPI = new UsersAPI(influxDB);
const user = await usersAPI.postUsers({body: {name}});
const userID = user.id;
const password = uuid.v4();
await usersAPI.postUsersIDPassword({userID, body: {password}});
await orgsAPI.postOrgsIDOwners({ orgID, body: { id: user.id } }); // 当前orgID的所有者,拥有此org中所有资源的权限,没有org的write权限。 此用户登陆web界面创建的API Token分配的资源权限均是此org的。

//APIToken create
const authorizationsAPI = new AuthorizationsAPI(influxDB);
const apiToken = await authorizationsAPI.postAuthorizations({ body : {orgID, userID: user.id, permissions: [
  {
    "action": "read",
    "resource": {
      "orgID": `${orgID}`,
      "type": "authorizations"
    }
  },
  {
    "action": "read",
    "resource": {
      "orgID": `${orgID}`,
      "type": "buckets"
    }
  },
  {
    "action": "read",
    "resource": {
      "orgID": `${orgID}`,
      "type": "users"
    }
  },
  {
    "action": "write",
    "resource": {
      "orgID": `${orgID}`,
      "type": "buckets"
    }
  }
]}
});
API Token相关参数解释:
// name 可选: 如果设置了名称,则是该名称的资源的权限。如果未设置,则为该资源类型的所有资源的权限。
// id 可选: 如果设置了 ID,则是 该 ID 资源的权限。如果未设置,则为该资源类型的所有资源的权限。
//orgID 可选: 如果设置了 orgID,则这是该orgID组织拥有的所有资源的权限。如果未设置,则为该资源类型的所有资源的权限。
//type 必填: "authorizations" "buckets" "dashboards" "orgs" "sources" "tasks" "telegrafs" "users" "variables" "scrapers" "secrets" "labels" "views" "documents" "notificationRules" "notificationEndpoints" "checks" "dbrp" "notebooks" "annotations" "remotes" "replications"

const influxSetting= {
  org: organization.name,
  bucket: bucket.name,
  url,
  token: apiToken.token,
  username: user.name,
  password: password
}    
console.log(influxSetting);

清除influxdb org user APIToken
async function influxDBClear(){
  const url = `http://${influxHost}:${influxPort}`;
  const token = `${influxAdminToken}`;
  const influxDB = new InfluxDB({url, token});

  const name = `${serviceName}_ecmaster`;

  // org delete  删除org之后 bucket APIToken 无
  const orgsAPI = new OrgsAPI(influxDB);
  const orgs = (await orgsAPI.getOrgs({org: name})).orgs;//默认只显示20条。指定orgName检索
  for(const org of orgs){
    await orgsAPI.deleteOrgsID({orgID: org.id});
  }

  // user create
  const usersAPI = new UsersAPI(influxDB);
  const users = (await usersAPI.getUsers({name: username})).users;//默认只显示20条。
  // console.log(users, name);
  for(const user of users){
    await usersAPI.deleteUsersID({userID: user.id});
  }
  console.log('influxDB 清除完成');
}

flux函数库

Flux作为一个独立于InfluxDB的代码库,因为它将拥有自己的开源生命周期,无论你是否使用InfluxDB,它都是有用的。为此,我们采用了MIT许可,并接受将Flux与第三方系统进行集成的拉取请求,即使这些系统可能是InfluxDB的竞争对手。

每个 Flux 查询都需要以下内容:

1.数据源

from(bucket:"example-bucket")

  1. 时间范围

start 值和stop 值可以使用负持续时间是相对值,也可以是使用时间戳的绝对值。

|> range(start: -1h)

|> range(start: 2021-01-01T00:00:00Z, stop: 2021-01-01T12:00:00Z)

注意:influxdb事件默认是utc格式。 start:0 表示查所有数据

3.数据过滤器

|> filter(fn: (r) => (r._measurement == "cpu") and (r._field != "usage_system" ) )

过滤出cpu表且filed!=usage_system的数据

4 .开窗函数

|> aggregateWindow (every: 1d , fn: last ,createEmpty:true)  //开窗函数,按1天开窗,取1天内的last值。开窗处理后Row为一天的最后一条数据。createEmpty: 是否显示空数据行

开窗前:每个Row的_time为精确到毫秒时间戳

开窗后 :每个 Row的 _time 为精确到天 的时间戳

5.排序

|> sort(columns:["_time"], desc: true)

6.限制数量

|> limit(n:1)

7.yield函数
Flux 的yield()函数将过滤后的表作为查询结果输出。

Flux 会yield()在每个脚本的末尾自动假设一个函数,以便输出和可视化数据。yield()只有在同一个 Flux 查询中包含多个查询时,才需要显式调用。

from(bucket:"example-bucket")

|> range(start: -15m)

|> filter(fn: (r) =>   r._measurement == "cpu" )

|> yield()

完整demo:

from(bucket: "ecsmart")

|> range(start: -0, stop: -1ns) //start:0 表示查所有数据

|> filter(fn: (r) => r["_measurement"] == "正晨冷热源站")

|> filter(fn: (r) => r["variableID"] == "冷热源系统.冷热站.1号冷热站.室外温度")

|> drop(columns: ["_start","_stop","_field","device","deviceID","variable","variableID"])

|> aggregateWindow(every: 1d, fn: last)  //开窗函数,按1天开窗,取1天内的last值。开窗处理后Row为一天的最后一条数据, _time 精确到天

|> yield(name: "last")

自定义函数

普通函数

例如:

squared = (x) => x *x // square(x:3)  Returns 9

from(bucket: "ecsmart")

|> range(start: v.timeRangeStart, stop: v.timeRangeStop)

|> filter(fn: (r) => r["_measurement"] == "正晨冷热源站")

|> filter(fn: (r) => r["device"] == "1#C4用户侧循环水泵")

|> map(fn: (r) => ({ _value: squared(x: r._value) }))

转换函数 (<-)

转换是一个函数,它将流作为输入,对输入进行操作,然后输出新的流。

管道转发运算符 |> 将数据从先前的标识符或函数转发到转换函数中。

语法:

函数 x() 接收管道转发的数据并将其分配给 t 参数。

x = (t=<-) => t |> //...

例如:以下示例定义了一个 myFn 函数,该函数将每个输入行的 _value 列乘以 x 参数。 该示例使用 map() 函数遍历每一行,修改 _value,然后返回更新后的行

import "influxdata/influxdb/sample"

myFn = (tables=<-, x) =>

tables

|> map(fn: (r) => ({r with  _value: r._value * x}))

// 空气传感器样本数据

a = sample.data(set: "airSensor")

|> range(start: -1h)

|> filter(fn: (r) => r["_measurement"] == "airSensors")

|> keep(columns: ["_value", "sensor_id"])

|> myFn(x: 10.0)

influxQL 类sql查询

使用InfluxQL(一种类似SQL的查询语言)与InfluxDB交互,查询和分析您的时间序列数据。

比如:SELECT field_key FROM measurement_name

详细见官方教程: Query data with InfluxQL | InfluxDB OSS v2 Documentation