目录
- python从HDFS导出到关系数据库(如mysql、oracle、PostgreSQL)
- 操作步骤
- 1. 定义参数和变量
- 2. 构造SQL查询并提交给Hive
- 3. 获取目标数据库连接信息并解析结果为变量
- 4. 获取目标数据库密码
- 5. 预处理SQL语句
- 6. 数据导出并导入目标数据库
- 7. Python代码详解
- 完整python脚本
- 总结
python从HDFS导出到关系数据库(如MySQL、Oracle、PostgreSQL)
一整套从Hadoop HDFS中导出数据并通过DataX工具导入到关系数据库的过程。
操作步骤
1. 定义参数和变量
sql=$1 # 导出数据的SQL语句 s_tablename=$2 # 源表名 ds_name=$3 # 目标数据库名称 t_tablename=$4 # 目标表名 temptable="h2o_"`date +%s%N | md5sum | head -c 16` # 生成一个基于时间戳的临时表名 filename=${s_tablename}_${temptable} # 文件名 path="hdfs://prdhdfs/tmp/hdfs_to_rdb/$filename/" # HDFS路径 local_path="/data02/dcadmin/scripts/dataos_scripts/data_exp" # 本地脚本路径 flag=$5 # 标志,用来确定是否TRUNCATE表
2. 构造SQL查询并提交给Hive
echo "$sql" sql1=`echo "$sql"|cut -d ";" -f2` # 截取分号后的部分 sql0=`echo "$sql"|cut -d ";" -f1` # 截取分号前的部分 sql0="$sql0;insert overwrite directory '${path}' stored as ORC $sql1" # 构建最终的SQL echo "$sql0" kinit -kt /data02/dcadmin/keytab_shengchan/dcadmin.keytab dcadmin@SC.COM beeline -u "jdbc:hive2://prdnn1.yxbdprd.sc.ctc.com:2181, ... ,prddb1.yxbdprd.sc.ctc.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "set tez.queue.name=offline;$sql0 distribute by rand()"
3. 获取目标数据库连接信息并解析结果为变量
从PostgreSQL数据库中获取目标数据库的连接信息,并解析结果为变量。
re=$(PGPASSWORD=... psql -h 10.251.110.104 -p 18921 -U dacp -d dacp -t <<EOF SELECT CASE WHEN ds_type = 'mysql' THEN CONCAT ('jdbc:mysql://' ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8') WHEN ds_type = 'oracle' THEN ds_conf::json ->> 'url' WHEN ds_type = 'pg' THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName')) END jdbc_url ,ds_acct ,ds_auth ,CASE WHEN ds_type = 'mysql' THEN 'mysqlwriter' WHEN ds_type = 'oracle' THEN 'oraclewriter' WHEN ds_type = 'pg' THEN 'postgresqlwriter' END ds_type FROM dacp_dev.dacp_meta_datasource WHERE ds_type IN ('mysql', 'oracle', 'pg') AND upper(trim(ds_name)) = upper(trim('$ds_name')) EOF ) eval $(echo $re| awk '{printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$3,$5,$7)}')
4. 获取目标数据库密码
通过执行Java程序解密数据库密码:
pw=`java -Dpwd=${ds_auth} -jar $local_path/AesCipher-1.0.jar`
5. 预处理SQL语句
根据标志变量flag
,确定是否执行TRUNCATE语句:
preSQL="select * from $t_tablename where 1=-1" if [ "$flag" = "T" ];then preSQL="truncate table $t_tablename" fi echo "preSQL=$preSQL"
6. 数据导出并导入目标数据库
使用datax
执行从HDFS导入到关系数据库的任务:
python $local_path/datax/bin/datax.py -p "-Dpath=$path -dwriter=$ds_type -Drdb_user=$ds_acct -Drdb_pass="$pw" -Drdb_jdbc="$jdbc_url" -Drdb_table=$t_tablename -DpreSql="$preSQL"" $local_path/hdfs_to_rdb.json
7. Python代码详解
此外,你还展示了大量的Python代码用于处理数据转换和传输。重点如下:
1. 初始化设置和依赖
加载必要的包,并初始化变量。
import time import datetime import os import threadpool import commands import calendar import random import pymssql import pymysql import cx_Oracle import psycopg2 import socket from pyhdfs import HdfsClient from hashlib import md5 # 其他初始化设置
2. 连接数据库并执行SQL
定义了连接数据库并执行SQL的函数:
def connect_database_to_select(conn, sql): cursor = conn.cursor() try: cursor.execute(sql) result = cursor.fetchall() conn.commit() return result except Exception as e: print('SQL执行错误:{},执行SQL:{}'.format(str(e), sql)) sys.exit(2) finally: cursor.close() def connect_database_to_commit(exe_type, conn, sql, insert_list): cursor = conn.cursor() try: if exe_type.lower() in ('delete', 'insert'): 编程客栈 cursor.execute(sql) conn.commit() elif exe_type.lower() == 'insertmany': cursor.executemany(sql, insert_list) conn.commit() except Exception as e: print('SQL执行错误:{},执行SQL:{}'.format(str(e), sql)) sys.exit(2) finally: cursor.close()
3. 数据导出处理
执行数据导出并提交给Hive:
def produce_exe_data(sql, s_tablename): global local_path_name local_path_01 = local_path_list[random.randrange(len(local_path_list))] + '/dataos_exp' local_path_name = "h2o_{0}_{1}".format(s_tablename, get_md5_str()).lower() local_path = local_path_01 + '/' + local_path_name if os.path.exists(local_path): cmd = 'rm -rf {}'.format(local_path) exe_system_cmd(cmd) os.mkdir(local_path) hdfs_path = "hdfs://prdhdfs/tmp/hdfs_to_rdb/{}".format(local_path_name) sql = sql.strip().strip(';') sql_list = sql.split(';') hive_conn = hive_connect() compress_sql = 'set hive.exec.compress.output=false' connect_database_to_commit('insert', hive_conn, compress_sql, '') for i in range(len(sql_list)): sql_str = sql_list[i] if i == len(sql_list)-1: # 如果是最后一条SQL,则执行insert overwrite directory sql_str='''INSERT OVERWRITE DIRECTORY '{0}' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\u0001' COLLECTION ITEMS TERMINATED BY '\n' MAP KEYS TERMINATED BY ':' {1} '''.format(hdfs_path, sql_str) connect_database_to_commit('insert', hive_conn, sql_str, '') if hive_conn: hive_conn.close() cmd = ''' hdfs dfs -get {}/* {} '''.format(hdfs_path, local_path) exe_system_cmd(cmd) return local_path, hdfs_path
4. 多线程数据传输
利用多线程加速数据传输过程:
def thread_exe_exchange_data(g_tablename, flag,http://www.devze.com local_path, hdfs_path): global rdb_conn rdb_conn = get_rdb_database_conn() if flag.upper() == 'T': presql = 'truncate table {}'.format(g_tablename) connect_database_to_commit('insert', rdb_conn, presql, '') if ds_type.lower() == 'oracle': global oracle_table_field oracle_table_field = get_oracle_table_fields() localtime = str(time.strftime("%Y%m%d", time.localtime())) ora_dir = "/data03/datafile/sqlldrdata/{0}/".format(localtime) if not os.path.exists(ora_dir): os.mkdir(ora_dir) file_lists = os.listdir(local_path) global exp_num_list global log_list global exception_list exp_num_list = [] log_list = [] exception_list = [] thread_list = [] for file_name in file_lists: thread_list.append(local_path + '/' + file_name) pool = threadpool.ThreadPool(5) requests = threadpool.makeRequests(exchange_data, thread_list) [pool.putRequest(req) for req in requests] pool.wait() if exception_list: delete_local_path(local_path, hdfs_path) sys.exit(2) print('数据导出完成,导出数据总量为:{}'.format(sum(exp_num_list)))
完整python脚本
#!/bin/bash sql=$1 #导出数据sql s_tablename=$2 #源表 ds_name=$3 #目标库 t_tablename=$4 #目标表 temptable="h2o_"`date +%s%N | md5sum | head -c 16` #构造一个时间戳 filename=${s_tablename}_${temptable} #文件名 path="hdfs://prdhdfs/tmp/hdfs_to_rdb/$filename/" local_path="/data02/dcadmin/scripts/dataos_scripts/data_exp" flag=$5 #t TRUNCATE #hadoop fs -mkdir $path # 参数sql0为 待执行SQL echo "$sql" sql1=`echo "$sql"|cut -d ";" -f2` sql0=`echo "$sql"|cut -d ";" -f1` sql0="$sql0;insert overwrite directory '${path}' stored as ORC $sql1" echo "$sql0" # 向Hive提交HQL kinit -kt /data02/dcadmin/keytab_shengchan/dcadmin.keytab dcadmin@SC.COM #beeline <<EOF #!connect jdbc:hive2://devdataosambari:2181,devdataosnn1:2181,devdataosnn2:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2 #$sql0 beeline -u "jdbc:hive2://prdnn1.yxbdprd.sc.ctc.com:2181,prdnn2.yxbdprd.sc.ctc.com:2181,prdrm1.yxbdprd.sc.ctc.com:2181,prdDB2.yxbdprd.sc.ctc.com:2181,prddb1.yxbdprd.sc.ctc.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "set tez.queue.name=offline;$sql0 distribute by rand()" # 获取目标数据源地址 #eval $(mysql -h 10.251.88.71 -udacp -pdacp123456 dacp_dev -e "select case when ds_type = 'mysql' then concat('jdbc:mysql://', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName')),'?characterEncoding=UTF-8') #when ds_type = 'oracle' then concat('jdbc:oracle:thin:@', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName'))) #when ds_type = 'pg' then concat('jdbc:postgresql://', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName'))) end jdbc_url, #ds_acct, ds_auth, case when ds_type = 'mysql' then 'mysqlwriter' when ds_type = 'oracle' then 'oraclewriter' when ds_type = 'pg' then 'postgresqlwriter' end ds_type #from dacp_meta_datasource #where ds_type in ('mysql','oracle','pg') #and ds_name = '$ds_name'" | awk 'NR== 2 {printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$2,$3,$4)}') re=$(PGPASSWORD=jxFgCKv9GJw2ohS3 psql -h 10.251.110.104 -p 18921 -U dacp -d dacp -t <<EOF SELECT CASE WHEN ds_type = 'mysql' THEN CONCAT ('jdbc:mysql://' ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8') WHEN ds_type = 'oracle' THEN ds_conf::json ->> 'url' WHEN ds_type = 'pg' THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName')) END jdbc_url ,ds_acct ,ds_auth ,CASE WHEN ds_type = 'mysql' THEN 'mysqlwriter' WHEN ds_type = 'oracle' THEN 'oraclewriter' WHEN ds_type = 'pg' THEN 'postgresqlwriter' END ds_type FROM dacp_dev.dacp_meta_datasource WHERE ds_type IN ('mysql', 'oracle', 'pg') AND upper(trim(ds_name)) = upper(trim('$ds_name')) EOF ) eval $(echo $re| awk '{printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$3,$5,$7)}') #eval $(java -jar /data01/etl/scripts/exec_aes.jar $ds_auth | awk ' {printf("pw=%s;",$1)}') #pw=`java -jar $local_path/exec_aes.jar $ds_auth` pw=`java -Dpwd=${ds_auth} -jar $local_path/AesCipher-1.0.jar` preSQL="select * from $t_tablename where 1=-1" if [ "$flag" = "T" ];then preSQL="truncate table $t_tablename" fi echo "preSQL=$preSQL" python $local_path/datax/bin/datax.py -p "-Dpath=$path -Dwriter=$ds_type -Drdb_user=$ds_acct -Drdb_pass=\"$pw\" -Drdb_jdbc=\"$jdbc_url\" -Drdb_table=$t_tablename -DpreSql=\"$preSQL\"" $local_path/hdfs_to_rdb.json # -*- coding:utf-8 -*- import time import datetime import os import threadpool import commands import calendar import random import pymssql import pymysql import cx_Oracle import psycopg2 import socket from pyhdfs import HdfsClient from hashlib import md5 import sys reload(sys) sys.setdefaultencoding('utf8') sys.path.append('/data02/dcadmin/scripts/common') from connect_postgresql import postgresql_connect from connect_hive import hive_connect pg_conn_str='dataos_71_pg_dev' # 本地磁盘目录,文件随机选择一个目录 local_path_list=['/data01','/data02','/data03','/data04','/data05'] def close_datadb_conn(): if rdb_conn: rdb_conn.close() def connect_database_to_select(conn,sql): # print('\r\n执行SQL:{}'.format(sql)) cursor = conn.cursor() try: cursor.execute(sql) #cursor.execute(sql.decode('utf-8').encode('gbk')) result = cursor.fetchall() conn.commit() return result except Exception as e: print('SQL执行错误:{},执行SQL:{}'.format(str(e),sql)) sys.exit(2) finally: cursor.close() def connect_database_to_commit(exe_type,conn,sql,insert_list): # print('\r\n执行SQL:{}'.format(sql)) cursor = conn.cursor() try: if exe_type.lower() in ('delete','insert'): cursor.execute(sql) conn.commit() print('执行SQL:{}'.format(sql)) elif exe_type.lower()=='insertmany': cursor.executemany(sql, insert_list) conn.commit() except Exception as e: print('SQL执行错误c:{},执行SQL:{}'.format(str(e),sql)) print(sql) sys.exit(2) finally: cursor.close() # 执行系统命令 def exe_system_cmd(cmd): status,output=commands.getstatusoutput(cmd) if status!=0: print('命令{}:执行失败,请检查!'.format(cmd)) print('失败日志:{}'.format(output)) sys.exit(2) return output # 返回MD5串 def get_md5_str(): # 时间戳 ts = calendar.timegm(time.gmtime()) md5_str=md5(str(ts).encode(encoding='utf-8')).hexdigest() return md5_str # 判断输入参数 def judge_input_parameters_num(): if len(sys.argv)!=6: print('参数有问题,请检查!') print(sys.argv) sys.exit(2) else: sql =sys.argv[1] # 导出数据sql s_tablename =sys.argv[2] # 源表名 ds_name =sys.argv[3] # 目标库 g_tablename =sys.argv[4] # 目标表 flag =sys.argv[5] # A:append,T:truncate return sql,s_tablename,ds_name,g_tablename,flag # 执行SQL语句,生成HDFS文件 def produce_exe_data(sql,s_tablename): global local_path_name # 1、创建本地文件夹 # 随机选择一个磁盘目录:'/data01','/data02','/data03','/data04','/data05' local_path_01 = local_path_list[random.randrange(len(local_path_list))]+'/dataos_exp' # /data01/dataos_exp local_path_name="h2o_{0}_{1}".format(s_tablename,get_md5_str()).lower() # local_path_name='h2o_app_hub_resource_value_level_d_e6963bad13299e939a3a4cc2b2a26a47' local_path=local_path_01+'/'+local_path_name if os.path.exists(local_path): cmd='rm -rf {}'.format(local_path) exe_system_cmd(cmd) os.mkdir(local_path) # 创建hdfs文件夹 hdfs_path="hdfs://prdhdfs/tmp/hdfs_to_rdb/{}".format(local_path_name) # 处理SQL,先去除两边的空格,再去除两边的分号 sql=sql.strip().strip(';') sql_list=sql.split(';') # 依次执行切分的SQL hive_conn=hive_connect() # 连接生产HIVE compress_sql='set hive.exec.compress.output=false' print('执行SQL:{}'.format(compress_sql)) connect_database_to_commit('insert',hive_conn,compress_sql,'') for i in range(len(sql_list)): sql_str=sql_list[i] if i==len(sql_list)-1: # 如果是最后一条SQL,则执行insert overwrite directory sql_str='''INSERT OVERWRITE DIRECTORY '{0}' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\u0001' COLLECTION ITEMS TERMINATED BY '\\n' MAP KEYS TERMINATED BY ':' {1} '''.format(hdfs_path,sql_str) print('执行SQL:{}'.format(sql_str)) connect_database_to_commit('insert',hive_conn,sql_str,'') # 关闭HIVE连接 if hive_conn: hive_conn.close() # 将hdfs文件从hdfs_path路径get到local_path下 cmd=''' hdfs dfs -get {}/* {} '''.format(hdfs_path,local_path) exe_system_cmd(cmd) print('文件GET成功,当前主机:{},数据临时文件夹:{}'.format(socket.gethostname(),local_path)) return local_path,hdfs_path # 获取目标端的连接信息 def get_rdb_conn_msg(ds_name): global ds_type global ds_acct global ds_auth global host global port global database global jdbc_url sql=''' SELECT ds_name ,ds_type ,ds_acct ,ds_auth ,split_part(ds_inst_loc,':',1) as host ,case when split_part(ds_inst_loc,':',2)='' and ds_type='oracle' then '1521' else split_part(ds_inst_loc,':',2) end as port ,case when lower(ds_type)='oracle' then split_part(replace(replace(replace(ds_conf::json->>'url','jdbc:oracle:thin:@',''),':1521',''),'/',':'),':',2) else ds_conf::json->>'physicalDbName' end as database ,CASE WHEN ds_type = 'mysql' THEN CONCAT ('jdbc:mysql://' ,ds_inst_loc,'/',(ds_conf::json ->>android'physicalDbName'),'?characterEncoding=UTF-8') WHEN ds_type = 'oracle' THEN ds_conf::json ->>'url' WHEN ds_type = 'pg' THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName')) END as jdbc_url FROM dacp_dev.dacp_meta_datasource WHERE ds_type IN ('mysql', 'oracle', 'pg') AND upper(trim(ds_name)) = upper(trim('{}')) '''.format(ds_name) pg_conn=postgresql_connect(pg_conn_str) results=connect_database_to_select(pg_conn,sql) # print(results) if not results: print('未查询到数据库连接信息,请检查,DS_NAME:{}'.format(ds_name)) sys.exit(2) # 关闭数据库连接 if pg_conn: pg_conn.close() # 解密密码 cmd='''java -Dpwd='{0}' -jar /data02/dcadmin/scripts/common/AesCipher-1.0.jar'''.format(results[0][3]) pw=exe_system_cmd(cmd).replace('\r','').replace('\n','') ds_type = results[0][1]ds_acct = results[0][2] ds_auth = pw host = results[0][4] port = int(results[0][5]) database= results[0][6] jdbc_url= results[0][7] # 判断连接的数据库类型,并返回数据库连接conn def get_rdb_database_conn(): dbms_conn=None try: if ds_type.upper()=='SQLSERVER': dbms_conn = pymssql.connect(host=host , user=ds_acct, password=ds_auth, port=port, database=database, charset='utf8') elif ds_type.upper()=='MYSQL': dbms_conn = pymysql.connect(host=host , user=ds_acct, passwd=ds_auth , port=port, database=database, charset='utf8', local_infile=True) elif ds_type.upper()=='ORACLE': listener = '{0}:{1}/{2}'.format(host,port,database) print('listener:{}'.format(listener)) dbms_conn = cx_Oracle.connect(ds_acct,ds_auth,listener,encoding='utf-8') elif ds_typjse.upper() in ('POSTGRESQL','PG'): dbms_conn = psycopg2.connect(host=host , user=ds_acct, password=ds_auth , port=port, database=database, client_encoding='utf8') else: print("未知源端数据库类型{}~~~~~,请检查!".format(ds_type.upper())) sys.exit(2) except Exception as e: print('{0},{1}数据库连接失败,请检查~~~~~~!'.format(ds_type,ds_name)) print('报错日志:{}'.format(e)) print(host) print(ds_acct) print(ds_auth) print(port) print(database) sys.exit(2) return dbms_conn def thread_exe_exchange_data(g_tablename,flag,local_path,hdfs_path): global rdb_conn rdb_conn=get_rdb_database_conn() # 执行预处理SQL if flag.upper()=='T': presql='truncate table {}'.format(g_tablename) print('执行SQL:{}'.format(presql)) connect_database_to_commit('insert',rdb_conn,presql,'') # 获取Oracle表结构 if ds_type.lower() in ('oracle'): global oracle_table_field # oracle 表结构 oracle_table_field=get_oracle_table_fields() # 创建ctl,bad,log存放目录 global ora_dir localtime = str(time.strftime("%Y%m%d", time.localtime())) ora_dir = "/data03/datafile/sqlldrdata/{0}/".format(localtime) if not os.path.exists(ora_dir): os.mkdir(ora_dir) # 文件列表 file_lists=os.listdir(local_path) # 多线程导数 global exp_num_list # 存储导数数量 global log_list # 存储多线程的日志信息 global exception_list # 存储多线程异常信息 exp_num_list =[] log_list =[] exception_list=[] thread_list=[] # 存储多线程任务 for file_name in file_lists: thread_list.append(local_path+'/'+file_name) # 创建线程池 pool=threadpool.ThreadPool(5) # 存放任务列表 requests = threadpool.makeRequests(exchange_data,thread_list) [pool.putRequest(req) for req in requests] pool.wait() # 处理异常 if exception_list: # 导数出现异常,删除文件 delete_local_path(local_path,hdfs_path) print('导数失败,异常日志信息如下:') for except_msg in exception_list: print(except_msg) sys.exit(2) # 打印多线程日志 # log_list.sort() # for log in log_list: # print(log) # 打印导出结果 print('数据导出完成,导出数据总量为:{}'.format(sum(exp_num_list))) # 获取Oracle表结构 def get_oracle_table_fields(): sql = ''' SELECT COLUMN_NAME || SUFFIX AS AA FROM (SELECT A.COLUMN_NAME ,A.COLUMN_ID ,CASE WHEN UPPER(A.DATA_TYPE) LIKE '%DATE%' THEN ' DATE "yyyy-mm-dd hh24:mi:ss"' WHEN UPPER(A.DATA_TYPE) LIKE '%TIMESTAMP%' THEN ' DATE "yyyy-mm-dd hh24:mi:ss.ff"' WHEN UPPER(A.DATA_TYPE) LIKE '%VARCHAR%' THEN ' CHAR(3000)' ELSE '' END AS SUFFIX FROM ALL_TAB_COLUMNS A WHERE UPPER(A.OWNER||'.'||A.TABLE_NAME) = UPPER(TRIM('{0}')) ORDER BY A.COLUMN_ID) ''' if '.' in g_tablename: sql=sql.format(g_tablename) else: sql=sql.format(database+'.'+g_tablename) oracle_table_fields=connect_database_to_select(rdb_conn,sql) if not oracle_table_fields: print('未查询到表结构,表名:{}'.format(g_tablename)) sys.exit(2) oracle_table_field = ",\n".join([str(list[0]) for list in oracle_table_fields]) return oracle_table_field # 执行单个导出任务 def exchange_data(file_path): try: output='' # 执行导数任务 if ds_type.lower() in ('pg','postgresql','telpg','antdb'): cmd='''psql "port={0} host={1} user={2} dbname={3} password={4} " -c "\copy {5} from '{6}' DELIMITER AS E'\u0001' " ''' cmd=cmd.format(port,host,ds_acct,database,ds_auth,g_tablename,file_path) status,output=commands.getstatusoutput(cmd) if status!=0: exception_list.append('命令{}:执行失败,请检查!失败日志:{}'.format(cmd,output)) elif ds_type.lower() in ('mysql','teldb'): mysql_conn = pymysql.connect(host=host , user=ds_acct, passwd=ds_auth , port=port, database=database, charset='utf8', local_infile=True) mysql_cursor=mysql_conn.cursor() sql='SET NAMES UTF8' mysql_cursor.execute(sql) sql='''load data local iwww.devze.comnfile '{}' into table {} fields terminated by X'01' lines terminated by '\\n' '''.format(file_path,g_tablename) #print(sql) output=mysql_cursor.execute(sql) mysql_conn.commit() mysql_conn.close() # cmd='''mysql -h {} -P {} -u {} -p{} -D {} -e "SET NAMES UTF8;load data local infile '{}' into table {} fields terminated by X'01' lines terminated by '\\n'" ''' # cmd=cmd.format(host,port,ds_acct,ds_auth,database,file_path,g_tablename) elif ds_type.lower() in ('oracle'): tns='''\'{}/"{}"\'@{}:1521/{}'''.format(ds_acct,ds_auth,host,database) ora_file_name=file_path.replace(local_path+'/','') ora_file_path=ora_dir+'/'+local_path_name+'_'+ora_file_name control_file = ora_file_path+".ctl" log_file = ora_file_path+".log" bad_file = ora_file_path+".bad" dis_file = ora_file_path+".dis" content =''' UNRECOVERABLE LOAD DATA CHARACTERSET AL32UTF8 APPEND INTO TABLE {0} FIELDS TERMINATED BY x'01' TRAILING NULLCOLS ({1}) '''.format(g_tablename, oracle_table_field) # 如果控制文件存在,则先删除 if os.path.exists(control_file): cmd='rm -rf {}'.format(control_file) exe_system_cmd(cmd) # 再创建控制文件 with open(control_file, "w") as file: file.write(content) cmd='''export ORACLE_HOME=/data03/apps/db_1;export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$LD_LIBRARY_PATH;cat {0} | /data03/apps/db_1/bin/sqlldr userid={1} control={2} data=\\"-\\" log={3} bad={4} discard={5} errors=0 direct=true parallel=true multithreading=true columnarrayrows=100000 STREAMSIZE=20971520 readsize=20971520 bindsize=20971520 date_cache=0 ''' cmd=cmd.format(file_path, tns, control_file, log_file, bad_file, dis_file) status,output=commands.getstatusoutput(cmd) if status!=0: exception_list.append('命令{}:执行失败,请检查!失败日志:{}'.format(cmd,output)) else: exception_list.append('目标端数据库类型为:{},此类型暂未支持!'.format(db_type.lower())) # 计算导出行数 if ds_type.lower() in ('pg','postgresql','telpg','antdb'): file_row_num=int(output.split('COPY ')[1].strip()) exp_num_list.append(file_row_num) elif ds_type.lower() in ('oracle'): try: output=output.decode('gbk') except: output=output file_row_num=int(output.split('逻辑记录计数 ')[1].replace('。','').strip()) exp_num_list.append(file_row_num) elif ds_type.lower() in ('mysql','teldb'): exp_num_list.append(output) # 插入日志 log_list.append(output) except Exception as e: exception_list.append(e) def delete_local_path(local_path,hdfs_path): cmd='rm -rf {}'.format(local_path) exe_system_cmd(cmd) print('本地文件夹删除成功。') cmd='hdfs dfs -rm -r {}'.format(hdfs_path) exe_system_cmd(cmd) print('HDFS文件夹删除成功。') if __name__ == '__main__': starttime = datetime.datetime.now() print('开始时间:{0}'.format(starttime.strftime('%Y-%m-%d %H:%M:%S'))) # 1、判断输入参数 sql,s_tablename,ds_name,g_tablename,flag=judge_input_parameters_num() # 2、执行SQL,生产文件,并返回本地目录 local_path,hdfs_path=produce_exe_data(sql,s_tablename) hdfs_time=datetime.datetime.now() #print('当前时间:{}'.format(hdfs_time)) print("生成HDFS文件耗时:{0}秒".format((hdfs_time - starttime).seconds)) # 3、获取目标端连接信息(host,port等) get_rdb_conn_msg(ds_name) # 4、执行导数任务 thread_exe_exchange_data(g_tablename,flag,local_path,hdfs_path) # 5、删除本地文件夹 delete_local_path(local_path,hdfs_path) # 6、关闭数据库连接 close_datadb_conn() endtime = datetime.datetime.now() print('结束时间:{0}'.format(endtime.strftime('%Y-%m-%d %H:%M:%S'))) print('导数耗时:{0}秒'.format((endtime - hdfs_time).seconds)) print("一共耗时:{0}秒".format((endtime - starttime).seconds))
总结
整个脚本有效地实现了从HDFS到关系数据库的数据迁移,确保数据的完整性和一致性。首先通过Hive导出数据,再利用多线程和DataX工具导入到目标数据库。本地化和多线程处理使过程更高效,适合大数据处理和数据仓库迁移。
请务必按需调整脚本中的具体参数和配置以适应你的环境和数据架构。
以上就是python从Hadoop HDFS导出数据到关系数据库的详细内容,更多关于python HDFS导出数据到数据库的资料请关注编程客栈(www.devze.com)其它相关文章!
精彩评论