Continuous transforms 可以进行数据的转换,数据是不进行存储,主要是可以加入到其他的stream pipeline 中,或者写到其他外部
存储中,和存储过程结合使用,当前默认内置一个pipeline_stream_insert方便数据写入其他strem 注意不支持聚合操作docker-compose
version: '3.6'services: postgres: image: pipelinedb/pipelinedb ports: - "5432:5432"
参考语法
CREATE CONTINUOUS TRANSFORM name AS query [ THEN EXECUTE PROCEDURE function_name ( arguments ) ]query 查询说明SELECT expression [ [ AS ] output_name ] [, ...] [ FROM from_item [, ...] ] [ WHERE condition ] [ GROUP BY expression [, ...] ]where any expression in the SELECT statement can't contain an aggregate andfrom_item can be one of: stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ] table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ] from_item [ NATURAL ] join_type from_item [ ON join_condition ]
Continuous transforms 输出流
Continuous transforms 输出流,可以方便其他transforms或者Continuous view 读取
- 参考
创建 CONTINUOUS TRANSFORMCREATE CONTINUOUS TRANSFORM t AS SELECT t.y FROM some_stream s JOIN some_table t ON s.x = t.x;使用CREATE CONTINUOUS VIEW v AS SELECT sum(y) FROM output_of('t');
参考例子
- 创建两个stream
CREATE STREAM mystream3 (x integer, y integer);CREATE STREAM mystream4 (x integer, y integer);
- 创建CONTINUOUS VIEW
CREATE CONTINUOUS VIEW v4 AS SELECT x,y FROM mystream3 ;CREATE CONTINUOUS VIEW v5 AS SELECT x,y FROM mystream4 ;
- 创建CONTINUOUS TRANSFORM
当insert 到mystream3 中的x为偶数的时候执行插入mystream4
CREATE CONTINUOUS TRANSFORM t3 AS SELECT x::int, y::int FROM mystream3 WHERE mod(x, 2) = 0 THEN EXECUTE PROCEDURE pipeline_stream_insert('mystream4');
- 数据插入&& 查询结果 x 插入数据为1奇数
insert into mystream3(x,y) values(1,2);select * from v4;select * from v5;
insert into mystream3(x,y) values(2,5);select * from v4;select * from v5;
- 使用CONTINUOUS TRANSFORM 的output steam
CREATE CONTINUOUS VIEW v6 AS SELECT x,y FROM output_of('t3') ;
插入数据&&查询
insert into mystream3(x,y) values(4,7);
select * from v6;