PipelineDB可以把Kafka作为流的源,当然也可以通过SQL传消息给Kafka.
不过PipelineDB默认没有安装Kafka的接口,需要装extension,extension是PostgreSQL的扩展机制,很多功能都可以通过扩展实现
1.准备Kafka
按Kafka quick start 开始,见http://kafka.apache.org/quickstart
#安装
cd /postgre/Kafka
tar -xzf kafka_2.11-0.10.1.0.tgz
cd kafka_2.11-0.10.1.0
#启动zookeeper & Kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
#Create a topic一次就可以了
--bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#put some messages 在这里手工输入消息即可测试
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#consumer test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
2.安装pipeline_kafka extension
文档见http://docs.pipelinedb.com/integrations.html
#pipeline_kafka extension依赖librdkafka,需要先安装
cd /postgre/installfiles
git clone -b 0.9.1 https://github.com/edenhill/librdkafka.git ./librdkafka
cd ./librdkafka
./configure --prefix=/usr #注意测试过安装到其他目录,但是有问题
make
sudo make install
#vi ~/.bash_profile 需要把pipelinedb.conf所在目录放到PATH路径中
PATH=$PATH:/postgre/pipelinedb/v096/testpipe
export PATH
#install pipeline_kafka extension
cd /postgre/installfiles/
unzip pipeline_kafka-master.zip
cd pipeline_kafka-master
./configure #注意如果pipelinedb.conf不在路径中或 librdkafka没装,会报错
make
sudo make install
#配置pipelinedb.conf
#shared_preload_libraries = '' # (change requires restart)
shared_preload_libraries = pipeline_kafka
3.启动服务及注册EXTENSION
#按正常命令启动pipeline即可,EXTENSION会自动加载
pipeline-ctl -D /postgre/pipelinedb/v096/testpipe -l /postgre/pipelinedb/v096/logs/testpipe/pipelinedb.log start
#创建EXTENSION
CREATE EXTENSION pipeline_kafka;
#注册Kafka Broker
SELECT pipeline_kafka.add_broker('localhost:9092');
4.创建流及取数据
这个基本就和正常的stream类似了
#step 1
create stream kafka_stream ( project text);
#step 2 创建continuous view
create continuous view kafka_stats as
select count(*) as total_pages from kafka_stream;
#step 3 启动流 不需要手工insert流了
SELECT pipeline_kafka.consume_begin('test', 'kafka_stream');
#step 4 访问continuous view
select * from kafka_stats;
#step 5 stop consuming if necessary
SELECT pipeline_kafka.consume_end('test', 'kafka_stream');
5.其他问题
PostgreSQL extension安装问题,可能需要的系统 package
参考pg的文档 https://www.postgresql.org/docs/9.5/static/installation.html
yum -y install lrzsz sysstat e4fsprogs ntp readline-devel zlib zlib-devel openssl openssl-devel pam-devel libxml2-devel libxslt-devel python-devel tcl-devel gcc make smartmontools flex bison perl perl-devel perl-ExtUtils* OpenIPMI-tools systemtap-sdt-devel