应用debezium将postgresql数据送至kafka(官网示例 本地docker部署)

news/2024/7/9 22:01:34 标签: postgresql, docker, debezium, kafka

版本

conncet 2.2
postgresql 15.2

postgresql_3">1 postgresql

1.1 获取

docker pull debezium/example-postgres

1.2 运行

docker run -d --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=postgres debezium/example-postgres

1.3 特殊配置

观察镜像中/var/lib/postgresql/data下postgresql.conf,最关键的配置为:

shared_preload_libraries = 'decoderbufs'
wal_level = logical

2 zookeeper

2.1 获取

docker pull debezium/zookeeper

2.2 运行

docker run -it -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:latest

kafka_27">3 kafka

3.1 获取

docker pull debezium/kafka

3.2 运行

docker run -it -d --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:latest

3.3 检查

如果本地已有kafka客户端,可以使用如下命令查看已有topic:

bin/kafka-topics.sh --bootstrap-server 172.17.0.1:9092 --list

connect

4.1 获取

docker pull debezium/connect

4.2 启动

docker run -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect

4.3 创建connect

创建pgsql-inventory-connector.json文件

{
  "name": "localhost-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "172.17.0.1",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "postgres",
    "topic.prefix": "dbserver1",
    "table.include.list": "inventory.customers"
  }
}

通过http创建connect

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 172.17.0.1:8083/connectors/ -d @pgsql-inventory-connector.json

4.4 测试命令

curl -H "Accept:application/json" 172.17.0.1:8083
curl -H "Accept:application/json" 172.17.0.1:8083/connectors
curl -i -X GET -H "Accept:application/json" 172.17.0.1:8083/connectors/localhost-connector

5 测试

kafka_78">5.1 kafka

创建消费者

bin/kafka-console-consumer.sh --topic dbserver1.inventory.customers --from-beginning --bootstrap-server 172.17.0.1:9092

postgresql_83">5.2 postgresql

insert into inventory.customers values (1005,'aA','bB','aAbB@home.com');

kafka_87">5.3 kafka显示结果

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"aA","last_name":"bB","email":"aAbB@home.com"},"source":{"version":"2.2.0.Alpha3","connector":"postgresql","name":"dbserver1","ts_ms":1687946054175,"snapshot":"false","db":"postgres","sequence":"[\"34244288\",\"34244576\"]","schema":"inventory","table":"customers","txId":758,"lsn":34244576,"xmin":null},"op":"c","ts_ms":1687946054536,"transaction":null}}

附录-图形界面

debeziumui_92">debezium-ui

docker run -d --name debezium-ui -p 8080:8080 -e KAFKA_CONNECT_URIS=http://172.17.0.1:8083 debezium/debezium-ui:latest

kakfa-ui

待研究


http://www.niftyadmin.cn/n/699030.html

相关文章

python 魔法函数学习

一:魔法函数特点 1、 魔法函数也就是魔术方法,是python定义的方法,不属于某个类,不是因为继承而拥有的方法,每一个类都可以添加魔法函数,以双下划线开头和双下划线结尾,例如__getitem__&#xf…

Vue - i18n 国际化的使用

1、下载 yarn add vue-i18n8.26.52、main.js 中引入 // 引入国际化 import VueI18n from vue-i18n Vue.use(VueI18n) const i18n {locale: zh-CN, // 语言标识fallbackLocale: zh-CN, //没有英文的时候默认中文语言silentFallbackWarn: true, //抑制警告//this.$i18n.local…

【Rust 基础篇】Rust引用详解

文章目录 引言一、什么是引用?二、不可变引用三、可变引用四、引用的规则五、引用的使用建议六、示例代码总结 引言 在Rust中,引用是一种轻量级的指向数据的方式,它允许我们在不获取所有权的情况下访问和操作数据。引用是Rust中处理借用操作…

Java 8 的Stream流那么强大,你知道它的原理吗?

Java 8 API添加了一个新的抽象称为流Stream,可以让你以一种声明的方式处理数据。 Stream 使用一种类似用 SQL 语句从数据库查询数据的直观方式来提供一种对 Java 集合运算和表达的高阶抽象。 Stream API可以极大提高Java程序员的生产力,让程序员写出高效…

如何入门深度学习

深度学习是一种机器学习的分支,它通过构建多层神经网络来实现对数据的自动分析和学习。深度学习已经在许多领域取得了重大的突破,如计算机视觉、自然语言处理、语音识别等。如果你想入门深度学习,以下是一些建议。 学习基础数学知识 深度学…

使用GDB调试C++程序

以下是GDB调试的常用流程和一些常用命令: 编译源代码:使用C编译器(如g)将源代码编译为可执行文件。例如,使用以下命令编译名为program.cpp的源代码文件: g -g program.cpp -o program启动GDB调试器&#xf…

LeetCode算法题---无重复字符的最长子串、寻找两个正序数组的中位数(三)

3. 无重复字符的最长子串 给定一个字符串 s ,请你找出其中不含有重复字符的 最长子串 的长度。 示例 1: 输入: s "abcabcbb" 输出: 3 解释: 因为无重复字符的最长子串是 "abc",所以其长度为 3。 示例 2: 输入: s "bbbb…

C++中线程的使用方法

在C中&#xff0c;可以使用以下两种方式创建线程&#xff1a; 使用pthread库 C中可以使用pthread库来创建线程。pthread库是一个开源的线程库&#xff0c;可以用来管理和创建线程。以下是一个使用pthread库创建线程的示例代码&#xff1a; #include <pthread.h> #includ…