DolphinDB时间序列引擎:实时聚合计算
2026/5/23 2:29:58 网站建设 项目流程

目录

    • 摘要
    • 一、时间序列引擎概述
      • 1.1 什么是时间序列引擎
      • 1.2 时间序列引擎特点
      • 1.3 适用场景
    • 二、创建时间序列引擎
      • 2.1 基本语法
      • 2.2 创建简单引擎
      • 2.3 创建分组引擎
    • 三、窗口类型
      • 3.1 固定窗口
      • 3.2 滑动窗口
      • 3.3 时间对齐
    • 四、聚合指标
      • 4.1 基本聚合
      • 4.2 百分位聚合
      • 4.3 自定义聚合
      • 4.4 多列聚合
    • 五、引擎管理
      • 5.1 查看引擎状态
      • 5.2 删除引擎
      • 5.3 引擎监控
    • 六、实战案例
      • 6.1 设备实时监控
      • 6.2 实时告警系统
      • 6.3 数据降采样
    • 七、性能优化
      • 7.1 窗口大小选择
      • 7.2 分组数量
      • 7.3 内存管理
    • 八、总结
    • 参考资料

摘要

本文深入讲解DolphinDB时间序列引擎。从引擎原理到创建配置,从窗口聚合到滑动计算,从多设备分组到实时输出,全面介绍时间序列引擎的核心功能。通过丰富的代码示例,帮助读者掌握实时聚合计算的核心技能。


一、时间序列引擎概述

1.1 什么是时间序列引擎

时间序列引擎是DolphinDB内置的流计算引擎,用于实时时间窗口聚合:

时间序列引擎

数据流

时间窗口

窗口聚合

实时输出

核心功能

固定窗口

滑动窗口

会话窗口

1.2 时间序列引擎特点

特点说明
实时聚合毫秒级延迟
窗口计算支持多种窗口
分组聚合支持多设备分组
自动触发窗口结束自动输出

1.3 适用场景

场景说明
实时监控设备指标实时统计
告警检测实时阈值告警
趋势分析实时趋势计算
数据降采样高频数据降采样

二、创建时间序列引擎

2.1 基本语法

//创建时间序列引擎 agg=createTimeSeriesEngine("engine_name",//引擎名称 windowSize,//窗口大小(毫秒) metrics,//聚合指标 outputTable,//输出表 timeColumn,//时间列[keyColumn],//分组列(可选)[garbageSize],//垃圾回收阈值[roundTime]//时间对齐方式)

2.2 创建简单引擎

//创建输入流表 share streamTable(1:0,`timestamp`temperature,[TIMESTAMP,DOUBLE])asinput_stream//创建输出表 share table(1:0,`timestamp`avg_temp`max_temp`min_temp`cnt,[TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asoutput_table//创建时间序列引擎 agg=createTimeSeriesEngine("ts_engine",60000,//60秒窗口<[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(temperature)ascnt]>,output_table,`timestamp)//订阅流表 subscribeTable(,"input_stream","ts_agg",-1,agg,true)

2.3 创建分组引擎

//创建输入流表 share streamTable(1:0,`device_id`timestamp`temperature,[INT,TIMESTAMP,DOUBLE])asinput_stream//创建输出表 share table(1:0,`device_id`timestamp`avg_temp`max_temp`min_temp`cnt,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,LONG])asoutput_table//创建分组时间序列引擎 agg=createTimeSeriesEngine("grouped_ts_engine",60000,<[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,count(temperature)ascnt]>,output_table,`timestamp,`device_id)//订阅流表 subscribeTable(,"input_stream","grouped_ts_agg",-1,agg,true)

三、窗口类型

3.1 固定窗口

//固定窗口:窗口大小固定,不重叠//例如:每60秒一个窗口 agg=createTimeSeriesEngine("fixed_window",60000,<[avg(temperature)asavg_temp,count(*)ascnt]>,output_table,`timestamp,`device_id)//窗口划分://[00:00:00,00:01:00)->窗口1//[00:01:00,00:02:00)->窗口2//[00:02:00,00:03:00)->窗口3

3.2 滑动窗口

//滑动窗口:窗口按步长滑动//使用createTimeSeriesEngine的step参数 agg=createTimeSeriesEngine("sliding_window",60000,//窗口大小60<[avg(temperature)asavg_temp]>,output_table,`timestamp,`device_id,,,,10000)//step=10//窗口划分(每10秒输出一次)://[00:00:00,00:01:00)->00:01:00输出//[00:00:10,00:01:10)->00:01:10输出//[00:00:20,00:01:20)->00:01:20输出

3.3 时间对齐

//时间对齐:窗口边界对齐到整点时间 agg=createTimeSeriesEngine("aligned_window",60000,<[avg(temperature)asavg_temp]>,output_table,`timestamp,`device_id,,,true)//roundTime=true,时间对齐//对齐效果://数据时间00:00:30->窗口[00:00:00,00:01:00)//数据时间00:01:15->窗口[00:01:00,00:02:00)

四、聚合指标

4.1 基本聚合

//基本聚合函数 agg=createTimeSeriesEngine("basic_agg",60000,<[avg(temperature)asavg_temp,//平均值sum(temperature)assum_temp,//总和max(temperature)asmax_temp,//最大值min(temperature)asmin_temp,//最小值 count(temperature)ascnt,//计数 std(temperature)asstd_temp,//标准差 var(temperature)asvar_temp]>,//方差 output_table,`timestamp,`device_id)

4.2 百分位聚合

//百分位聚合 agg=createTimeSeriesEngine("percentile_agg",60000,<[percentile(temperature,50)asmedian,//中位数 percentile(temperature,95)asp95,//95分位 percentile(temperature,99)asp99]>,//99分位 output_table,`timestamp,`device_id)

4.3 自定义聚合

//自定义聚合指标 agg=createTimeSeriesEngine("custom_agg",60000,<[avg(temperature)asavg_temp,max(temperature)-min(temperature)asrange,//极差 std(temperature)/avg(temperature)ascv]>,//变异系数 output_table,`timestamp,`device_id)

4.4 多列聚合

//多列聚合 agg=createTimeSeriesEngine("multi_col_agg",60000,<[avg(temperature)asavg_temp,avg(humidity)asavg_humidity,avg(pressure)asavg_pressure,corr(temperature,humidity)ascorr_temp_humid]>,output_table,`timestamp,`device_id)

五、引擎管理

5.1 查看引擎状态

//查看引擎状态 getStreamEngineStat()//查看特定引擎 getStreamEngineStat("ts_engine")

5.2 删除引擎

//删除引擎 dropStreamEngine("ts_engine")//删除所有引擎 dropStreamEngine()

5.3 引擎监控

//引擎监控函数defmonitorEngine(){stat=getStreamEngineStat()for(rowinstat){print("引擎: "+row.name)print(" 状态: "+row.status)print(" 处理行数: "+string(row.processedRows))print(" 输出行数: "+string(row.outputRows))}}monitorEngine()

六、实战案例

6.1 设备实时监控

//==========1.创建流表==========share streamTable(100000:0,`device_id`timestamp`temperature`humidity`pressure,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE])assensor_stream//==========2.创建输出表==========share table(1:0,`device_id`timestamp`avg_temp`max_temp`min_temp`avg_humidity`cnt,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG])asmonitor_result//==========3.创建时间序列引擎==========agg=createTimeSeriesEngine("monitor_engine",60000,//1分钟窗口<[avg(temperature)asavg_temp,max(temperature)asmax_temp,min(temperature)asmin_temp,avg(humidity)asavg_humidity,count(*)ascnt]>,monitor_result,`timestamp,`device_id)//==========4.订阅流表==========subscribeTable(,"sensor_stream","monitor_agg",-1,agg,true)//==========5.启用持久化==========enableTablePersistence(sensor_stream,true,true,1000000)//==========6.模拟数据写入==========defsimulateData(){for(iin1..100){sensor_stream.append!(table(take(1..100,1000)asdevice_id,take(now(),1000)astimestamp,rand(20.0..30.0,1000)astemperature,rand(40.0..60.0,1000)ashumidity,rand(1000.0..1020.0,1000)aspressure))sleep(100)}}//执行模拟 simulateData()//查看结果 select top20*frommonitor_result

6.2 实时告警系统

//==========1.创建流表==========share streamTable(100000:0,`device_id`timestamp`temperature,[INT,TIMESTAMP,DOUBLE])assensor_stream//==========2.创建输出表==========share table(1:0,`device_id`timestamp`avg_temp`alert,[INT,TIMESTAMP,DOUBLE,BOOL])asalert_result//==========3.创建告警引擎==========agg=createTimeSeriesEngine("alert_engine",30000,//30秒窗口<[avg(temperature)asavg_temp,avg(temperature)>30asalert]>,//温度>30告警 alert_result,`timestamp,`device_id)//==========4.订阅流表==========subscribeTable(,"sensor_stream","alert_agg",-1,agg,true)//==========5.查询告警==========select*fromalert_result where alert=true

6.3 数据降采样

//==========1.创建高频数据流表==========share streamTable(100000:0,`device_id`timestamp`value,[INT,TIMESTAMP,DOUBLE])ashigh_freq_stream//==========2.创建降采样输出表==========share table(1:0,`device_id`timestamp`open`high`low`close`cnt,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG])asohlc_result//==========3.创建降采样引擎==========agg=createTimeSeriesEngine("ohlc_engine",60000,//1分钟降采样<[first(value)asopen,//开盘max(value)ashigh,//最高min(value)aslow,//最低 last(value)asclose,//收盘 count(*)ascnt]>,//数量 ohlc_result,`timestamp,`device_id)//==========4.订阅流表==========subscribeTable(,"high_freq_stream","ohlc_agg",-1,agg,true)

七、性能优化

7.1 窗口大小选择

场景建议窗口大小
实时监控10-60秒
告警检测30-60秒
趋势分析1-5分钟
数据降采样1-10分钟

7.2 分组数量

//分组数量建议//单引擎分组数<10000//如果分组数过多,考虑://1.使用多个引擎//2.使用哈希分组

7.3 内存管理

//设置垃圾回收阈值 agg=createTimeSeriesEngine("ts_engine",60000,<[avg(temperature)asavg_temp]>,output_table,`timestamp,`device_id,100000)//garbageSize=100000,超过10万条触发GC

八、总结

本文详细介绍了DolphinDB时间序列引擎:

  1. 引擎原理:实时时间窗口聚合
  2. 创建方法:简单引擎、分组引擎
  3. 窗口类型:固定窗口、滑动窗口、时间对齐
  4. 聚合指标:基本聚合、百分位、自定义
  5. 引擎管理:状态查看、删除、监控
  6. 实战应用:实时监控、告警系统、数据降采样

思考题

  1. 如何选择合适的窗口大小?
  2. 固定窗口和滑动窗口有什么区别?
  3. 如何设计实时告警系统?

参考资料

  • DolphinDB时间序列引擎
  • DolphinDB流计算

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询