
Non-intrusive MySQL binlog capture: using Maxwell to ship the binlog into Kafka (it works!)


I looked at Flume earlier and currently use DataX, but both are intrusive: even incremental pulls put load on the database server. See my earlier articles for a detailed description.

 

This time I compared Flume, Canal, Otter, and Maxwell, looking for something non-intrusive and real-time.

Main differences:
1. Maxwell has no built-in HA, but it supports resume-from-checkpoint: once a failure is fixed and Maxwell restarts, it continues reading from the last recorded binlog position.
2. Canal is a server only: it cannot write data out by itself and needs a client (syncClient) to fetch the data. Maxwell is both server and client in one process.
3. Maxwell supports bootstrap, i.e. replaying a table's existing rows in full, which Canal does not (see the sketch after this list).
4. Maxwell only emits JSON, while Canal leaves the output format up to you.
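
Point 3's bootstrap is exposed as a separate utility that ships with Maxwell. A minimal sketch of a full-table dump, assuming the maxwell_sync account and test_maxwell.user_info table created later in this post:

# replay every existing row of test_maxwell.user_info as bootstrap-insert events,
# interleaved into the same stream as the live binlog events
bin/maxwell-bootstrap --host localhost --port 3306 \
  --user maxwell_sync --password 'Ezhiyang2019!' \
  --database test_maxwell --table user_info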

Why I personally chose Maxwell:
a. The server and client are a single component.
b. Maxwell is lightweight, so less can go wrong; in my experience Canal fails often.
c. Even deployed on a single node, resume-from-checkpoint makes failures easy to recover from.
d. The Maxwell code quality is very good, and its community is more active.
 

 

First, the download, installation, and configuration.

 

Download:
https://github.com/zendesk/maxwell/releases/download/v1.22.1/maxwell-1.22.1.tar.gz 
Source:
https://github.com/zendesk/maxwell 

 

Now the MySQL configuration:

# Enable the binlog
# Edit my.cnf and add the following
[root@node2 /root]# vim /etc/my.cnf

[mysqld]
# binlog directory and file-name prefix
# directory: /var/lib/mysql/
# prefix: mysql-binlog
# MySQL appends a numeric suffix to the prefix and creates binlog files in sequence,
# e.g. mysql-binlog.000006, mysql-binlog.000007
log-bin=/var/lib/mysql/mysql-binlog
# row-based logging (Maxwell requires ROW format)
binlog-format=ROW
# server id: binlog events carry a server_id identifying the server that produced them
server_id=1
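
After restarting MySQL it is worth confirming that the settings took effect; a quick check, assuming the mysql client is on the PATH:

# log_bin should be ON, binlog_format should be ROW, and
# SHOW MASTER STATUS should name the current mysql-binlog.NNNNNN file
mysql -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format'; SHOW MASTER STATUS;"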
 

 

 

 

Next, create the Maxwell user and its metadata database. Don't use this verbatim; do adjust the credentials for your own environment:

CREATE USER 'maxwell_sync'@'%' IDENTIFIED BY 'Ezhiyang2019!';
create database maxwell DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
GRANT ALL on maxwell.* to 'maxwell_sync'@'%';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'maxwell_sync'@'%';
FLUSH PRIVILEGES;
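
A quick sanity check on the new account; SHOW BINARY LOGS needs the REPLICATION CLIENT privilege, so it doubles as a permission test:

mysql -umaxwell_sync -p'Ezhiyang2019!' -e "SHOW GRANTS FOR CURRENT_USER(); SHOW BINARY LOGS;"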

 

 

Create a test database and table in MySQL:

create database test_maxwell;
use test_maxwell;
create table if not exists `user_info`(
   `userid` int,
   `name` varchar(100),
   `age` int
)engine=innodb default charset=utf8;
 

 

 

Download and extract Maxwell:

[root@node2 /data/software]# wget https://github.com/zendesk/maxwell/releases/download/v1.17.1/maxwell-1.17.1.tar.gz
[root@node2 /data/software]# tar -zxvf maxwell-1.17.1.tar.gz

Start Maxwell

# Input: the MySQL binlog
# Output: Kafka
# Option notes:
# 1) kafka_topic
#    can be templated, e.g. namespace_%{database}_%{table}; %{database} and %{table} are replaced with the actual values
# 2) kafka_version
#    must match the Kafka broker version
# 3) extra producer options
#    kafka.acks, kafka.compression.type, kafka.retries
# 4) filter
#    excludes databases, tables, or individual rows; a snippet of JS can also transform the data flexibly
#    e.g. exclude: test_maxwell.user_info.userid = 1 drops rows of test_maxwell.user_info whose userid is 1
# 5) monitoring
#    jmx, http, and others are available
#    http_bind_address: the IP the metrics server binds to
#    http_port: the port it binds to
#    http_path_prefix: the HTTP path prefix

[root@node2 /data/software/maxwell-1.17.1]# bin/maxwell \
  --host='localhost' \
  --port=3306 \
  --user='maxwell_sync' \
  --password='maxwell_sync_1' \
  --filter='exclude: *.*,include:test_maxwell.user_info,exclude: test_maxwell.user_info.userid = 1' \
  --producer=kafka \
  --kafka_version='0.11.0.1' \
  --kafka.bootstrap.servers='node1:6667,node2:6667,node3:6667' \
  --kafka_topic=qaTopic \
  --metrics_type=http \
  --metrics_jvm=true \
  --http_bind_address=node2 \
  --http_port=8090 \
  --http_path_prefix=db_test_maxwell
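
The same options can also live in a config.properties file in the Maxwell directory, which keeps the password out of the shell history. A minimal sketch mirroring the flags above (key names as in the Maxwell docs):

# bin/maxwell picks up config.properties from its working directory automatically
cat > config.properties <<'EOF'
host=localhost
port=3306
user=maxwell_sync
password=maxwell_sync_1
producer=kafka
kafka.bootstrap.servers=node1:6667,node2:6667,node3:6667
kafka_topic=qaTopic
EOF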

# To write to stdout instead of Kafka:

[root@node2 /data/software/maxwell-1.17.1]# bin/maxwell \
  --host='localhost' \
  --port=3306 \
  --user='maxwell_sync' \
  --password='maxwell_sync_1' \
  --producer=stdout

 

 

 

Now, let's run it for real.


Create the topic:
kafka-topics --create --zookeeper pro-app-174:2181,pro-app-175:2181,pro-app-176:2181 --replication-factor 3 --partitions 1 --topic housekeeper_realtime
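
To confirm the partition count and replication factor, the same tool can describe the topic:

kafka-topics --describe --zookeeper pro-app-174:2181 --topic housekeeper_realtime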


kafka-topics --list --zookeeper pro-app-174:2181


bin/maxwell \
  --host='pro-app-174' --port=3306 \
  --user='root' --password='Ezhiyang2019!' \
  --filter='exclude: *.*,include:maxwell.user_info,exclude:maxwell.user_info.userid = 1' \
  --producer=kafka \
  --kafka_version='4.0.0' \
  --kafka.bootstrap.servers='pro-app-174:9092,pro-app-175:9092,pro-app-176:9092' \
  --kafka_topic=housekeeper_realtime \
  --metrics_type=http --metrics_jvm=true \
  --http_bind_address=pro-app-174 --http_port=8090 --http_path_prefix=db_maxwell


This errors out: the version is too high for this Maxwell build.
Error: No matches for kafka version: 4.0.0


Console output:
Error: No matches for kafka version: 4.0.0
Supported versions:
 - 0.10.0.1
 - 0.10.2.1
 - 0.11.0.1
 - 0.8.2.2
 - 0.9.0.1
 - 1.0.0


Let's look for a fix. The only real option is to upgrade Maxwell.

https://github.com/zendesk/maxwell/releases

Browsing the releases for a newer Maxwell: it is up to 1.22 now; no idea yet whether that will work.

Download and extract it (from context, the maxwell-1.22.1 tarball linked at the top of this post):

tar -zxvf maxwell-1.22.1.tar.gz
With --kafka_version=4.0.0 it still fails; or rather, my Kafka is not actually that new. (The consumer log below reports "2.1.0-kafka-4.0.0", i.e. Cloudera's CDK 4.0.0 packaging of Apache Kafka 2.1.0.)

One more thing to try: I (cleverly, if I may say so) dropped --kafka_version to 1.0.0, and sure enough it works, since Kafka brokers stay compatible with older clients. Here is the log.

 

[root@pro-app-174 maxwell-1.22.1]# bin/maxwell --host='pro-app-174' --port=3306 --user='root' --password='Ezhiyang2019!' --filter='exclude: *.*,include:maxwell.user_info,exclude:maxwell.user_info.userid = 1' --producer=kafka --kafka_version='1.0.0' --kafka.bootstrap.servers='pro-app-174:9092,pro-app-175:9092,pro-app-176:9092' --kafka_topic=housekeeper_realtime --metrics_type=http --metrics_jvm=true --http_bind_address=pro-app-174 --http_port=8090 --http_path_prefix=db_maxwell
Using kafka version: 1.0.0
17:44:32,032 INFO  SchemaStoreSchema - Creating maxwell database
17:44:32,297 INFO  ProducerConfig - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [pro-app-174:9092, pro-app-175:9092, pro-app-176:9092]
	buffer.memory = 33554432
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

17:44:32,354 INFO  AppInfoParser - Kafka version : 1.0.0
17:44:32,354 INFO  AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
17:44:32,431 INFO  Maxwell - Maxwell v1.22.1 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[mysql-binlog.000001:358319], lastHeartbeat=0]
17:44:32,588 INFO  AbstractSchemaStore - Maxwell is capturing initial schema
17:44:33,225 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-binlog.000001:358319
17:44:33,229 INFO  MaxwellHTTPServer - Maxwell http server starting
17:44:33,232 INFO  MaxwellHTTPServer - Maxwell http server started on port 8090
17:44:33,288 INFO  log - Logging initialized @2078ms
17:44:33,345 INFO  Server - jetty-9.3.27.v20190418, build timestamp: 2019-04-19T02:11:38+08:00, git hash: d3e249f86955d04bc646bb620905b7c1bc596a8d
17:44:33,368 INFO  BinaryLogClient - Connected to pro-app-174:3306 at mysql-binlog.000001/358319 (sid:6379, cid:1271)
17:44:33,368 INFO  BinlogConnectorLifecycleListener - Binlog connected.
17:44:33,385 INFO  ContextHandler - Started o.e.j.s.ServletContextHandler@3f08ad5f{/db_maxwell,null,AVAILABLE}
17:44:33,398 INFO  AbstractConnector - Started ServerConnector@60089079{HTTP/1.1,[http/1.1]}{pro-app-174:8090}
17:44:33,398 INFO  Server - Started @2189ms

At this point, watch the topic with a console consumer:

kafka-console-consumer --bootstrap-server pro-app-174:2181,pro-app-175:2181,pro-app-176:2181 --from-beginning --topic housekeeper_realtime

(Note that --bootstrap-server points at the ZooKeeper port 2181 here rather than the broker port 9092; that matters later.)

 

Here is its log:

[shouzhuang.li@pro-app-174 maxwell-1.17.1]$ kafka-console-consumer --bootstrap-server pro-app-174:2181,pro-app-175:2181,pro-app-176:2181 --from-beginning --topic housekeeper_realtime
19/06/05 17:49:09 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
19/06/05 17:49:09 INFO consumer.ConsumerConfig: ConsumerConfig values: 
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [pro-app-174:2181, pro-app-175:2181, pro-app-176:2181]
	check.crcs = true
	client.dns.lookup = default
	client.id = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = console-consumer-82209
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

19/06/05 17:49:09 INFO utils.AppInfoParser: Kafka version : 2.1.0-kafka-4.0.0
19/06/05 17:49:09 INFO utils.AppInfoParser: Kafka commitId : unknown

 

 

 

OK, nothing interesting in that output; keep going and try inserting some data.

No reaction, and consuming shows nothing either.

delete FROM maxwell.user_info;
insert into maxwell.user_info(userid,name,age) values (1,'name1',10),(2,'name2',20),(3,'name3',30);



update maxwell.user_info set name='name3',age=23 where userid=3;

The update statement then produced an error:

"UPDATE user_course SET userid = 200 WHERE id = 28;" failed with [Err] 1055 - Expression #1 of ORDER BY clause is not in GROUP BY clause and contains nonaggregated column 'information_schema.PROFILING.SEQ' which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_mode=only_full_group_by

(The information_schema.PROFILING reference suggests the 1055 is raised by the GUI client's profiling query under only_full_group_by, not by the UPDATE itself.)

The workaround: adjust sql_mode.


SHOW VARIABLES LIKE '%sql_mode%';
-- clear the session sql_mode, then set it without only_full_group_by
set sql_mode = '';
set sql_mode = 'NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES';
SHOW VARIABLES LIKE '%sql_mode%';
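
Note that SET sql_mode = ... only changes the current session (and SET GLOBAL only lasts until a restart). To make it permanent, the same value would go into my.cnf; a sketch, reusing the [mysqld] section edited earlier:

[mysqld]
sql_mode=NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES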

And after that, still no reaction at all.

Check the monitoring endpoint:

http://pro-app-174:8090/db_maxwell/metrics

The metrics don't show anything obviously wrong. I'll learn how to read them properly, and then come back to why Maxwell isn't publishing to Kafka.

 

Maxwell health status:

http://node2:8090/db_test_maxwell/healthcheck

 

ping

http://node2:8090/db_test_maxwell/ping
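
These endpoints are plain HTTP, so they are easy to script against; a quick probe with curl, using the paths configured by the --metrics_type=http flags above:

curl http://node2:8090/db_test_maxwell/healthcheck
curl http://node2:8090/db_test_maxwell/ping
curl http://node2:8090/db_test_maxwell/metrics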

 

 

 

Maxwell pros and cons

Pros:
(1) Compared with Canal, configuration is simple; it works out of the box.
(2) The destination can be customized (extend a Java class and implement its methods), and data handling is flexible (JS filters).
(3) Several kinds of monitoring are built in.

Cons:
(1) It needs a metadata database on the source instance (the schema_database, named maxwell by default), storing things like the binlog consumption position. Though by Maxwell's reasoning this is not really a drawback.
(2) No HA support, whereas Canal can use ZooKeeper to give canal server and canal client HA with failover.

Right, let's work out why the pipeline isn't connected. First, test the Kafka side on its own by opening a console producer.

Producer:

kafka-console-producer --broker-list pro-app-174:9092 --topic housekeeper_realtime


19/06/05 18:19:42 INFO utils.AppInfoParser: Kafka version : 2.1.0-kafka-4.0.0
19/06/05 18:19:42 INFO utils.AppInfoParser: Kafka commitId : unknown
>1
19/06/05 18:19:44 INFO clients.Metadata: Cluster ID: Nxyzks2RRUmrO0cRTrStwg
>2
>df
>wer
>asdf
>asdf
>sad
>fasd
f>sa
>dfsa
>df
>sadf
>sdf
>sd
>fsd
>fs
>df
>sadf
>sadf
>sf
>asdf
>sadf
>asdf
>asdf
>asdf
>sadf
>sadf
>sad
>fsad
>fsa
>dfsa
>f
>

 

 

Consumer (note the broker port 9092 this time):

kafka-console-consumer --bootstrap-server pro-app-174:9092 --topic housekeeper_realtime

19/06/05 18:22:22 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=console-consumer-75791] Successfully joined group with generation 1
19/06/05 18:22:22 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=console-consumer-75791] Setting newly assigned partitions [housekeeper_realtime-0]
19/06/05 18:22:22 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=console-consumer-75791] Resetting offset for partition housekeeper_realtime-0 to offset 20.
asdf
sadf
asdf
asdf
asdf
sadf
sadf
sad
fsad
fsa
dfsa
f

 

So the problem must be on the Maxwell side.

Let's keep digging. Run Maxwell with --producer=stdout, i.e. events print to the console instead of going to Kafka, and see:

bin/maxwell --user='root' --password='44ang2019!' --host='pro-app-174' --producer=stdout

 

Now the console does print:

Using kafka version: 1.0.0
10:30:01,482 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
10:30:01,767 INFO  Maxwell - Maxwell v1.22.1 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql-binlog.000003:4713815], lastHeartbeat=1559788182450]
10:30:01,910 INFO  MysqlSavedSchema - Restoring schema id 3 (last modified at Position[BinlogPosition[mysql-binlog.000001:544743], lastHeartbeat=1559728989336])
10:30:02,061 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-binlog.000001:358319], lastHeartbeat=0])
10:30:02,110 INFO  MysqlSavedSchema - beginning to play deltas...
10:30:02,112 INFO  MysqlSavedSchema - played 2 deltas in 2ms
10:30:02,138 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-binlog.000003:4713815
10:30:02,237 INFO  BinaryLogClient - Connected to pro-app-174:3306 at mysql-binlog.000003/4713815 (sid:6379, cid:1362)
10:30:02,237 INFO  BinlogConnectorLifecycleListener - Binlog connected.
10:35:50,403 INFO  AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-binlog.000003:4741091], lastHeartbeat=15597885…] "… MODIFY COLUMN `userid`  int(12) NULL DEFAULT NULL FIRST" to test, new schema id is 4
10:36:26,530 INFO  AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-binlog.000003:4744913], lastHeartbeat=15597885…] "… MODIFY COLUMN `userid`  int(13) NULL DEFAULT NULL FIRST" to test, new schema id is 5
{"database":"ralph","table":"org","type":"insert","ts":1559788797,"xid":108943,"commit":true,"data":{"id":1,"uc_org_id":1,"org_name":"1","org_type":1,"create_date":"2019-06-06 10:39:54","create_by":11,"update_date":"2019-06-06 10:39:50","update_by":1,"is_deleted":1}}

The final JSON line above is the inserted row.

Good: Maxwell itself works. The earlier run's filter (include:maxwell.user_info only) most likely excluded the database and table I was actually changing.

So switch back to the Kafka producer; this run drops the filter and adds --output_ddl=true so schema-change events are emitted too:

bin/maxwell --user='root' --password='sdfang2019!' --host='pro-app-174' \
  --producer=kafka \
  --kafka.bootstrap.servers=pro-app-174:9092,pro-app-175:9092,pro-app-176:9092 \
  --output_ddl=true \
  --kafka_topic=housekeeper_realtime

 

Here it comes!

19/06/05 18:22:22 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=console-consumer-75791] Resetting offset for partition housekeeper_realtime-0 to offset 20.
asdf
sadf
asdf
asdf
asdf
sadf
sadf
sad
fsad
fsa
dfsa
f

sdf
asdf
asdf
safd
{"database":"rawrh","table":"org","type":"insert","ts":1559789081,"xid":109663,"commit":true,"data":{"id":3,"uc_org_id":2,"org_name":"2","org_type":2,"create_date":"2019-06-06 10:43:04","create_by":2,"update_date":"2019-06-06 10:43:08","update_by":2,"is_deleted":0}}
{"database":"rasdfph","table":"org","type":"delete","ts":1559789100,"xid":109699,"commit":true,"data":{"id":1,"uc_org_id":1,"org_name":"1","org_type":1,"create_date":"2019-06-06 10:39:54","create_by":11,"update_date":"2019-06-06 10:39:50","update_by":1,"is_deleted":1}}

 

As you can see, the change data is arriving now.

Excellent!

At this point, the pipeline from MySQL through Maxwell into Kafka and out to our consumer basically works.

The next task is the ordering of update and delete statements: replaying them out of order gives completely different results, so that needs investigation. The other piece of work is analyzing these real-time MySQL change logs, which mostly comes down to parsing the JSON. Things are in very good shape now.
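
As a starting point for that analysis, a minimal sketch of pulling fields out of the event stream from the shell, assuming jq is installed (the consumer command is the same one used above). On ordering: Maxwell's default partition key is the database name, so all events for one database land in one partition and keep their binlog order.

# stream ts/database/table/type plus the row data as tab-separated values,
# keeping only updates and deletes
kafka-console-consumer --bootstrap-server pro-app-174:9092 --topic housekeeper_realtime \
  | jq -r 'select(.type == "update" or .type == "delete") | [.ts, .database, .table, .type, (.data | tostring)] | @tsv'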

 

 

 

Copyright notice: this article originally appeared on CSDN under the CC 4.0 BY-SA license; when reposting, please include the original link and this notice.
Original: https://blog.csdn.net/qq_33792843/article/details/90905686