"); //-->
本文分享自天翼云开发者社区《flink-cdc之mysql到es》,作者:刘****猛
环境搭建
# Single-node Elasticsearch 7.6.1 plus Kibana 7.6.1 for the demo environment.
version: '2'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.6.1
    ports:
      - "9200:9200"   # REST API
      - "9300:9300"   # transport
    environment:
      discovery.type: single-node
  kibana:
    image: docker.elastic.co/kibana/kibana:7.6.1
    ports:
      - "5601:5601"
    environment:
      # Kibana 7.x reads ELASTICSEARCH_HOSTS (elasticsearch.url was removed in 7.0);
      # the URL must include its scheme.
      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
es加密开启,配置文件映射到宿主机
# Copy the Elasticsearch config directory out of the container (ID prefix 39) to the host.
docker cp 39:/usr/share/elasticsearch/config /root/docker-build/es/config
# Copy the Kibana config directory out of the container (ID prefix 7b) to the host.
docker cp 7b:/usr/share/kibana/config /root/docker-build/es/kibana/config
需要在配置文件中开启x-pack安全验证:修改config目录下的elasticsearch.yml文件,在其中添加如下内容:
# Additions to elasticsearch.yml: turn on X-Pack security with the basic license.
xpack.security.enabled: true
xpack.license.self_generated.type: basic
xpack.security.transport.ssl.enabled: true
重启es
再次进入容器
修改kibana的配置文件kibana.yml
# kibana.yml: point Kibana at the secured Elasticsearch node.
server.name: kibana
server.host: "0"
# The host URL must include its scheme.
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
elasticsearch.username: "elastic"   # Elasticsearch account
elasticsearch.password: "*******"   # Elasticsearch password
通过Kibana的Dev Tools(Console)界面执行以下指令
创建索引
PUT order_index
{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 0
    }
  }
}
创建mapping
PUT order_index/_mapping
{
  "properties": {
    "order_id":    { "type": "long" },
    "goods_name":  { "type": "text" },
    "goods_count": { "type": "long" },
    "goods_price": { "type": "text" },
    "order_money": { "type": "text" }
  }
}
查看索引详情
GET order_index
返回值:
{ "order_index" : { "aliases" : { }, "mappings" : { "properties" : { "goods_count" : { "type" : "long" }, "goods_name" : { "type" : "text" }, "goods_price" : { "type" : "text" }, "order_id" : { "type" : "long" }, "order_money" : { "type" : "text" } } }, "settings" : { "index" : { "creation_date" : "1685094234700", "number_of_shards" : "1", "number_of_replicas" : "0", "uuid" : "YLwsxO1pS6qWolb2N7cG5w", "version" : { "created" : "7060199" }, "provided_name" : "order_index" } } }}
mysql创建数据表及数据
-- Order header table: one row per order.
CREATE TABLE `my_order` (
    `order_id`       int(8) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '订单id',
    `order_money`    decimal(8,2)    NOT NULL COMMENT '订单金额',
    `user_id`        int(8)          NOT NULL COMMENT '用户id',
    `sub_province`   varchar(20)     NOT NULL COMMENT '下单时 省',
    `sub_city`       varchar(20)     NOT NULL COMMENT '下单时 市',
    `sub_district`   varchar(20)     NOT NULL COMMENT '下单时 区',
    `payment_status` int(1)          NOT NULL DEFAULT '0' COMMENT '付款状态 0正常 1作废',
    `create_time`    datetime        NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='拟订单表';

-- Order line table: links an order to a goods row and stores the name,
-- price and quantity captured at order time.
CREATE TABLE `my_order_goods` (
    `order_goods_id`  int(8) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '订单商品id',
    `order_id`        int(8)          NOT NULL COMMENT '订单id',
    `goods_id`        int(8)          NOT NULL COMMENT '商品id',
    `sub_goods_name`  varchar(50)     NOT NULL COMMENT '下单时商品名称',
    `sub_goods_price` decimal(8,2)    NOT NULL COMMENT '下单时商品价格',
    `goods_count`     int(11)         NOT NULL COMMENT '下单了多少件',
    PRIMARY KEY (`order_goods_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='拟订单下单商品表';

-- Goods catalogue table: current price, name and optional details.
CREATE TABLE `my_goods` (
    `goods_id`      int(8) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '商品id',
    `goods_price`   decimal(8,2)    NOT NULL COMMENT '商品价格',
    `goods_name`    varchar(50)     NOT NULL COMMENT '商品名称',
    `goods_details` varchar(255)    DEFAULT NULL COMMENT '商品详情',
    PRIMARY KEY (`goods_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='拟商品表';
-- Seed data for the demo. Each comment sits on its own line: a `--` comment
-- runs to the end of the line, so it must never share a line with a statement.

-- Initial orders.
INSERT INTO `my_order`
    (`order_id`, `order_money`, `user_id`, `sub_province`, `sub_city`, `sub_district`, `payment_status`, `create_time`)
VALUES
    (1, 19.80,  1, '北京', '北京市', '西城区', 0, '2021-06-10 11:02:29'),
    (2, 9.90,   1, '北京', '北京市', '丰台区', 0, '2021-06-10 11:02:59'),
    (3, 300.00, 1, '北京', '北京市', '朝阳区', 0, '2021-06-10 11:03:16'),
    (4, 66.60,  1, '北京', '北京市', '顺义区', 0, '2021-06-10 11:03:32');

-- Initial goods.
INSERT INTO `my_goods`
    (`goods_id`, `goods_price`, `goods_name`, `goods_details`)
VALUES
    (1, 9.90,   '两次性保温杯-改名称了~', '我是一只保温杯~'),
    (2, 100.00, '欧莱雅男士洗面奶', '只买贵的,不买对的~'),
    (3, 66.60,  'ipone13双面曲折屏', '是苹果,不是吃的那种...');

-- Initial order lines (one-to-many orders are not considered for now).
INSERT INTO `my_order_goods`
    (`order_goods_id`, `order_id`, `goods_id`, `sub_goods_name`, `sub_goods_price`, `goods_count`)
VALUES
    (1, 1, 1, '一次性保温杯', 9.90,   2),
    (2, 2, 1, '一次性保温杯', 9.90,   1),
    (3, 3, 2, '欧莱雅洗面奶', 100.00, 3),
    (4, 4, 3, '吃的苹果',     66.60,  1);
flinksql
-- Flink SQL job: capture three MySQL tables through mysql-cdc, join them,
-- and upsert the flattened order rows into the Elasticsearch index order_index.

-- CDC source: order header (only the columns the job needs).
CREATE TABLE my_order (
    order_id    INT,
    order_money DECIMAL(8, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '101.43.164.4',
    'port' = '3306',
    'database-name' = 'cdc-source',
    'table-name' = 'my_order',
    'username' = 'root',
    'password' = '******',
    'jdbc.properties.useSSL' = 'false'
);

-- CDC source: goods catalogue.
CREATE TABLE my_goods (
    goods_id    INT,
    goods_name  STRING,
    goods_price DECIMAL(8, 2),
    PRIMARY KEY (goods_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '101.43.164.4',
    'port' = '3306',
    'database-name' = 'cdc-source',
    'table-name' = 'my_goods',
    'username' = 'root',
    'password' = '******',
    'jdbc.properties.useSSL' = 'false'
);

-- CDC source: order lines.
-- NOTE(review): the MySQL table's primary key is order_goods_id; order_id is
-- declared as the key here, which only holds while every order has a single
-- goods row, as in the sample data. Confirm before reusing with one-to-many orders.
CREATE TABLE my_order_goods (
    order_id    INT,
    goods_id    INT,
    goods_count INT,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '101.43.164.4',
    'port' = '3306',
    'database-name' = 'cdc-source',
    'table-name' = 'my_order_goods',
    'username' = 'root',
    'password' = '******',
    'jdbc.properties.useSSL' = 'false'
);

-- Sink: Elasticsearch index keyed by order_id (the key becomes the document _id).
CREATE TABLE order_index (
    order_id    INT,
    goods_name  STRING,
    goods_count INT,
    goods_price DECIMAL(8, 2),
    order_money DECIMAL(8, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    -- The connector documents hosts in the form 'http://host_name:port'.
    'hosts' = 'http://101.43.164.4:9200',
    'index' = 'order_index',
    'username' = 'elastic',
    'password' = '******'
);

-- Continuous query: LEFT JOINs keep orders that have no matching line or goods
-- row; their goods columns are then NULL.
INSERT INTO order_index
SELECT
    mo.order_id,
    mg.goods_name,
    mog.goods_count,
    mg.goods_price,
    mo.order_money
FROM my_order AS mo
LEFT JOIN my_order_goods AS mog
    ON mo.order_id = mog.order_id
LEFT JOIN my_goods AS mg
    ON mog.goods_id = mg.goods_id;
kibana中查询es数据
POST order_index/_search
{
  "size": 20,
  "query": {
    "match_all": {}
  }
}
{ "took" : 0, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 4, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "order_index", "_type" : "_doc", "_id" : "3", "_score" : 1.0, "_source" : { "order_id" : 3, "goods_name" : "欧莱雅男士洗面奶", "goods_count" : 3, "goods_price" : 100.0, "order_money" : 300.0 } }, { "_index" : "order_index", "_type" : "_doc", "_id" : "4", "_score" : 1.0, "_source" : { "order_id" : 4, "goods_name" : "ipone13双面曲折屏~", "goods_count" : 1, "goods_price" : 66.6, "order_money" : 66.6 } }, { "_index" : "order_index", "_type" : "_doc", "_id" : "1", "_score" : 1.0, "_source" : { "order_id" : 1, "goods_name" : "两次性保温杯-3342", "goods_count" : 2, "goods_price" : 9.9, "order_money" : 19.8 } }, { "_index" : "order_index", "_type" : "_doc", "_id" : "2", "_score" : 1.0, "_source" : { "order_id" : 2, "goods_name" : "两次性保温杯-3342", "goods_count" : 1, "goods_price" : 9.9, "order_money" : 9.9 } } ] }}
此时修改mysql商品表
-- Rename goods #1 in MySQL so the change can be observed in Elasticsearch.
UPDATE `cdc-source`.`my_goods`
SET `goods_name` = '两次性保温杯-我又改名了'
WHERE `goods_id` = 1;
此时查看es中的数据
{ "took" : 363, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 4, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "order_index", "_type" : "_doc", "_id" : "4", "_score" : 1.0, "_source" : { "order_id" : 4, "goods_name" : "ipone13双面曲折屏", "goods_count" : 1, "goods_price" : 66.6, "order_money" : 66.6 } }, { "_index" : "order_index", "_type" : "_doc", "_id" : "3", "_score" : 1.0, "_source" : { "order_id" : 3, "goods_name" : "欧莱雅男士洗面奶", "goods_count" : 3, "goods_price" : 100.0, "order_money" : 300.0 } }, { "_index" : "order_index", "_type" : "_doc", "_id" : "2", "_score" : 1.0, "_source" : { "order_id" : 2, "goods_name" : "两次性保温杯-我又改名了", "goods_count" : 1, "goods_price" : 9.9, "order_money" : 9.9 } }, { "_index" : "order_index", "_type" : "_doc", "_id" : "1", "_score" : 1.0, "_source" : { "order_id" : 1, "goods_name" : "两次性保温杯-我又改名了", "goods_count" : 2, "goods_price" : 9.9, "order_money" : 19.8 } } ] }}
*博客内容为网友个人发布,仅代表博主个人观点,如有侵权请联系工作人员删除。