Electron 入门:Web 应用打包成桌面软件
2026/5/25 17:18:25
INSERT用于把查询结果或字面量数据写入目标表(sink 表)。在 Flink 里,执行 INSERT 会提交一个 Flink Job(流式作业通常是长期运行)。
executeSql()执行 INSERT 会立刻提交 Job,并返回TableResult,你可以通过它拿到 JobClient 查询状态。
TableEnvironmenttEnv=TableEnvironment.create(...);// source & sinktEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");tEnv.executeSql("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");// submit job immediatelyTableResultr1=tEnv.executeSql("INSERT INTO RubberOrders "+"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");System.out.println(r1.getJobClient().get().getJobStatus());当你要把同一个源表数据分流写入多个 sink(比如 RubberOrders、GlassOrders),用StatementSet更合适:先addInsertSql()收集多条语句,最后execute()一次性提交。
tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)");StatementSetstmtSet=tEnv.createStatementSet();stmtSet.addInsertSql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");stmtSet.addInsertSql("INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");TableResultr2=stmtSet.execute();System.out.println(r2.getJobClient().get().getJobStatus());注意:
addInsertSql()每次只能接收一条 INSERT 语句(不要把多个 INSERT 拼一条字符串)。
[EXECUTE]INSERT{INTO|OVERWRITE }[catalog.][db.]table_name[PARTITIONpart_spec][column_list]select_statementINSERT OVERWRITE会覆盖目标表或目标分区已有数据。假设目标表是分区表:
CREATETABLEcountry_page_view(userSTRING,cntINT,dateSTRING,country STRING)PARTITIONEDBY(date,country)WITH(...);INSERTINTOcountry_page_viewPARTITION(date='2019-8-30',country='China')SELECTuser,cntFROMpage_view_source;INSERTINTOcountry_page_viewPARTITION(date='2019-8-30')SELECTuser,cnt,countryFROMpage_view_source;INSERTOVERWRITE country_page_viewPARTITION(date='2019-8-30',country='China')SELECTuser,cntFROMpage_view_source;INSERTOVERWRITE country_page_viewPARTITION(date='2019-8-30')SELECTuser,cnt,countryFROMpage_view_source;Flink 允许在 INSERT 前面加EXECUTE,用于强调“我要执行这条语句”,但语义上等价于不加。
EXECUTEINSERTINTOcountry_page_viewPARTITION(date='2019-8-30',country='China')SELECTuser,cntFROMpage_view_source;Flink 支持指定目标列列表,把 SELECT 的列按列表顺序写入指定列,未写到的列会被置为NULL(前提:该列可空)。
例:表T(a INT, b INT, c INT):
INSERTINTOT(c,b)SELECTx,yFROMS;含义是:
x写入cy写入ba被置为NULL(如果a允许为 NULL)对 connector/sink 开发者:可以通过
DynamicTableSink.Context.getTargetColumns()获取用户指定的目标列,决定如何处理“部分列更新”。
除了INSERT INTO ... SELECT ...,也可以直接写 values:
[EXECUTE]INSERT{INTO|OVERWRITE } table_nameVALUES(val1,val2,...),(val1,val2,...);示例:
CREATETABLEstudents(name STRING,ageINT,gpaDECIMAL(3,2))WITH(...);INSERTINTOstudentsVALUES('fred flintstone',35,1.28),('barney rubble',32,2.32);如果你在 SQL 层就想“一次提交多条 insert”,可以用:
EXECUTESTATEMENTSETBEGINinsert_statement;insert_statement;END;其中insert_statement可以是INSERT ... SELECT或INSERT ... VALUES。