PostgreSQL逻辑复制到kafka 4年前

kafka 安装

wget http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz

  cp kafka_2.12-2.0.1.tgz  kafka.tgz

      sudo tar xzvf kafka.tgz --directory=/opt/java/kafka --strip 1

启动 kafka需要先启动本地的 zookeeper,注意修改配置文件中zk****的连接地址

  /opt/kafka/bin/kafka-server-start.sh   /opt/kafka/config/server.properties

kafkacat 是一个C语言编写的 kafka 生产者、消费者程序。安装过程需要一些库可能需要手动下载。

postgres 逻辑解码, 程序 jsoncdc

jsoncdc 依赖于rust,****可能需要先安装 rust 或者可以使用 wal2json替代

编译好之后****本地目录下有 jsoncdc.so 或者 wal2json.so

postgres 安装解码插件:

vim  $PGDATA/postgresql.conf

shared_preload_libraries = 'jsoncdc.so'

安装完成插件

postgres 插入****数据

生产数据到****本地 kafka

/opt/bin/pgsql/pg_10/bin/pg_recvlogical -d postgres -S jsoncdc --start -o pretty-print=1 -f - | ./kafkacat/kafkacat -b 127.0.0.1:9092 -t pg

 Auto-selecting Producer mode (use -P or -C to override)

消费数据测试:

./kafkacat/kafkacat -b 127.0.0.1:9092 -t pg

kafka****自消费实验:

启动zk,我这边有zk服务器,因此不需要启动:

bin/zookeeper-server-start.sh config/zookeeper.properties &

启动kafka

bin/kafka-server-start.sh config/server.properties

在里面修改zk****连接信息

创建topic

bin/kafka-topics.sh --create --zookeeper --replication-factor 1 --partitions 1 --topic test

创建一个叫做“test”topic,它只有一个分区,一个副本。

> bin/kafka-topics.sh --create --zookeeper 10.9.5.20:4119,10.9.5.21:4119,10.9.5.22:4119,10.9.5.24:4119,10.9.5.35:4119,10.9.5.36:4119 --replication-factor 1 --partitions 1 --topic test

可以通过list命令查看创建的****topic:

> bin/kafka-topics.sh --list --zookeeper localhost:2181

test

运行producer并在控制台中输一些消息,这些消息将被发送到服务端:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a messageThis is another message

ctrl+c可以退出发送。

消费消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer.config config/server.properties --topic test --from-beginning

接下来想写一个nodejs程序,将pg的变化动态回放到web中,显示出来。

水熊
|。・㉨・)っ♡ 喜欢你♪
5
发布数
1
关注者
2102
累计阅读

热门教程文档

Docker
62小节
Spring Boot
24小节
Djiango
17小节
爬虫
6小节
Redis
14小节
广告