基于PostgreSQL的流式计算数据库PipelineDB-Kafka应用的实时零售小票实例

浏览: 2724

这是PipelineDB这个工具介绍的最后一篇了,其他基础内容可以看一下前面的我的几个博客。

验证工作完成后接下来的工作,主要是在稳定性的具体功能的应用上了,那部分需要使用一阵再给大家介绍了,希望能够达到预期,不要像Oracle 物化视图一样^-^。

具体以一张零售小票的流的流转为示例 ,应用系统将数据以json格式传递到Kafka后,PipelineDB如何把从流上的数据保存到关系表中,并作初步统计。

实例内容:

Kafka数据的json的格式如下,目标就是把Posdetail信息流入表:

{"Posheader":[{"SHOPID":"STORE01","SALEDATE":"2016-06-15T11:36:57","AMOUNT":100.0000"Selected":false}],
 "Posdetail":[{"SHOPID":"STORE01","SALENO":1,"PRODUCT":"product01","AMOUNT":60},{"SHOPID":"STORE01","SALENO":2,"PRODUCT":"product02","AMOUNT":40} ]
}

具体应用:

环境配置好之后是很简单的,而且postgresql集成了非常方便的json解析功能,应用整体步骤如下:

step 1:创建结果表

创建一个和Posdetail的结构一样的表,注意字段的大小写要和json一致,用引号

CREATE TABLE Posdetail
("SHOPID" VARCHAR(20 ) ,"SALENO" NUMERIC(18,1) ,"PRODUCT" NUMERIC(18,1),"PRODID" CHAR(11), "AMOUNT" NUMERIC(18,4));

step 2:创建流并启动流

create stream kafka_pos_stream ( pos_data json);
SELECT pipeline_kafka.consume_begin('jsontest', 'kafka_pos_stream');

step 3:创建流入结果集的函数

--continuous function  for insert data
create or replace function cf_insert_posdtail()
  returns trigger as
  $$
  begin
    insert into pos_inc_bill_saledetail
      select * from json_populate_recordset(null::pos_inc_bill_saledetail,
(select json_extract_path from json_extract_path(new.pos_data,'Billsaledetailinfo')));
    return new;
  end;
  $$
  language plpgsql;
--continuous view to link stream and function
create continuous transform ct_insert_posdetail as
  select pos_data::json from kafka_pos_stream
  then execute procedure cf_insert_postable_base();

step 4: 统计函数示例

create continuous view ct_pos_stream_stats as 
select count(*) as total_pos_num from kafka_pos_stream;
推荐 1
本文由 seng 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册