基于PostgreSQL的流式计算数据库PipelineDB--以Kafka为实时消息源

浏览: 3273

PipelineDB可以把Kafka作为流的源,当然也可以通过SQL传消息给Kafka.

不过PipelineDB默认没有安装Kafka的接口,需要装extension,extension是Postgre的扩展,很多功能都可以通过扩展实现

1准备Kafka

按Kafka quick start 开始,见http://kafka.apache.org/quickstart

#安装
cd /postgre/Kafka
tar -xzf kafka_2.11-0.10.1.0.tgz
cd kafka_2.11-0.10.1.0
#启动zookeeper & Kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
#Create a topic一次就可以了
--bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#put some message 在这里手工输入消息既可测试
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#consumer test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2.安装pipeline_kafka extension

文档见http://docs.pipelinedb.com/integrations.html

#pipeline_kafka extension依赖librdkafka,需要先安装 
cd  /postgre/installfiles
git clone -b 0.9.1 https://github.com/edenhill/librdkafka.git ./librdkafka
cd ./librdkafka
./configure --prefix=/usr    #注意测试过安装到其他目录,但是有问题
make
sudo make install
#vi ~bash_profile  需要把pipelinedb.conf所在目录放到PATH路径中
PATH=$PATH:/postgre/pipelinedb/v096/testpipe
export PATH
#install  pipeline_kafka extension
cd /postgre/installfiles/
unzip pipeline_kafka-master.zip
cd pipeline_kafka-master
./configure    #注意如果pipelinedb.conf不在路径中或 librdkafka没装,会报错
make
sudo make install
#配置pipelinedb.conf
#shared_preload_libraries = ''          # (change requires restart)
shared_preload_libraries = pipeline_kafka

3.启动服务及注册EXTENSION

#按正常命令启动pipeline即可,EXTENSION会自动加载
pipeline-ctl -D /postgre/pipelinedb/v096/testpipe -l /postgre/pipelinedb/v096/logs/testpipe/pipelinedb.log start
#创建EXTENSION
CREATE EXTENSION pipeline_kafka;
#注册Kafka Broker
SELECT pipeline_kafka.add_broker('localhost:9092');

4.创建流及取数据

这个基本就和正常的stream类似了

#step 1
create stream kafka_stream ( project text);
#step 2 创建continuous view
create continuous view kafka_stats as                                                                 
select count(*) as total_pages from kafka_stream;
#step 3  启动流 不需要手工insert流了
SELECT pipeline_kafka.consume_begin('test', 'kafka_stream');
#step 4 访问continuous view
select * from kafka_stats;
 #step 5  stop consume if nessary
SELECT pipeline_kafka.consume_end('test', 'kafka_stream');

5.其他问题

postgre extentation安装问题,可能需要的系统 package

参考pg的文档 https://www.postgresql.org/docs/9.5/static/installation.html

yum -y install lrzsz sysstat e4fsprogs ntp readline-devel zlib zlib-devel openssl openssl-devel pam-devel libxml2-devel libxslt-devel Python-devel tcl-devel gcc make smartmontools flex bison perl perl-devel perl-ExtUtils* OpenIPMI-tools systemtap-sdt-devel
推荐 1
本文由 seng 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册