Free Energy Principle and AI Perception: Why “Prediction” Is the Essence of Intelligence
2026/5/23 3:34:17
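This article takes an in-depth look at DolphinDB's time-series engine. It covers how the engine works, how to create and configure it, fixed-window aggregation and sliding computation, grouping by device, and real-time output. The code examples throughout are meant to help readers master real-time aggregation.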
The time-series engine is a stream computing engine built into DolphinDB. It performs real-time aggregation over time windows:
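| Feature | Description |
|---|---|
| Real-time aggregation | Millisecond-level latency |
| Window computation | Several window types (fixed and sliding) |
| Grouped aggregation | Separate windows per group, e.g. per device |
| Automatic triggering | A window's result is emitted once data past the window's end arrives |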
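| Scenario | Description |
|---|---|
| Real-time monitoring | Live statistics on device metrics |
| Alerting | Real-time threshold alerts |
| Trend analysis | Real-time trend computation |
| Downsampling | Reducing high-frequency data to coarser intervals |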
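The "automatic triggering" feature above is worth making concrete. When windows follow a time column (event time), no timer closes a window. A window is emitted when a record of the same group arrives whose timestamp is at or past the window's right boundary. By default, then, a device that stops sending data leaves its last window open; DolphinDB has options such as `forceTriggerTime` that change this.

The sketch below models that rule in plain Python for a fixed window. It is an illustration, not DolphinDB code. The following are all assumptions made for the sketch:

- the function name `tumbling_engine`;
- the `(key, ts_ms, value)` record layout;
- window boundaries at multiples of the window size;
- timestamps that never decrease within a group;
- computing only `avg` and `count`.

```python
# Hypothetical model of event-time triggering in a grouped, fixed-window engine.
# Not DolphinDB code: the names and record layout are illustrative assumptions.

def tumbling_engine(records, window_ms):
    """records: iterable of (key, ts_ms, value), timestamps non-decreasing per key.
    Returns emitted rows (window_end, key, avg, count) in emission order."""
    state = {}  # key -> [window_end, values collected for that window]
    out = []
    for key, ts, value in records:
        # Right boundary of the left-closed window [end - window_ms, end) holding ts
        end = (ts // window_ms + 1) * window_ms
        cur = state.get(key)
        if cur is not None and end > cur[0]:
            # First record of this key past the boundary: emit the old window
            cur_end, values = cur
            out.append((cur_end, key, sum(values) / len(values), len(values)))
            cur = None
        if cur is None:
            state[key] = [end, [value]]  # open a new window for this key
        else:
            cur[1].append(value)         # same window: keep collecting
    return out


records = [
    (1, 1_000, 20.0),   # device 1 at 00:00:01
    (2, 2_000, 30.0),   # device 2 at 00:00:02
    (1, 30_000, 22.0),  # device 1 at 00:00:30
    (1, 61_000, 25.0),  # device 1 at 00:01:01: closes device 1's first window
    (2, 59_000, 31.0),  # device 2 at 00:00:59: device 2's window stays open
]
print(tumbling_engine(records, 60_000))  # -> [(60000, 1, 21.0, 2)]
```

Device 2 has data in the first minute but produces no row yet, because none of its records has crossed the boundary.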
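Creating an engine. These are the main parameters of `createTimeSeriesEngine`, in order; optional ones are in brackets:

```
agg = createTimeSeriesEngine(
    name,            // engine name
    windowSize,      // window length, in the unit of the time column (milliseconds for TIMESTAMP)
    step,            // interval between window computations; windowSize must be a multiple of step
    metrics,         // aggregations as metacode, e.g. <[avg(temperature)]>
    dummyTable,      // table with the same schema as the input stream
    outputTable,     // result table: window time first, then keyColumn, then one column per metric
    [timeColumn],    // time column of the input
    [useSystemTime], // false (default): windows follow timeColumn; true: windows follow arrival time
    [keyColumn],     // grouping column(s)
    [garbageSize]    // memory clean-up threshold
    // further optional parameters include updateTime, useWindowStartTime, roundTime and fill
)
```

Basic usage, with one 60-second window over all incoming data:

```
// Input stream table
share streamTable(1:0, `timestamp`temperature, [TIMESTAMP, DOUBLE]) as input_stream
// Output table: window time, then one column per metric
share table(1:0, `timestamp`avg_temp`max_temp`min_temp`cnt, [TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, LONG]) as output_table
// Time-series engine with a 60-second fixed window
agg = createTimeSeriesEngine(name="ts_engine", windowSize=60000, step=60000,
    metrics=<[avg(temperature) as avg_temp, max(temperature) as max_temp, min(temperature) as min_temp, count(temperature) as cnt]>,
    dummyTable=input_stream, outputTable=output_table, timeColumn=`timestamp)
// Subscribe: every new batch written to input_stream is appended to the engine
subscribeTable(, "input_stream", "ts_agg", -1, agg, true)
```

Grouping by device with `keyColumn`:

```
// Input stream table with a device column
// (drop the previous example's shared tables and engines first, or use a fresh session)
share streamTable(1:0, `device_id`timestamp`temperature, [INT, TIMESTAMP, DOUBLE]) as input_stream
// Output table: window time, key column, then the metrics
share table(1:0, `timestamp`device_id`avg_temp`max_temp`min_temp`cnt, [TIMESTAMP, INT, DOUBLE, DOUBLE, DOUBLE, LONG]) as output_table
// Grouped engine: each device_id gets its own windows
agg = createTimeSeriesEngine(name="grouped_ts_engine", windowSize=60000, step=60000,
    metrics=<[avg(temperature) as avg_temp, max(temperature) as max_temp, min(temperature) as min_temp, count(temperature) as cnt]>,
    dummyTable=input_stream, outputTable=output_table, timeColumn=`timestamp, keyColumn=`device_id)
// Subscribe to the stream table
subscribeTable(, "input_stream", "grouped_ts_agg", -1, agg, true)
```

Window types. The following snippets use the grouped `input_stream`:

```
// Fixed (tumbling) window: step equals windowSize, so windows do not overlap
share table(1:0, `timestamp`device_id`avg_temp`cnt, [TIMESTAMP, INT, DOUBLE, LONG]) as fixed_output
agg = createTimeSeriesEngine(name="fixed_window", windowSize=60000, step=60000,
    metrics=<[avg(temperature) as avg_temp, count(temperature) as cnt]>,
    dummyTable=input_stream, outputTable=fixed_output, timeColumn=`timestamp, keyColumn=`device_id)
// Window layout:
// [00:00:00, 00:01:00) -> window 1
// [00:01:00, 00:02:00) -> window 2
// [00:02:00, 00:03:00) -> window 3

// Sliding window: a 60-second window computed every 10 seconds (step=10000)
share table(1:0, `timestamp`device_id`avg_temp, [TIMESTAMP, INT, DOUBLE]) as sliding_output
agg = createTimeSeriesEngine(name="sliding_window", windowSize=60000, step=10000,
    metrics=<[avg(temperature) as avg_temp]>,
    dummyTable=input_stream, outputTable=sliding_output, timeColumn=`timestamp, keyColumn=`device_id)
// Window layout (one result every 10 seconds, stamped with the window end):
// [00:00:00, 00:01:00) -> output at 00:01:00
// [00:00:10, 00:01:10) -> output at 00:01:10
// [00:00:20, 00:01:20) -> output at 00:01:20

// Window alignment: boundaries are aligned to a unit derived from step, not to the first
// record; roundTime (default true) only selects the alignment rule when the time column is
// in milliseconds or seconds and step exceeds one minute
share table(1:0, `timestamp`device_id`avg_temp, [TIMESTAMP, INT, DOUBLE]) as aligned_output
agg = createTimeSeriesEngine(name="aligned_window", windowSize=60000, step=60000,
    metrics=<[avg(temperature) as avg_temp]>,
    dummyTable=input_stream, outputTable=aligned_output, timeColumn=`timestamp, keyColumn=`device_id, roundTime=true)
// With step = 60 seconds the boundaries fall on whole minutes:
// data at 00:00:30 -> window [00:00:00, 00:01:00)
// data at 00:01:15 -> window [00:01:00, 00:02:00)
```

Basic aggregation functions: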
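The fixed and sliding layouts above follow one rule. Windows end at multiples of `step`, and each window covers `[end - windowSize, end)`. A record therefore belongs to `windowSize / step` windows: one for a fixed window, and six for the 60-second window that slides every 10 seconds.

The Python sketch below lists those windows for a single timestamp. It is a model, not DolphinDB's implementation, and the function name is hypothetical. It assumes boundaries sit on plain multiples of `step`. That matches the 10-second and 60-second steps used here, but not every alignment case the engine handles.

```python
def windows_containing(ts_ms, window_ms, step_ms):
    """Return every left-closed window (start, end) that contains ts_ms.

    Illustrative model: assumes window ends fall on multiples of step_ms
    and that window_ms is a multiple of step_ms.
    """
    if window_ms % step_ms != 0:
        raise ValueError("window_ms must be a multiple of step_ms")
    first_end = (ts_ms // step_ms + 1) * step_ms  # smallest end strictly after ts
    last_end = first_end + window_ms - step_ms     # largest end whose start is <= ts
    return [(end - window_ms, end) for end in range(first_end, last_end + 1, step_ms)]


# Fixed window (step == windowSize): exactly one window per record
print(windows_containing(30_000, 60_000, 60_000))  # 00:00:30 -> [(0, 60000)]
print(windows_containing(75_000, 60_000, 60_000))  # 00:01:15 -> [(60000, 120000)]
# Sliding window (60 s, step 10 s): six overlapping windows
print(len(windows_containing(30_000, 60_000, 10_000)))  # 6
```

Because windows are left-closed, a record exactly on a boundary (for example 00:01:00) starts the next window rather than closing the previous one.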
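```
// Basic aggregations
share table(1:0, `timestamp`device_id`avg_temp`sum_temp`max_temp`min_temp`cnt`std_temp`var_temp,
    [TIMESTAMP, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE]) as basic_output
agg = createTimeSeriesEngine(name="basic_agg", windowSize=60000, step=60000,
    metrics=<[avg(temperature) as avg_temp,   // mean
              sum(temperature) as sum_temp,   // sum
              max(temperature) as max_temp,   // maximum
              min(temperature) as min_temp,   // minimum
              count(temperature) as cnt,      // count
              std(temperature) as std_temp,   // standard deviation
              var(temperature) as var_temp]>, // variance
    dummyTable=input_stream, outputTable=basic_output, timeColumn=`timestamp, keyColumn=`device_id)

// Percentiles (percent is given on a 0-100 scale)
share table(1:0, `timestamp`device_id`p50`p95`p99, [TIMESTAMP, INT, DOUBLE, DOUBLE, DOUBLE]) as percentile_output
agg = createTimeSeriesEngine(name="percentile_agg", windowSize=60000, step=60000,
    metrics=<[percentile(temperature, 50) as p50,   // median
              percentile(temperature, 95) as p95,   // 95th percentile
              percentile(temperature, 99) as p99]>, // 99th percentile
    dummyTable=input_stream, outputTable=percentile_output, timeColumn=`timestamp, keyColumn=`device_id)

// Custom metrics built from aggregates
share table(1:0, `timestamp`device_id`avg_temp`temp_range`cv, [TIMESTAMP, INT, DOUBLE, DOUBLE, DOUBLE]) as custom_output
agg = createTimeSeriesEngine(name="custom_agg", windowSize=60000, step=60000,
    metrics=<[avg(temperature) as avg_temp,
              max(temperature) - min(temperature) as temp_range,  // range
              std(temperature) / avg(temperature) as cv]>,        // coefficient of variation
    dummyTable=input_stream, outputTable=custom_output, timeColumn=`timestamp, keyColumn=`device_id)

// Several input columns: the input must also carry humidity and pressure
share table(1:0, `timestamp`device_id`avg_temp`avg_humidity`avg_pressure`corr_temp_humid,
    [TIMESTAMP, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as multi_output
agg = createTimeSeriesEngine(name="multi_col_agg", windowSize=60000, step=60000,
    metrics=<[avg(temperature) as avg_temp, avg(humidity) as avg_humidity,
              avg(pressure) as avg_pressure, corr(temperature, humidity) as corr_temp_humid]>,
    dummyTable=table(1:0, `device_id`timestamp`temperature`humidity`pressure, [INT, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE]),
    outputTable=multi_output, timeColumn=`timestamp, keyColumn=`device_id)
```

Engine management:

```
// Status of all stream engines (a dictionary keyed by engine type)
getStreamEngineStat()
// Time-series engines only, filtered by engine name
tsStat = getStreamEngineStat()["TimeSeriesEngine"]
select * from tsStat where name = "ts_engine"
// Drop one engine: cancel the subscription that feeds it, then drop it
unsubscribeTable(, "input_stream", "ts_agg")
dropStreamEngine("ts_engine")
// dropStreamEngine takes one engine name, so drop all time-series engines in a loop
engineNames = exec name from getStreamEngineStat()["TimeSeriesEngine"]
for(engineName in engineNames) dropStreamEngine(engineName)

// Monitoring helper: name, status and last error message of each time-series engine
def monitorEngine(){
    stat = getStreamEngineStat()["TimeSeriesEngine"]
    return select name, status, lastErrMsg from stat
}
monitorEngine()
```

Each of the three cases below is self-contained; run it in a fresh session or drop the previous case's shared tables and engines first.

Case 1: real-time device monitoring

```
// ========== 1. Create the stream table ==========
share streamTable(100000:0, `device_id`timestamp`temperature`humidity`pressure, [INT, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE]) as sensor_stream
// ========== 2. Create the output table (window time, key, metrics) ==========
share table(1:0, `timestamp`device_id`avg_temp`max_temp`min_temp`avg_humidity`cnt, [TIMESTAMP, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG]) as monitor_result
// ========== 3. Create the time-series engine (1-minute windows) ==========
agg = createTimeSeriesEngine(name="monitor_engine", windowSize=60000, step=60000,
    metrics=<[avg(temperature) as avg_temp, max(temperature) as max_temp, min(temperature) as min_temp,
              avg(humidity) as avg_humidity, count(temperature) as cnt]>,
    dummyTable=sensor_stream, outputTable=monitor_result, timeColumn=`timestamp, keyColumn=`device_id)
// ========== 4. Subscribe to the stream table ==========
subscribeTable(, "sensor_stream", "monitor_agg", -1, agg, true)
// ========== 5. Enable persistence (requires persistenceDir in the node configuration) ==========
enableTablePersistence(sensor_stream, true, true, 1000000)
// ========== 6. Simulate writes ==========
def simulateData(){
    for(i in 1..100){
        sensor_stream.append!(table(take(1..100, 1000) as device_id, take(now(), 1000) as timestamp,
            20.0 + rand(10.0, 1000) as temperature, 40.0 + rand(20.0, 1000) as humidity,
            1000.0 + rand(20.0, 1000) as pressure))
        // One batch per second: 100 seconds of data cross at least one minute boundary,
        // which is what makes the engine emit a window
        sleep(1000)
    }
}
simulateData()
// View the results
select top 20 * from monitor_result
```

Case 2: threshold alerts

```
// ========== 1. Create the stream table ==========
share streamTable(100000:0, `device_id`timestamp`temperature, [INT, TIMESTAMP, DOUBLE]) as sensor_stream
// ========== 2. Create the output table ==========
share table(1:0, `timestamp`device_id`avg_temp`alert, [TIMESTAMP, INT, DOUBLE, BOOL]) as alert_result
// ========== 3. Create the alert engine (30-second windows) ==========
agg = createTimeSeriesEngine(name="alert_engine", windowSize=30000, step=30000,
    metrics=<[avg(temperature) as avg_temp, avg(temperature) > 30 as alert]>,  // alert when the window mean exceeds 30
    dummyTable=sensor_stream, outputTable=alert_result, timeColumn=`timestamp, keyColumn=`device_id)
// ========== 4. Subscribe to the stream table ==========
subscribeTable(, "sensor_stream", "alert_agg", -1, agg, true)
// ========== 5. Query the alerts ==========
select * from alert_result where alert = true
```

Case 3: downsampling high-frequency data into OHLC bars

```
// ========== 1. Create the high-frequency stream table ==========
share streamTable(100000:0, `device_id`timestamp`value, [INT, TIMESTAMP, DOUBLE]) as high_freq_stream
// ========== 2. Create the downsampled output table ==========
share table(1:0, `timestamp`device_id`open`high`low`close`cnt, [TIMESTAMP, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG]) as ohlc_result
// ========== 3. Create the downsampling engine (1-minute bars) ==========
agg = createTimeSeriesEngine(name="ohlc_engine", windowSize=60000, step=60000,
    metrics=<[first(value) as open,   // open: first value in the window
              max(value) as high,     // high
              min(value) as low,      // low
              last(value) as close,   // close: last value in the window
              count(value) as cnt]>,  // number of records
    dummyTable=high_freq_stream, outputTable=ohlc_result, timeColumn=`timestamp, keyColumn=`device_id)
// ========== 4. Subscribe to the stream table ==========
subscribeTable(, "high_freq_stream", "ohlc_agg", -1, agg, true)
```

Window size guidelines:

| Scenario | Suggested window size |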
|---|---|
| Real-time monitoring | 10-60 s |
| Alerting | 30-60 s |
| Trend analysis | 1-5 min |
| Downsampling | 1-10 min |
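The window sizes in this table trade resolution for output volume. The downsampling case shows this clearly: the same ticks yield several one-minute OHLC bars, or fewer, coarser bars over a longer window.

The sketch below reproduces that case's `first`/`max`/`min`/`last`/`count` metrics in plain Python for a single device. It is a batch model, and the names are hypothetical. Unlike the streaming engine, it also returns the last window, which the engine would keep open until a later record arrives. It assumes ticks are sorted by time and that boundaries fall on multiples of the bar size.

```python
def ohlc_bars(ticks, bar_ms):
    """ticks: (ts_ms, value) pairs sorted by time, for one device.
    Returns (window_end, open, high, low, close, count) for each non-empty window."""
    bars = {}  # window_end -> [open, high, low, close, count]
    for ts, value in ticks:
        end = (ts // bar_ms + 1) * bar_ms  # right boundary of [end - bar_ms, end)
        bar = bars.get(end)
        if bar is None:
            bars[end] = [value, value, value, value, 1]  # first(value) opens the bar
        else:
            bar[1] = max(bar[1], value)  # high
            bar[2] = min(bar[2], value)  # low
            bar[3] = value               # close = last(value)
            bar[4] += 1                  # count
    return [(end, *bar) for end, bar in sorted(bars.items())]


ticks = [(0, 10.0), (20_000, 12.0), (40_000, 9.0), (59_999, 11.0), (60_000, 11.5)]
print(ohlc_bars(ticks, 60_000))   # two 1-minute bars
print(ohlc_bars(ticks, 120_000))  # one 2-minute bar
```

Doubling the bar size halves the number of output rows here, at the cost of merging the two minutes into one price range.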
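```
// Number of groups: keep a single engine below about 10,000 groups.
// If there are more groups, consider:
// 1. splitting the groups across several engines
// 2. hash-partitioning the group keys

// Memory clean-up threshold
share table(1:0, `timestamp`device_id`avg_temp, [TIMESTAMP, INT, DOUBLE]) as gc_output
agg = createTimeSeriesEngine(name="ts_engine", windowSize=60000, step=60000,
    metrics=<[avg(temperature) as avg_temp]>,
    dummyTable=input_stream, outputTable=gc_output, timeColumn=`timestamp, keyColumn=`device_id,
    garbageSize=100000)
// garbageSize=100000: once more than 100,000 historical rows are held in memory
// (counted per group, since keyColumn is set), rows that are no longer needed are cleaned up
```

This article has covered the DolphinDB time-series engine in detail: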
Questions to consider: