
How to Use Flink CDC to Synchronize Data from an Oracle Database

https://www.devze.com 2024-08-22 08:57 Source: Web Author: shandongwill
Contents
  • Preface
  • 1. Enable Archive Logging
  • 2. Create a Dedicated Flink CDC User
    • 2.1 For a non-CDB Oracle database, run the following SQL
    • 2.2 For a CDB Oracle database, run the following SQL
  • 3. Enable Supplemental Logging at the Table or Database Level
  • 4. Implement Database Synchronization with flink-connector-oracle-cdc
    • 4.1 Add the POM dependency
    • 4.2 Main Java code
    • 4.3 Convert JSON to Row

    Preface

    Flink CDC is a stream-based data integration tool that aims to provide users with a more comprehensive set of programming interfaces (APIs). It lets users configure database synchronization declaratively in a YAML file, and it also provides the Flink CDC Source Connector API. Flink CDC optimizes job submission and adds advanced features such as automatic schema change synchronization (Schema Evolution), data transformation (Data Transformation), full database synchronization (Full Database Synchronization), and exactly-once semantics.

    This article uses flink-connector-oracle-cdc to synchronize data from an Oracle database.

    1. Enable Archive Logging

    1) In a terminal on the database server, connect to the database as SYSDBA:

     sqlplus / as sysdba
    or
    sqlplus /nolog
    CONNECT sys/password AS SYSDBA;

    2) Check whether archive logging is enabled:

    archive log list;

    (“Database log mode: No Archive Mode” means archive logging is disabled.)

    (“Database log mode: Archive Mode” means archive logging is enabled.)

    3) Enable archive logging:

    alter system set db_recovery_file_dest_size = 10G;
    alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
    shutdown immediate;
    startup mount;
    alter database archivelog;
    alter database open;

    Note:

    Enabling archive logging requires restarting the database.

    Archive logs consume a large amount of disk space, so expired log files should be purged regularly (for example with RMAN, as sketched below).
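    A minimal purge sketch using RMAN; the seven-day retention window is an assumption and should match your backup and recovery policy:

    rman target /
    # delete archived logs completed more than 7 days ago (the window is an assumption)
    DELETE ARCHIVELOG ALL COMPLETED BEFORE 'SYSDATE-7';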

    4) After the restart completes, run archive log list; again to confirm that archive mode is now enabled.

    2. Create a Dedicated Flink CDC User
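    Both variants below assume that the LOGMINER_TBS tablespace already exists. If it does not, it can be created first, as in the sketch below; the datafile path and size are assumptions to adapt to your environment:

      sqlplus sys/password@//localhost:1521/ORCL AS SYSDBA
      -- datafile path and size are assumptions; adjust them to your environment
      CREATE TABLESPACE LOGMINER_TBS DATAFILE '/opt/oracle/oradata/ORCL/logminer_tbs.dbf'
        SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;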

    2.1 For a non-CDB Oracle database, run the following SQL

      CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
      GRANT CREATE SESSION TO flinkuser;
      GRANT SET CONTAINER TO flinkuser;
      GRANT SELECT ON V_$DATABASE to flinkuser;
      GRANT FLASHBACK ANY TABLE TO flinkuser;
      GRANT SELECT ANY TABLE TO flinkuser;
      GRANT SELECT_CATALOG_ROLE TO flinkuser;
      GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
      GRANT SELECT ANY TRANSACTION TO flinkuser;
      GRANT LOGMINING TO flinkuser;
      GRANT ANALYZE ANY TO flinkuser;
      GRANT CREATE TABLE TO flinkuser;
      -- not required if scan.incremental.snapshot.enabled=true (the default)
      GRANT LOCK ANY TABLE TO flinkuser;
      GRANT ALTER ANY TABLE TO flinkuser;
      GRANT CREATE SEQUENCE TO flinkuser;
      GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
      GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
      GRANT SELECT ON V_$LOG TO flinkuser;
      GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
      GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
      GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
      GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
      GRANT SELECT ON V_$LOGFILE TO flinkuser;
      GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
      GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

    2.2 For a CDB Oracle database, run the following SQL

      CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
      GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
      GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
      GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
      GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
      GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
      GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
      GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
      GRANT LOGMINING TO flinkuser CONTAINER=ALL;
      GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
      -- not required if scan.incremental.snapshot.enabled=true (the default)
      GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
      GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
      GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
      GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;

    3. Enable Supplemental Logging at the Table or Database Level

    -- Enable supplemental logging for a specific table:
    ALTER TABLE databasename.tablename ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    -- Enable supplemental logging for all tables in the database:
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    -- Enable minimal supplemental logging for the database:
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
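    One way to verify the database-level settings is to query v$database:

    SELECT supplemental_log_data_min, supplemental_log_data_all FROM v$database;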

    4. Implement Database Synchronization with flink-connector-oracle-cdc

    4.1 Add the POM dependency

     <dependency>
         <groupId>com.ververica</groupId>
         <artifactId>flink-connector-oracle-cdc</artifactId>
         <version>2.4.0</version>
     </dependency>
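    The connector artifact alone does not pull in the Flink runtime, so the core Flink dependencies (for example flink-streaming-java and flink-clients) must also be on the classpath. A sketch, where the version is an assumption and must match your Flink installation:

     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-streaming-java</artifactId>
         <!-- version is an assumption; use the version of your Flink cluster -->
         <version>1.17.1</version>
     </dependency>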

    4.2 Main Java code

    package test.datastream.cdc.oracle;
    import com.ververica.cdc.connectors.oracle.OracleSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.types.Row;
    import test.datastream.cdc.oracle.function.CacheDataAllWindowFunction;
    import test.datastream.cdc.oracle.function.CdcString2RowMap;
    import test.datastream.cdc.oracle.function.DbCdcSinkFunction;
    import java.util.Properties;
    public class OracleCdcExample {
        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();
        // emit numeric column values as strings (Debezium decimal.handling.mode)
        properties.setProperty("decimal.handling.mode", "string");
            SourceFunction<String> sourceFunction = OracleSource.<String>builder()
    //                .startupOptions(StartupOptions.latest()) // start from the latest offset
                    .url("jdbc:oracle:thin:@localhost:1521:orcl")
                    .port(1521)
                .database("ORCL") // monitor the ORCL database
                .schemaList("C##flink_user") // monitor the C##FLINK_USER schema
                .tableList("c##flink_user.TEST2") // monitor the TEST2 table
                    .username("c##flink_user")
                    .password("flinkpw")
                    .debeziumProperties(properties)
                    .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                    .build();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.addSource(sourceFunction).setParallelism(1); // use parallelism 1 for the source to keep message ordering
            SingleOutputStreamOperator<Row> mapStream = source.flatMap(new CdcString2RowMap());
            SingleOutputStreamOperator<Row[]> winStream = mapStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .process(new CacheDataAllWindowFunction());
        // batch synchronization
            winStream.addSink(new DbCdcSinkFunction(null));
            env.execute();
        }
    }
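    CacheDataAllWindowFunction and DbCdcSinkFunction are project-specific classes that the article does not show. As an illustration only, here is a minimal sketch of what CacheDataAllWindowFunction plausibly does (collect all rows of each 5-second window into one array for the batch sink); the implementation is an assumption:

    package test.datastream.cdc.oracle.function;
    import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;
    import java.util.ArrayList;
    import java.util.List;
    /**
     * Hypothetical sketch: buffers all rows of a processing-time window
     * and emits them as one Row[] batch for the downstream sink.
     */
    public class CacheDataAllWindowFunction extends ProcessAllWindowFunction<Row, Row[], TimeWindow> {
        @Override
        public void process(Context context, Iterable<Row> elements, Collector<Row[]> out) {
            List<Row> rows = new ArrayList<>();
            elements.forEach(rows::add);
            if (!rows.isEmpty()) {
                out.collect(rows.toArray(new Row[0]));
            }
        }
    }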

    4.3 Convert JSON to Row

    package test.datastream.cdc.oracle.function;
    import cn.com.victorysoft.common.configuration.VsConfiguration;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;
    import org.apache.flink.util.Collector;
    import test.datastream.cdc.CdcConstants;
    import java.sql.Timestamp;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    /**
     * @desc Parse the CDC JSON string and convert it to a Row
     */
    public class CdcString2RowMap extends RichFlatMapFunction<String, Row> {
        private Map<String,Integer> columnMap =new HashMap<>();
        @Override
    public void open(Configuration parameters) throws Exception {
            columnMap.put("ID",0);
            columnMap.put("NAME",1);
            columnMap.put("DESCRIPTION",2);
            columnMap.put("AGE",3);
            columnMap.put("CREATE_TIME",4);
            columnMap.put("SCORE",5);
            columnMap.put("C_1",6);
            columnMap.put("B_1",7);
        }
        @Override
        public void flatMap(String s, Collector<Row> collector) throws Exception {
            System.out.println("receive: "+s);
            VsConfiguration conf=VsConfiguration.from(s);
            String op = conf.getString(CdcConstants.K_OP);
            VsConfiguration before = conf.getConfiguration(CdcConstants.K_BEFORE);
            VsConfiguration after = conf.getConfiguration(CdcConstants.K_AFTER);
            Row row = null;
            if (CdcConstants.OP_C.equals(op)) {
                // insert: use the "after" data
                row = convertToRow(after);
                row.setKind(RowKind.INSERT);
            } else if (CdcConstants.OP_U.equals(op)) {
                // update: use the "after" data
                row = convertToRow(after);
                row.setKind(RowKind.UPDATE_AFTER);
            } else if (CdcConstants.OP_D.equals(op)) {
                // delete: use the "before" data
                row = convertToRow(before);
                row.setKind(RowKind.DELETE);
            } else {
                // "r" (snapshot read): use the "after" data
                row = convertToRow(after);
                row.setKind(RowKind.INSERT);
            }
            collector.collect(row);
        }
        private Row convertToRow(VsConfiguration data) {
            Set<String> keys = data.getKeys();
            Row row = new Row(8);
            for (String key : keys) {
                Integer index = this.columnMap.get(key);
                Object value = data.get(key);
                if (key.equals("CREATE_TIME")) {
                    // convert the long epoch value to a java.sql.Timestamp
                    value = long2Timestamp((Long) value);
                }
                row.setField(index, value);
            }
            return row;
        }
        private static java.sql.Timestamp long2Timestamp(Long time) {
            // assumes the value is in epoch microseconds (the Debezium default for
            // TIMESTAMP columns), so dividing by 1000 yields milliseconds
            Timestamp timestamp = new Timestamp(time / 1000);
            return timestamp;
        }
    }
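    CdcConstants is not shown in the article. Based on the standard Debezium change-event envelope, it plausibly looks like the sketch below; the class itself is an assumption, but the key and operation values are the ones Debezium emits:

    package test.datastream.cdc;
    /**
     * Hypothetical sketch of the constants used above; the field names follow
     * the Debezium change-event envelope ("op", "before", "after").
     */
    public final class CdcConstants {
        public static final String K_OP = "op";         // operation type field
        public static final String K_BEFORE = "before"; // row image before the change
        public static final String K_AFTER = "after";   // row image after the change
        public static final String OP_C = "c";          // create (insert)
        public static final String OP_U = "u";          // update
        public static final String OP_D = "d";          // delete ("r" = snapshot read)
        private CdcConstants() {}
    }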

    This concludes the article on using Flink CDC to synchronize data from an Oracle database. For more on Flink CDC and Oracle data synchronization, browse the earlier articles on www.devze.com or continue with the related articles below.
