新闻  |   论坛  |   博客  |   在线研讨会
flink-cdc之mysql到es
天翼云开发者 | 2025-08-28 19:13:45    阅读:4   发布文章

本文分享自天翼云开发者社区《flink-cdc之mysql到es》,作者:刘****猛

环境搭建

version: '2'services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.6.1
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      discovery.type: single-node
  kibana:
    image: docker.elastic.co/kibana/kibana:7.6.1
    ports:
      - "5601:5601"
    environment:
      ELASTICSEARCH_URL: ://elasticsearch:9200

es加密开启,配置文件映射到宿主机

docker cp 39:/usr/share/elasticsearch/config /root/docker-build/es/configdocker cp 7b:/usr/share/kibana/config /root/docker-build/es/kibana/config

需要在配置文件中开启x-pack验证, 修改config目录下面的elasticsearch.yml文件,在里面添加如下内容,

xpack.security.enabled: truexpack.license.self_generated.type: basic
xpack.security.transport.ssl.enabled: true

重启es 

再次进入容器

修改kibana的配置文件kibana.yml

server.name: kibana
server.host: "0"elasticsearch.hosts: [ "://elasticsearch:9200" ]xpack.monitoring.ui.container.elasticsearch.enabled: trueelasticsearch.username: "elastic"  # es账号elasticsearch.password: "*******"   # es密码

 通过kibana的develop界面执行相关指令

创建索引

PUT order_index{
    "settings":{
        "index":{
            "number_of_shards":1,            "number_of_replicas":0        }
    }}

创建mapping

PUT order_index/_mapping{
    "properties":{
        "order_id":{
            "type":"long"
        },        "goods_name":{
            "type":"text"
        },        "goods_count":{
            "type":"long"
        },        "goods_price":{
            "type":"text"
        },        "order_money":{
            "type":"text"
        }
    }}

查看索引详情

GET order_index  返回值

{
  "order_index" : {
    "aliases" : { },    "mappings" : {
      "properties" : {
        "goods_count" : {
          "type" : "long"
        },        "goods_name" : {
          "type" : "text"
        },        "goods_price" : {
          "type" : "text"
        },        "order_id" : {
          "type" : "long"
        },        "order_money" : {
          "type" : "text"
        }
      }
    },    "settings" : {
      "index" : {
        "creation_date" : "1685094234700",        "number_of_shards" : "1",        "number_of_replicas" : "0",        "uuid" : "YLwsxO1pS6qWolb2N7cG5w",        "version" : {
          "created" : "7060199"
        },        "provided_name" : "order_index"
      }
    }
  }}

 

mysql创建数据表及数据

CREATE TABLE `my_order` (
  `order_id` int(8) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '订单id',  `order_money` decimal(8,2) NOT NULL COMMENT '订单金额',  `user_id` int(8) NOT NULL COMMENT '用户id',  `sub_province` varchar(20) NOT NULL COMMENT '下单时 省',  `sub_city` varchar(20)  NOT NULL COMMENT '下单时 市',  `sub_district` varchar(20) NOT NULL COMMENT '下单时 区',  `payment_status` int(1) NOT NULL DEFAULT '0' COMMENT '付款状态 0正常 1作废',  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='拟订单表';CREATE TABLE `my_order_goods` (
  `order_goods_id` int(8) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '订单商品id',  `order_id` int(8) NOT NULL COMMENT '订单id',  `goods_id` int(8) NOT NULL COMMENT '商品id',  `sub_goods_name` varchar(50)  NOT NULL COMMENT '下单时商品名称',  `sub_goods_price` decimal(8,2) NOT NULL COMMENT '下单时商品价格',  `goods_count` int(11) NOT NULL COMMENT '下单了多少件',
  PRIMARY KEY (`order_goods_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='拟订单下单商品表';CREATE TABLE `my_goods` (
  `goods_id` int(8) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '商品id',  `goods_price` decimal(8,2) NOT NULL COMMENT '商品价格',  `goods_name` varchar(50) NOT NULL COMMENT '商品名称',  `goods_details` varchar(255) DEFAULT NULL COMMENT '商品详情',
  PRIMARY KEY (`goods_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='拟商品表';

写入样本数据

-- 初始化订单数据
INSERT INTO `my_order`(`order_id`, `order_money`, `user_id`, `sub_province`, `sub_city`, `sub_district`, `payment_status`, `create_time`) VALUES (1, 19.80, 1, '北京', '北京市', '西城区', 0, '2021-06-10 11:02:29');INSERT INTO `my_order`(`order_id`, `order_money`, `user_id`, `sub_province`, `sub_city`, `sub_district`, `payment_status`, `create_time`) VALUES (2, 9.90, 1, '北京', '北京市', '丰台区', 0, '2021-06-10 11:02:59');INSERT INTO `my_order`(`order_id`, `order_money`, `user_id`, `sub_province`, `sub_city`, `sub_district`, `payment_status`, `create_time`) VALUES (3, 300.00, 1, '北京', '北京市', '朝阳区', 0, '2021-06-10 11:03:16');INSERT INTO `my_order`(`order_id`, `order_money`, `user_id`, `sub_province`, `sub_city`, `sub_district`, `payment_status`, `create_time`) VALUES (4, 66.60, 1, '北京', '北京市', '顺义区', 0, '2021-06-10 11:03:32');-- 初始化商品数据
INSERT INTO `my_goods`(`goods_id`, `goods_price`, `goods_name`, `goods_details`) VALUES (1, 9.90, '两次性保温杯-改名称了~', '我是一只保温杯~');INSERT INTO `my_goods`(`goods_id`, `goods_price`, `goods_name`, `goods_details`) VALUES (2, 100.00, '欧莱雅男士洗面奶', '只买贵的,不买对的~');INSERT INTO `my_goods`(`goods_id`, `goods_price`, `goods_name`, `goods_details`) VALUES (3, 66.60, 'ipone13双面曲折屏', '是苹果,不是吃的那种...');-- 初始化订单商品数据(暂时不考虑一对多)INSERT INTO `my_order_goods`(`order_goods_id`, `order_id`, `goods_id`, `sub_goods_name`, `sub_goods_price`, `goods_count`) VALUES (1, 1, 1, '一次性保温杯', 9.90, 2);INSERT INTO `my_order_goods`(`order_goods_id`, `order_id`, `goods_id`, `sub_goods_name`, `sub_goods_price`, `goods_count`) VALUES (2, 2, 1, '一次性保温杯', 9.90, 1);INSERT INTO `my_order_goods`(`order_goods_id`, `order_id`, `goods_id`, `sub_goods_name`, `sub_goods_price`, `goods_count`) VALUES (3, 3, 2, '欧莱雅洗面奶', 100.00, 3);INSERT INTO `my_order_goods`(`order_goods_id`, `order_id`, `goods_id`, `sub_goods_name`, `sub_goods_price`, `goods_count`) VALUES (4, 4, 3, '吃的苹果', 66.60, 1);

 

flinksql

CREATE TABLE my_order (
  order_id INT primary key not enforced,
  order_money DECIMAL(8, 2)) WITH (
 'connector' = 'mysql-cdc',  'hostname' = '101.43.164.4',  'port' = '3306',  'database-name' = 'cdc-source',  'table-name' = 'my_order',  'username' = 'root',  'password' = '******',  'jdbc.properties.useSSL' = 'false'
  );CREATE TABLE my_goods (
  goods_id INT primary key not enforced,
  goods_name STRING,
  goods_price DECIMAL(8, 2)) WITH (
 'connector' = 'mysql-cdc',  'hostname' = '101.43.164.4',  'port' = '3306',  'database-name' = 'cdc-source',  'table-name' = 'my_goods',  'username' = 'root',  'password' = '******',  'jdbc.properties.useSSL' = 'false');CREATE TABLE my_order_goods (
  order_id INT primary key not enforced,
  goods_id INT,
  goods_count INT) WITH (
   'connector' = 'mysql-cdc',  'hostname' = '101.43.164.4',  'port' = '3306',  'database-name' = 'cdc-source',  'table-name' = 'my_order_goods',  'username' = 'root',  'password' = '******',  'jdbc.properties.useSSL' = 'false');CREATE TABLE order_index(
  order_id INT,
  goods_name STRING,
  goods_count INT,
  goods_price DECIMAL(8, 2),
  order_money DECIMAL(8, 2),
  PRIMARY KEY (order_id) NOT ENFORCED) WITH (
    'connector' = 'elasticsearch-7',    'hosts' = '101.43.164.4:9200',    'index' = 'order_index',    'username' = 'elastic',    'password' = '******');insert into order_indexselect mo.order_id, mg.goods_name, mog.goods_count,
 mg.goods_price, mo.order_money
from my_order mo 
left join my_order_goods mog on mo.order_id = mog.order_id
left join my_goods mg on mog.goods_id = mg.goods_id;

kibana中查询es数据

POST order_index/_search{
  "size": 20,  "query": {"match_all": {
   
  }}}

 

{
  "took" : 0,  "timed_out" : false,  "_shards" : {
    "total" : 1,    "successful" : 1,    "skipped" : 0,    "failed" : 0
  },  "hits" : {
    "total" : {
      "value" : 4,      "relation" : "eq"
    },    "max_score" : 1.0,    "hits" : [
      {
        "_index" : "order_index",        "_type" : "_doc",        "_id" : "3",        "_score" : 1.0,        "_source" : {
          "order_id" : 3,          "goods_name" : "欧莱雅男士洗面奶",          "goods_count" : 3,          "goods_price" : 100.0,          "order_money" : 300.0
        }
      },      {
        "_index" : "order_index",        "_type" : "_doc",        "_id" : "4",        "_score" : 1.0,        "_source" : {
          "order_id" : 4,          "goods_name" : "ipone13双面曲折屏~",          "goods_count" : 1,          "goods_price" : 66.6,          "order_money" : 66.6
        }
      },      {
        "_index" : "order_index",        "_type" : "_doc",        "_id" : "1",        "_score" : 1.0,        "_source" : {
          "order_id" : 1,          "goods_name" : "两次性保温杯-3342",          "goods_count" : 2,          "goods_price" : 9.9,          "order_money" : 19.8
        }
      },      {
        "_index" : "order_index",        "_type" : "_doc",        "_id" : "2",        "_score" : 1.0,        "_source" : {
          "order_id" : 2,          "goods_name" : "两次性保温杯-3342",          "goods_count" : 1,          "goods_price" : 9.9,          "order_money" : 9.9
        }
      }
    ]
  }}

此时修改mysql商品表

UPDATE `cdc-source`.`my_goods` SET `goods_name` = '两次性保温杯-我又改名了' WHERE `goods_id` = 1

 此时查看es中的数据

{
  "took" : 363,  "timed_out" : false,  "_shards" : {
    "total" : 1,    "successful" : 1,    "skipped" : 0,    "failed" : 0
  },  "hits" : {
    "total" : {
      "value" : 4,      "relation" : "eq"
    },    "max_score" : 1.0,    "hits" : [
      {
        "_index" : "order_index",        "_type" : "_doc",        "_id" : "4",        "_score" : 1.0,        "_source" : {
          "order_id" : 4,          "goods_name" : "ipone13双面曲折屏",          "goods_count" : 1,          "goods_price" : 66.6,          "order_money" : 66.6
        }
      },      {
        "_index" : "order_index",        "_type" : "_doc",        "_id" : "3",        "_score" : 1.0,        "_source" : {
          "order_id" : 3,          "goods_name" : "欧莱雅男士洗面奶",          "goods_count" : 3,          "goods_price" : 100.0,          "order_money" : 300.0
        }
      },      {
        "_index" : "order_index",        "_type" : "_doc",        "_id" : "2",        "_score" : 1.0,        "_source" : {
          "order_id" : 2,          "goods_name" : "两次性保温杯-我又改名了",          "goods_count" : 1,          "goods_price" : 9.9,          "order_money" : 9.9
        }
      },      {
        "_index" : "order_index",        "_type" : "_doc",        "_id" : "1",        "_score" : 1.0,        "_source" : {
          "order_id" : 1,          "goods_name" : "两次性保温杯-我又改名了",          "goods_count" : 2,          "goods_price" : 9.9,          "order_money" : 19.8
        }
      }
    ]
  }}


*博客内容为网友个人发布,仅代表博主个人观点,如有侵权请联系工作人员删除。

参与讨论
登录后参与讨论
天翼云提供云主机、CDN、云电脑、大数据及AI等全线产品和场景化解决方案。
推荐文章
最近访客