Flink + Docker Compose: syncing MySQL binlog to Elasticsearch
System: Ubuntu; Elasticsearch 7.17; Flink 1.17.2
Directory setup
mkdir -p /usr/project/flink/{conf,job,logs}
chmod -R 777 /usr/project/flink
# Resources
MySQL 8.0 and Elasticsearch 7.17, both managed services on Google Cloud
# Directory layout
/usr/project/flink/
├── conf/
│ ├── flink-conf.yaml
│ └── log4j2.xml
├── job/
│ ├── flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
│ ├── flink-connector-elasticsearch-base-3.0.1-1.17.jar
│ ├── flink-sql-connector-mysql-cdc-2.4.1.jar
│ └── win_user.sql
├── logs/
└── docker-compose.yml
docker-compose.yml
vim /usr/project/flink/docker-compose.yml
Variant 1: using a managed (cloud) Elasticsearch — only JobManager and TaskManager run locally
services:
  jobmanager:
    image: flink:1.17.2
    restart: always
    container_name: flink-jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    mem_limit: 4g
    cpu_shares: 1024
    volumes:
      - ./conf:/opt/flink/conf
      - ./job:/opt/flink/job
      - /usr/project/flink/logs:/opt/flink/log
    networks:
      - flink-network
  taskmanager:
    image: flink:1.17.2
    restart: always
    container_name: flink-taskmanager
    depends_on:
      - jobmanager
    # JVM metaspace is sized via taskmanager.memory.jvm-metaspace.size in conf/flink-conf.yaml,
    # not by passing -XX flags to the taskmanager command
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    mem_limit: 8g
    cpu_shares: 1024
    volumes:
      - ./conf:/opt/flink/conf
      - ./job:/opt/flink/job
      - /usr/project/flink/logs:/opt/flink/log
    networks:
      - flink-network
networks:
  flink-network:
    driver: bridge
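Before wiring in any jobs, it is worth checking that the JobManager REST endpoint answers once the stack is up. A minimal sketch, assuming the default 8081 port mapping from the compose file above:
cd /usr/project/flink
docker-compose up -d
# JobManager REST API; should return a JSON document with the cluster configuration
curl -s http://localhost:8081/config
# Cluster overview (TaskManager and slot counts)
curl -s http://localhost:8081/overview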
Variant 2: running Elasticsearch and Kibana locally
version: '3.8'
services:
  jobmanager:
    image: flink:1.17.2
    container_name: flink-jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./conf:/opt/flink/conf
      - ./job:/opt/flink/job
      - /usr/project/flink/logs:/opt/flink/log
    networks:
      - flink-network
  taskmanager:
    image: flink:1.17.2
    container_name: flink-taskmanager
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./conf:/opt/flink/conf
      - ./job:/opt/flink/job
      - /usr/project/flink/logs:/opt/flink/log
    networks:
      - flink-network
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.10.2
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=true   # required for ELASTIC_PASSWORD to take effect
      - ELASTIC_PASSWORD=123456
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    networks:
      - flink-network
  kibana:
    image: docker.elastic.co/kibana/kibana:7.10.2
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200   # Kibana 7.x uses ELASTICSEARCH_HOSTS, not ELASTICSEARCH_URL
      - ELASTICSEARCH_USERNAME=elastic
      - ELASTICSEARCH_PASSWORD=123456
    ports:
      - "5601:5601"
    networks:
      - flink-network
volumes:
  es_data:
networks:
  flink-network:
    driver: bridge
Verify ES
curl -u elastic:123456 http://localhost:9200
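Beyond the banner response, a quick follow-up check is cluster health (and, with the local stack, the Kibana status endpoint); adjust host, port and credentials to whichever variant you deployed:
# Cluster health: status should be green or yellow
curl -u elastic:123456 "http://localhost:9200/_cluster/health?pretty"
# Kibana status (local compose only)
curl -s http://localhost:5601/api/status | head -c 300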
Flink SQL job example
File
/usr/project/flink/job/win_user.sql
Full + incremental sync mode
Set scan.startup.mode to 'initial' to first read the table's existing data and then continue with incremental sync from the binlog.
Set it to 'latest-offset' to skip the snapshot and read only new changes starting from the latest binlog offset (see the position check below).
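Before picking a mode you can look at the current binlog position on the source MySQL; 'latest-offset' will start from roughly this point. A minimal check, assuming the main account configured in the DDL below:
# Current binlog file and position (requires the REPLICATION CLIENT privilege)
mysql -h 127.0.0.1 -P 3306 -u main -p -e "SHOW MASTER STATUS;"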
Verify the tables were created (inside the Flink SQL client):
/opt/flink/bin/sql-client.sh embedded
SHOW TABLES;
SELECT * FROM source_win_user LIMIT 10;
SQL script (win_user.sql)
vim /usr/project/flink/job/win_user.sql
CREATE TABLE source_win_user (
id INT,
username STRING,
merchant_id INT,
avatar STRING,
fcoin DECIMAL(15,4),
coin_commission DECIMAL(15,4),
level_id TINYINT,
role TINYINT,
is_promoter TINYINT,
flag INT,
real_name STRING,
signature STRING,
birthday STRING,
area_code STRING,
mobile STRING,
email STRING,
sex TINYINT,
bind_bank TINYINT,
address STRING,
score INT,
promo_code STRING,
id_path STRING,
sup_uid_1 INT,
sup_username_1 STRING,
sup_uid_2 INT,
sup_uid_3 INT,
sup_uid_4 INT,
sup_uid_5 INT,
sup_uid_6 INT,
sup_uid_top INT,
sup_username_top STRING,
sup_level_top INT,
password_hash STRING,
password_coin STRING,
ip STRING,
third_login_type STRING,
ip_region STRING,
status TINYINT,
last_login_ip STRING,
last_login_ip_region STRING,
last_login_time INT,
last_login_device_id STRING,
created_at INT,
updated_at INT,
freeze_cause STRING,
freeze_at INT,
operator_name STRING,
fb_pid STRING,
fb_cid STRING,
created_name STRING,
memberType TINYINT,
google_sub_id STRING,
facebook_sub_id STRING,
secret STRING,
code_url STRING,
code_status TINYINT,
user_type TINYINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'main',
'password' = '123456',
'database-name' = 'main',
'table-name' = 'win_user',
'scan.startup.mode' = 'initial', -- read existing data first, then continue incrementally
'debezium.snapshot.mode' = 'never', -- snapshot mode: 'initial' takes a full snapshot first, 'never' skips it and reads only incremental changes
'scan.incremental.snapshot.enabled' = 'true' -- use the incremental (parallel, lock-free) snapshot algorithm
);
CREATE TABLE es_sink_table_win_user (
id INT,
username STRING,
merchant_id INT,
avatar STRING,
fcoin DECIMAL(15,4),
coin_commission DECIMAL(15,4),
level_id TINYINT,
role TINYINT,
is_promoter TINYINT,
flag INT,
real_name STRING,
signature STRING,
birthday STRING,
area_code STRING,
mobile STRING,
email STRING,
sex TINYINT,
bind_bank TINYINT,
address STRING,
score INT,
promo_code STRING,
id_path STRING,
sup_uid_1 INT,
sup_username_1 STRING,
sup_uid_2 INT,
sup_uid_3 INT,
sup_uid_4 INT,
sup_uid_5 INT,
sup_uid_6 INT,
sup_uid_top INT,
sup_username_top STRING,
sup_level_top INT,
password_hash STRING,
password_coin STRING,
ip STRING,
third_login_type STRING,
ip_region STRING,
status TINYINT,
last_login_ip STRING,
last_login_ip_region STRING,
last_login_time INT,
last_login_device_id STRING,
created_at INT,
updated_at INT,
freeze_cause STRING,
freeze_at INT,
operator_name STRING,
fb_pid STRING,
fb_cid STRING,
created_name STRING,
memberType TINYINT,
google_sub_id STRING,
facebook_sub_id STRING,
secret STRING,
code_url STRING,
code_status TINYINT,
user_type TINYINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'https://127.0.0.1:9243',
'username' = 'elastic',
'password' = '123456',
'index' = 'win_user', -- must match the index name in Elasticsearch
'sink.bulk-flush.interval' = '1s',
'sink.bulk-flush.backoff.max-retries' = '3', -- maximum number of retries
'sink.bulk-flush.max-actions' = '100', -- flush after 100 buffered actions
'sink.bulk-flush.max-size' = '1mb', -- flush when the buffer reaches 1 MB
'sink.bulk-flush.backoff.delay' = '100ms', -- delay between retries
'sink.bulk-flush.backoff.strategy' = 'constant' -- retry backoff strategy
);
-- 3. Submit the sync job
INSERT INTO es_sink_table_win_user
SELECT * FROM source_win_user;
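Before submitting win_user.sql it is worth confirming that both endpoints referenced in the WITH clauses are reachable with the same credentials; otherwise the job only fails at runtime. A sketch using the hosts and accounts from the DDL above:
# Can the MySQL source be read with the CDC account?
mysql -h 127.0.0.1 -P 3306 -u main -p -e "SELECT COUNT(*) FROM main.win_user;"
# Is the Elasticsearch sink reachable? (-k skips TLS certificate verification, as in the other curl examples)
curl -u elastic:123456 -k "https://127.0.0.1:9243/_cluster/health?pretty"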
Verify
/opt/flink/bin/sql-client.sh embedded
# Inside the SQL client:
SHOW TABLES;
desc es_sink_table_win_user;
DROP TABLE IF EXISTS es_sink_table_win_user;
DROP TABLE IF EXISTS source_win_user;
# In Flink 1.17 the following lists the registered tables
SHOW TABLES;
# Job status
SHOW JOBS;
# Details
EXPLAIN SELECT * FROM source_win_user;
SELECT * FROM source_win_user LIMIT 10;
Tuning (these session settings must be configured)
/opt/flink/bin/sql-client.sh embedded
# Session-wide settings (SET), applied before submitting the job
SET 'execution.checkpointing.interval' = '1s';
SET 'restart-strategy' = 'fixed-delay';
SET 'restart-strategy.fixed-delay.attempts' = '3';
SET 'restart-strategy.fixed-delay.delay' = '5s';
SET 'state.backend' = 'rocksdb';
SET 'state.backend.rocksdb.memory.managed' = 'true';
# Default parallelism for the job (adjust to the available task slots)
SET 'parallelism.default' = '4';
# You can override this per submission, e.g. SET 'parallelism.default' = '2'; before the INSERT
# Review the effective configuration
SET;
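Once a job is running you can confirm the checkpoint interval took effect through the JobManager REST API; <JobID> is a placeholder taken from SHOW JOBS or ./bin/flink list:
# List jobs and their ids
curl -s http://localhost:8081/jobs/overview
# Checkpoint statistics for one job
curl -s http://localhost:8081/jobs/<JobID>/checkpoints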
Flink configuration (flink-conf.yaml)
mkdir -p /usr/project/flink/conf
vim /usr/project/flink/conf/flink-conf.yaml
Task slots: if you want to run, say, 10 jobs on this TaskManager, taskmanager.numberOfTaskSlots must be set to at least 10.
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 10
jobmanager.memory.process.size: 1024m
taskmanager.memory.process.size: 2048m
rest.port: 8081
state.backend: rocksdb
state.backend.rocksdb.memory.managed: true
execution.checkpointing.interval: 5000ms
execution.checkpointing.tolerable-failed-checkpoints: 1
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
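After restarting the containers, a quick way to confirm the mounted configuration was picked up:
# The conf directory is bind-mounted, so the container should see the same file
docker exec flink-jobmanager grep numberOfTaskSlots /opt/flink/conf/flink-conf.yaml
# Registered TaskManagers and their slot counts
curl -s http://localhost:8081/taskmanagers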
Logging
vim /usr/project/flink/conf/log4j2.xml
<Configuration status="warn">
  <Appenders>
    <RollingFile name="RollingFile" fileName="/opt/flink/log/flink.log"
                 filePattern="/opt/flink/log/${date:yyyy-MM}/flink-%d{yyyy-MM-dd}.log">
      <PatternLayout>
        <Pattern>%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n</Pattern>
      </PatternLayout>
      <Policies>
        <SizeBasedTriggeringPolicy size="100MB"/>
      </Policies>
    </RollingFile>
  </Appenders>
  <Loggers>
    <Root level="info">
      <AppenderRef ref="RollingFile"/>
    </Root>
  </Loggers>
</Configuration>
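To confirm that logs actually land in the mounted directory (and to follow them while debugging a job), for example:
# On the host: logs/ is bind-mounted into both containers as /opt/flink/log
tail -f /usr/project/flink/logs/flink.log
# Or follow the container's stdout directly
docker logs -f --tail 100 flink-jobmanager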
Connector downloads
Official download location for the flink-connector-elasticsearch packages:
https://repo1.maven.org/maven2/org/apache/flink/ -- pick the versions matching ES 7.17 and flink-1.17.2
cd /usr/project/flink/job
# Remove everything in this directory except win_user.sql
find . -maxdepth 1 ! -name 'win_user.sql' ! -name '.' -type f -exec rm -f {} +
# MySQL CDC (the sql-client commands below use the 2.4.1 jar)
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.1.1/flink-sql-connector-mysql-cdc-3.1.1.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.1/flink-sql-connector-mysql-cdc-2.4.1.jar
# Elasticsearch
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch-base/3.0.1-1.17/flink-connector-elasticsearch-base-3.0.1-1.17.jar
# Additional dependencies
wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.13/httpclient-4.5.13.jar
wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.13/httpcore-4.4.13.jar
wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar
wget https://repo1.maven.org/maven2/commons-codec/commons-codec/1.11/commons-codec-1.11.jar
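A quick sanity check that every jar downloaded completely (a failed wget sometimes leaves an HTML error page behind):
cd /usr/project/flink/job
ls -lh *.jar
# Each file should be reported as a Zip/JAR archive, not as an HTML document
file *.jar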
Start and run the job
Creating the index in ES beforehand is optional (the sink can create it), but it is handy for testing.
# Create the index first
curl -X PUT "https://127.0.0.1:9243/win_user" \
-u "elastic:123456" \
-H 'Content-Type: application/json' \
-d '{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id": { "type": "integer" },
"username": { "type": "keyword" },
"merchant_id": { "type": "integer" },
"avatar": { "type": "keyword" },
"fcoin": { "type": "double" },
"coin_commission": { "type": "double" },
"level_id": { "type": "byte" },
"role": { "type": "byte" },
"is_promoter": { "type": "byte" },
"flag": { "type": "integer" },
"real_name": { "type": "keyword" },
"signature": { "type": "text" },
"birthday": { "type": "keyword" },
"area_code": { "type": "keyword" },
"mobile": { "type": "keyword" },
"email": { "type": "keyword" },
"sex": { "type": "byte" },
"bind_bank": { "type": "byte" },
"address": { "type": "text" },
"score": { "type": "integer" },
"promo_code": { "type": "keyword" },
"id_path": { "type": "keyword" },
"sup_uid_1": { "type": "integer" },
"sup_username_1": { "type": "keyword" },
"sup_uid_2": { "type": "integer" },
"sup_uid_3": { "type": "integer" },
"sup_uid_4": { "type": "integer" },
"sup_uid_5": { "type": "integer" },
"sup_uid_6": { "type": "integer" },
"sup_uid_top": { "type": "integer" },
"sup_username_top": { "type": "keyword" },
"sup_level_top": { "type": "integer" },
"password_hash": { "type": "keyword" },
"password_coin": { "type": "keyword" },
"ip": { "type": "keyword" },
"third_login_type": { "type": "keyword" },
"ip_region": { "type": "keyword" },
"status": { "type": "byte" },
"last_login_ip": { "type": "keyword" },
"last_login_ip_region": { "type": "keyword" },
"last_login_time": { "type": "integer" },
"last_login_device_id": { "type": "keyword" },
"created_at": { "type": "integer" },
"updated_at": { "type": "integer" },
"freeze_cause": { "type": "text" },
"freeze_at": { "type": "integer" },
"operator_name": { "type": "keyword" },
"fb_pid": { "type": "keyword" },
"fb_cid": { "type": "keyword" },
"created_name": { "type": "keyword" },
"memberType": { "type": "byte" },
"google_sub_id": { "type": "keyword" },
"facebook_sub_id": { "type": "keyword" },
"secret": { "type": "keyword" },
"code_url": { "type": "keyword" },
"code_status": { "type": "byte" },
"user_type": { "type": "byte" }
}
}
}'
Insert test data into ES
curl -X POST "https://127.0.0.1:9243/win_user/_doc/" \
-u "elastic:123456" \
-H 'Content-Type: application/json' \
-d '{
"id": 12,
"username": "user1",
"merchant_id": 123,
"avatar": "avatar1",
"fcoin": 100.5,
"coin_commission": 5.2,
"level_id": 1,
"role": 2,
"is_promoter": 1,
"flag": 0,
"real_name": "User One",
"signature": "Sample signature",
"birthday": "1990-01-01",
"area_code": "12345",
"mobile": "1234567890",
"email": "user1@example.com",
"sex": 1,
"bind_bank": 1,
"address": "123 Street Name",
"score": 1000,
"promo_code": "PROMO124",
"id_path": "1/2/3",
"sup_uid_1": 1,
"sup_username_1": "sup1",
"password_hash": "hashed_password",
"password_coin": "coin_hash",
"ip": "192.168.1.1",
"third_login_type": "google",
"status": 1,
"last_login_ip": "192.168.1.1",
"last_login_time": 1672530000,
"created_at": 1672530000,
"updated_at": 1672530000,
"freeze_cause": "none",
"freeze_at": 0,
"operator_name": "admin",
"fb_pid": "fb12345",
"fb_cid": "fb67890",
"created_name": "admin",
"memberType": 1,
"google_sub_id": "google123",
"facebook_sub_id": "fb123",
"secret": "secret_code",
"code_url": "http://example.com",
"code_status": 1,
"user_type": 1
}'
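To confirm the test document was indexed (a new document becomes searchable after the index refresh interval, roughly one second by default):
# Document count in the index
curl -u elastic:123456 -k "https://127.0.0.1:9243/win_user/_count?pretty"
# Fetch the test document back by username
curl -u elastic:123456 -k "https://127.0.0.1:9243/win_user/_search?q=username:user1&pretty"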
Start the Flink cluster
cd /usr/project/flink
docker-compose down -v
docker-compose up -d
Run the SQL CLI
docker exec -it flink-jobmanager /bin/bash
# Run inside the container
cd /opt/flink
# Submit the win_user job
/opt/flink/bin/sql-client.sh embedded -j /opt/flink/job/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar -j /opt/flink/job/flink-connector-elasticsearch-base-3.0.1-1.17.jar -j /opt/flink/job/flink-sql-connector-mysql-cdc-2.4.1.jar -j /opt/flink/job/httpclient-4.5.13.jar -j /opt/flink/job/httpcore-4.4.13.jar -j /opt/flink/job/commons-codec-1.11.jar -j /opt/flink/job/commons-logging-1.2.jar -f /opt/flink/job/win_user.sql
# List jobs
./bin/flink list
# Cancel a specific job
./bin/flink cancel <JobID>
./bin/flink cancel 7d3022995c94511d477c3be5d1794168
# Job details
./bin/flink info 7d3022995c94511d477c3be5d1794168
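The same job information is also available from the host through the JobManager REST API, which avoids exec-ing into the container; <JobID> is a placeholder:
# Job detail
curl -s http://localhost:8081/jobs/<JobID>
# Root-cause exception of a failing job
curl -s http://localhost:8081/jobs/<JobID>/exceptions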
# Check which tables were created in Flink SQL
SHOW TABLES;
# The corresponding tables must appear here; otherwise the script did not register them
Flink SQL> show tables;
+------------------------+
| table name |
+------------------------+
| es_sink_table_win_user |
| source_win_user |
+------------------------+
2 rows in set
# List running jobs
show jobs;
Verify in ES
How to quickly confirm that a manually inserted row reached ES: the document counts (e.g. in _cat/indices) do not update in real time, so verify with the Kibana query below instead.
GET win_user/_search
{
"query": {
"wildcard": {
"username": {
"value": "test001015"
}
}
}
}
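The same wildcard query can be issued with curl when Kibana is not at hand; a sketch against the production endpoint used elsewhere in this document:
curl -u elastic:123456 -k -H 'Content-Type: application/json' \
  "https://127.0.0.1:9243/win_user/_search?pretty" \
  -d '{ "query": { "wildcard": { "username": { "value": "test001015" } } } }'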
The queries below can lag behind writes, so they are not the recommended way to verify.
# After startup, check from the host whether data was written
# List indices
curl -u 'elastic:123456' -X GET "https://127.0.0.1:9243/_cat/indices?v"
# Inspect the win_user index
curl -u 'elastic:123456' -X GET "https://127.0.0.1:9243/_cat/indices/win_user?v"
# Inspect the win_user index mapping
curl -u 'elastic:123456' -X GET "https://127.0.0.1:9243/win_user/_mapping"
ES tuning
Check shard allocation problems
curl -u 'elastic:123456' -X GET "https://127.0.0.1:9243/_cat/shards?v"
Force shard reallocation (the example below allocates an empty primary and accepts data loss; node_name is a placeholder)
curl -u 'elastic:123456' -X POST "https://127.0.0.1:9243/_cluster/reroute" -H 'Content-Type: application/json' -d '{
"commands": [
{
"allocate_empty_primary": {
"index": "win_user",
"shard": 0,
"node": "node_name",
"accept_data_loss": true
}
}
]
}'
Adjust the number of replicas
curl -u 'elastic:123456' -X PUT "https://127.0.0.1:9243/win_user/_settings" -H 'Content-Type: application/json' -d '{
"settings": {
"number_of_replicas": 0
}
}'
Accounts
prd
http://127.0.0.1:8081
username: cooper
password: cooper
MySQL configuration check
mysql -h 127.0.0.1 -u root -p
123456
# As root, grant the main account the minimum privileges required by Flink CDC
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'main'@'%';
FLUSH PRIVILEGES;
# Verify
MySQL [(none)]> SHOW GRANTS FOR 'main'@'%';
+--------------------------------------------------------------------------+
| Grants for main@%                                                        |
+--------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `main`@`%` |
| GRANT `cloudsqlsuperuser`@`%` TO `main`@`%`                              |
+--------------------------------------------------------------------------+
SHOW VARIABLES LIKE 'log_bin'; -- should be ON
SHOW VARIABLES LIKE 'binlog_format'; -- should be ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- should be FULL
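The three checks can also be run in one shot from the shell, using the same root account as above:
mysql -h 127.0.0.1 -u root -p -e \
  "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','binlog_row_image');"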
Elasticsearch data verification
# Production connection details
# Username
elastic
# Password
123456
# ES URL
127.0.0.1:9243
# Test command
curl -u elastic:123456 -k https://127.0.0.1:9243
Start Flink CDC
Start the Flink cluster
cd /usr/project/flink
# Start the cluster
docker-compose down -v && docker-compose up -d
# Check container status
docker ps -a | grep flink
# Run inside the container
cd /opt/flink
/opt/flink/bin/sql-client.sh embedded -j /opt/flink/job/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar -j /opt/flink/job/flink-connector-elasticsearch-base-3.0.1-1.17.jar -j /opt/flink/job/flink-sql-connector-mysql-cdc-2.4.1.jar -j /opt/flink/job/httpclient-4.5.13.jar -j /opt/flink/job/httpcore-4.4.13.jar -j /opt/flink/job/commons-codec-1.11.jar -j /opt/flink/job/commons-logging-1.2.jar -f /opt/flink/job/win_user.sql
# List jobs
./bin/flink list
# Cancel a specific job
./bin/flink cancel <JobID>
./bin/flink cancel 7d3022995c94511d477c3be5d1794168
# Query a job by id from the host via the REST API
curl http://localhost:8081/jobs/f54e8909ed2c1a19b49ed3788f6454fe
Enter the Flink SQL CLI
# Flink jobmanager container
docker exec -it flink-jobmanager /bin/bash
# Inside the container, start the Flink SQL client to submit SQL jobs interactively
/opt/flink/bin/sql-client.sh
# Submit a SQL script as a job (add the -j connector jars as in the full command above)
/opt/flink/bin/sql-client.sh embedded -f /opt/flink/job/win_user.sql
# List jobs
/opt/flink/bin/flink list
# Job details
/opt/flink/bin/flink info <job_id>
# Job status in the Web UI
http://127.0.0.1:8081/#/job/running
Ports to open
8081
Configure Nginx for authenticated access
Install the htpasswd tool
# Install nginx
sudo apt update
sudo apt install nginx
# Install the htpasswd tool
sudo apt-get install apache2-utils
# Create the .htpasswd file
sudo htpasswd -c /etc/nginx/.htpasswd cooper
# Set the password
123456
Configure the Nginx reverse proxy
Assuming the Flink Web UI is running locally on port 8081, configure Nginx as a reverse proxy in front of it and enable basic authentication.
Open and edit the Nginx configuration file, usually /etc/nginx/sites-available/default or /etc/nginx/nginx.conf, depending on your system.
vim /etc/nginx/sites-available/default
Or put it in a standalone config file:
vim /etc/nginx/conf.d/flink.conf
server {
    listen 18081;  # port Nginx listens on for external access
    server_name 127.0.0.1;

    location / {
        proxy_pass http://localhost:8081;  # forward to the actual Flink Web UI port
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Enable basic authentication
        auth_basic "Restricted Access";
        auth_basic_user_file /etc/nginx/.htpasswd;  # password file
    }
}
Reload
sudo systemctl reload nginx
Credentials (created with htpasswd above)
http://127.0.0.1:18081/#/overview
cooper
123456
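To verify that basic auth is actually enforced, an unauthenticated request should return 401 and an authenticated one 200:
# Expect HTTP/1.1 401 Unauthorized
curl -I http://127.0.0.1:18081/
# Expect HTTP/1.1 200 OK
curl -I -u cooper:123456 http://127.0.0.1:18081/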
Verify the sync
Check the Flink UI
Visit http://<your-server-IP>:8081 to check the job's running state.
Query the data in Elasticsearch
curl -u elastic:'123456' "http://127.0.0.1:9400/win_user/_search?pretty"
# Username
elastic
# Password
123456
# ES URL
127.0.0.1:9400
If you run into other problems during setup, feel free to contact the author.