这是PipelineDB这个工具介绍的最后一篇了,其他基础内容可以看一下前面的我的几个博客。
验证工作完成后接下来的工作,主要是在稳定性的具体功能的应用上了,那部分需要使用一阵再给大家介绍了,希望能够达到预期,不要像Oracle 物化视图一样^-^。
具体以一张零售小票的流的流转为示例 ,应用系统将数据以json格式传递到Kafka后,PipelineDB如何把从流上的数据保存到关系表中,并作初步统计。
实例内容:
Kafka数据的json的格式如下,目标就是把Posdetail信息流入表:
{"Posheader":[{"SHOPID":"STORE01","SALEDATE":"2016-06-15T11:36:57","AMOUNT":100.0000"Selected":false}],
"Posdetail":[{"SHOPID":"STORE01","SALENO":1,"PRODUCT":"product01","AMOUNT":60},{"SHOPID":"STORE01","SALENO":2,"PRODUCT":"product02","AMOUNT":40} ]
}
具体应用:
环境配置好之后是很简单的,而且postgresql集成了非常方便的json解析功能,应用整体步骤如下:
step 1:创建结果表
创建一个和Posdetail的结构一样的表,注意字段的大小写要和json一致,用引号
CREATE TABLE Posdetail
("SHOPID" VARCHAR(20 ) ,"SALENO" NUMERIC(18,1) ,"PRODUCT" NUMERIC(18,1),"PRODID" CHAR(11), "AMOUNT" NUMERIC(18,4));
step 2:创建流并启动流
create stream kafka_pos_stream ( pos_data json);
SELECT pipeline_kafka.consume_begin('jsontest', 'kafka_pos_stream');
step 3:创建流入结果集的函数
--continuous function for insert data
create or replace function cf_insert_posdtail()
returns trigger as
$$
begin
insert into pos_inc_bill_saledetail
select * from json_populate_recordset(null::pos_inc_bill_saledetail,
(select json_extract_path from json_extract_path(new.pos_data,'Billsaledetailinfo')));
return new;
end;
$$
language plpgsql;
--continuous view to link stream and function
create continuous transform ct_insert_posdetail as
select pos_data::json from kafka_pos_stream
then execute procedure cf_insert_postable_base();
step 4: 统计函数示例
create continuous view ct_pos_stream_stats as
select count(*) as total_pos_num from kafka_pos_stream;