开发者

pyspark操作hive分区表及.gz.parquet和part-00000文件压缩问题

开发者 https://www.devze.com 2022-11-29 13:25 出处:网络 作者: 土豆啊你个马铃薯
目录pyspark操作hive表1saveAsTable写入2insertInto写入2.1问题说明2.2解决办法3saveAsTextFile写入直接操作文件pyspark操作hive表...
目录
  • pyspark 操作hive表
    • 1> saveAsTable写入
    • 2> insertInto写入
      • 2.1> 问题说明
      • 2.2> 解决办法
    • 3>saveAsTextFile写入直接操作文件

    pyspark 操作hive表

    pyspark 操作hive表,hive分区表动态写入;最近发现spark动态写入hive分区,和saveAsTable存表方式相比,文件压缩比大约 4:1。针对该问题整理了 spark 操作hive表的几种方式。

    1> saveAsTable写入

    saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)

    示例:

    df.write.saveAsTable("表名",mode='overwrite')

    注意:

    1、表不存在则创建表,表存在全覆盖写入;

    2、表存在,数据字段有变化,先删除后重新创建表;

    3、当正在存表时报错或者终止程序会导致表丢失;

    4、数据默认采用parquet压缩,文件名称 part-00000-5efbfc08-66fe-4fd1-bebb-944b34689e70.gz.parquet

    数据文件在hdfs上显示:

    pyspark操作hive分区表及.gz.parquet和part-00000文件压缩问题

    2> insertInto写入

    insertInto(self, tableName, overwrite=False):

    示例:

    # append 写入
    df.repartition(1).write.partitionBy('dt').insertInto("表名")
    # overwrite 写入
    df.repartition(1).write.partitionBy('dt').insertInto("表名",overwrite=True)
    # 动态分区使用该方法

    注意:

    1、df.write.mode("overwrite").partitionBy("dt").insertInto("表名") 不会覆盖数据

    2、需要表必须存在且当前DF的schema与目标表的schema必须一致

    3、插入的文件不会压缩;文件以part-00....结尾。文件较大

    数据文件在hdfs上显示:

    pyspark操作hive分区表及.gz.parquet和part-00000文件压缩问题

    2.1> 问题说明

    两种方式存储数据量一样的数据,磁盘文件占比却相差很大,.gz.parquet 文件 相比 part-00000文件要小很多。想用spark操作分区表,又想让文件压缩,百度了一些方式,都没有解决。

    从stackoverflow中有一个类似的问题 Spark compression when writing to external Hive table 。用里面的方法并没有解决。

    最终从hive表数据文件压缩角度思考,问题得到解决。

    hive 建表指定压缩格式

    下面是hive parquet的几种压缩方式

    -- 使用snappy
    CREATE TABLE if not exists ods.table_test(
        id string,
        open_time string
    	)
    COMMENT '测试'
    PARTITIONED BY (`dt` string COMMENT '按天分区')
    row format delimited fields twww.cppcns.comerminated by '\001' 
    STORED AS PARQUET 
    TBLPROPERTIES ('parquet.compression'='SNAPPY');
    
    -- 使用gzip
    CREATE TABLE if not exists ods.table_test(
        id string,
        open_time string
    	)
    COMMENT '测试'
    PARTITIONED BY (`dt` string COMMEhttp://www.cppcns.comNT '按天分区')
    row format delimited fields terminated by '\001' 
    STORED AS PARQUET 
    TBLPROPERTIES ('parquet.compression'='GZIP');
     
    -- 使用uncompressed
    CREATE TABLE if not exists ods.table_test(
        id string,
        open_time string
    	)
    COMMENT '测试'
    PARTITIONED BY (`dt` string COMMENT '按天分区')
    row format delimited fields terminated by '\001' 
    STORED AS PARQUET 
    TBLPROPERTIES ('parquet.compression'='UNCOMPRESSED');
    
     
    -- 使用默认
    CREATE TABLE if not exists ods.table_test(
        id string,
        open_time string
    	)
    COMMENT '测试'
    PARTITIONED BY (`dt` string COMMENT '按天分区')
    row format delimited fields terminated by '\001' 
    STORED AS PARQUET;
     
    -- 设置参数 set parquet.compression=SNAPPY;

    2.2&g编程客栈t; 解决办法

    建表时指定TBLPROPERTIES,采用gzip 压缩

    示例:

    drop table if exists ods.table_test
    CREATE TABLE if not exists ods.table_test(
    id string,
    open_time string编程客栈
    )
    COMMENT '测试'
    PARTITIONED BY (`dt` string COMMENT '按天分区')
    row format delimited fields terminated by '\001' 
    STORED AS PARQUET 
    TBLPROPERTIES ('parquet.compression'='GZIP');

    执行效果

    数据文件在hdfs上显示:

    pyspark操作hive分区表及.gz.parquet和part-00000文件压缩问题

    可以看到文件大小占比已经和 *.gz.parquet 文件格式一样了

    3>saveAsTextFile写入直接操作文件

    saveAsTextFile(self, path, compressionCodecClass=None)

    该方式通过rdd 以文件形式直接将数据存储在hdfs上。

    示例:

    rdd.saveAsTextFile('hdfs://表全路径')

    文件操作更多方式见官方文档

    到此这篇关于pyspark操作hive分区表及.gz.parquet和part-0编程客栈0000文件压缩问题的文章就介绍到这了,更多相关pyspark hive分区表parquet内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    关注公众号