PySpark: saving DataFrame data as a Hive partitioned table

Create a SparkSession

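from pyspark.sql import SparkSession, HiveContext

spark = SparkSession.builder.enableHiveSupport().appName('test_app').getOrCreate()
sc = spark.sparkContext
# HiveContext is deprecated since Spark 2.0; spark.sql() is equivalent to the hc.sql() calls below
hc = HiveContext(sc)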

1. Creating a partitioned table with Spark

# Change mode to 'overwrite' to drop an existing table of the same name and create a new one
df.write.saveAsTable(save_table, mode='append', partitionBy=['pt_day'])

saveAsTable automatically creates the Hive table. partitionBy specifies the partition columns. The data is saved as Parquet files by default. For DataFrames generated from files, the column types are also inferred automatically, and the inferred types sometimes do not meet the requirements.

If you need to customize the field type, you can specify the type when creating the DataFrame:

import pandas as pd
from pyspark.sql.types import StringType, StructType, BooleanType, StructField

schema = StructType([
    StructField("vin", StringType(), True),
    StructField("cust_id", StringType(), True),
    StructField("is_maintain", BooleanType(), True),
    StructField("is_wash", BooleanType(), True),
    StructField("pt_day", StringType(), True),
])

# Read the string columns as str; otherwise values such as 20190516 are parsed as integers,
# which can fail the StringType check in createDataFrame
data = pd.read_csv('/path/to/data.csv', header=0, dtype={'vin': str, 'cust_id': str, 'pt_day': str})
df = spark.createDataFrame(data, schema=schema)
# The specified data types are written to the Hive table
df.write.saveAsTable(save_table, mode='append', partitionBy=['pt_day'])
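
When the DataFrame already exists (for example, when Spark read the file itself), casting columns before the write is another way to control the stored types. The following is only a sketch: cast_columns is a hypothetical helper name, and the type names in the usage note are assumptions chosen to match the schema above.

```python
def cast_columns(df, type_map):
    """Return df with each column in type_map cast to the given Spark SQL type name.

    df is expected to be a pyspark.sql.DataFrame; type_map maps a column name
    to a type name such as "string" or "boolean" (hypothetical example values).
    """
    for name, type_name in type_map.items():
        # Column.cast accepts a type name string; withColumn replaces the column in place
        df = df.withColumn(name, df[name].cast(type_name))
    return df
```

A call such as cast_columns(df, {"pt_day": "string", "is_wash": "boolean"}) would then be followed by the same saveAsTable statement.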

2. Insert data into an existing table

2.1 Partitioned table created by Spark

This case uses the same statement as table creation. There is no need to enable dynamic partitioning.

df.write.saveAsTable(save_table, mode='append', partitionBy=['pt_day'])

2.2 Table created from the Hive command line or a Hive SQL statement

  • The file formats differ. Spark's default file format is PARQUET, while a table created from the Hive command line defaults to TEXTFILE. Writing with Spark's default format into such a table raises an exception:
    pyspark.sql.utils.AnalysisException: u"The format of the existing table default.csd_test_partition is `HiveFileFormat`. It doesn't match the specified format `ParquetFileFormat`.;"

  • Dynamic partitioning must be enabled. If it is not, the write fails with:
    org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict

Code:

# Create the table
sql_str = "CREATE TABLE IF NOT EXISTS default.csd_test_partition (cust_id string, vin string, is_maintain boolean, is_wash boolean) partitioned by (pt_day string)"
hc.sql(sql_str)
# Turn on dynamic partitioning
spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")
# Specify the Hive format so that it matches the existing table
df.write.saveAsTable(save_table, format='Hive', mode='append', partitionBy=['pt_day'])
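
Another way to write into a table that Hive created is DataFrameWriter.insertInto, which writes in the table's existing format. It matches columns by position rather than by name, so the DataFrame columns should be put in the table's order, with the partition column last. The sketch below assumes the table definition shown above; columns_for_insert and insert_into_hive_table are hypothetical helper names, and the dynamic partition settings are assumed to be enabled already.

```python
def columns_for_insert(df_columns, data_columns, partition_columns):
    """Return the column order the table expects: data columns first, partition columns last."""
    expected = list(data_columns) + list(partition_columns)
    missing = [c for c in expected if c not in df_columns]
    if missing:
        raise ValueError("DataFrame is missing columns: %s" % missing)
    return expected


def insert_into_hive_table(df, table, data_columns, partition_columns):
    """Append df (a pyspark.sql.DataFrame) to an existing Hive table.

    insertInto resolves columns by position, so the columns are reordered first.
    """
    ordered = columns_for_insert(df.columns, data_columns, partition_columns)
    df.select(*ordered).write.insertInto(table)
```

For the table above, the call might look like insert_into_hive_table(df, 'default.csd_test_partition', ['cust_id', 'vin', 'is_maintain', 'is_wash'], ['pt_day']).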
    

Insert from a temporary view

# If the data contains the partition column, do not specify a partition; dynamic partitioning must be enabled.
# SELECT * matches columns by position, so the view's columns must be in the table's order with pt_day last.
df1.createTempView('temp_table')
# Or df1.registerTempTable('temp_table') (deprecated since Spark 2.0)
hc.sql('insert into default.csd_test_partition select * from temp_table')

# If the data does not contain the partition column, name the partition to insert into; dynamic partitioning is not needed
df2 = df1.drop('pt_day')
df2.registerTempTable('temp_table1')
hc.sql('insert into default.csd_test_partition partition(pt_day="20190516") select * from temp_table1')
    

3. Summary

3.1 The df.write.saveAsTable() method

With mode='overwrite', a new table is created; if a table with that name already exists, it is dropped and the whole table is rewritten. With mode='append', the new data is added to the existing data.

3.2 Inserting with SQL statements

Inserting with SQL requires the table to exist: create the table first, then execute the insert.

INSERT INTO tableName PARTITION(pt=pt_value) SELECT * FROM temp_table behaves like append.

INSERT OVERWRITE TABLE tableName PARTITION(pt=pt_value) SELECT * FROM temp_table rewrites only the specified partition, not the whole table.

In this respect the SQL statements are more flexible than the df.write.saveAsTable() method.
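
The two statement forms can be generated from one small function. This is only an illustrative sketch: build_insert_sql is a hypothetical helper, it lists the columns explicitly instead of using SELECT * so that the positional matching is visible, and it does not escape the partition value.

```python
def build_insert_sql(table, view, columns, partition_col, partition_value=None, overwrite=False):
    """Build an INSERT INTO / INSERT OVERWRITE statement for a partitioned table.

    With partition_value set, the partition is static and the view must not
    supply the partition column. With partition_value=None, the partition is
    dynamic and the partition column is selected last.
    """
    verb = "INSERT OVERWRITE TABLE" if overwrite else "INSERT INTO TABLE"
    if partition_value is None:
        spec = "PARTITION (%s)" % partition_col
        select_cols = list(columns) + [partition_col]
    else:
        spec = "PARTITION (%s='%s')" % (partition_col, partition_value)
        select_cols = list(columns)
    return "%s %s %s SELECT %s FROM %s" % (verb, table, spec, ", ".join(select_cols), view)
```

The returned string would be passed to spark.sql(); the dynamic form still needs the nonstrict dynamic partition setting.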

3.3 Controlling the number of files saved to the Hive table

By default, a large number of small files may be saved to the Hive partitioned table. Calling repartition() on the DataFrame before saving controls the number of files written.

For example:

df.repartition(5).write.saveAsTable(...)
# or
df.repartition(5).registerTempTable('temp_table')
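
In a partitioned write, each task writes a separate file for every partition value it holds, so repartition(5) can still produce up to five files per pt_day. Repartitioning by the partition column sends all rows of one pt_day to the same task, which typically leaves one file per partition. A minimal sketch, with hypothetical helper names and pt_day assumed as the partition column:

```python
def max_files_written(num_tasks, num_partition_values):
    """Upper bound on output files: every task may hold rows for every partition value."""
    return num_tasks * num_partition_values


def save_grouped_by_partition(df, table, partition_col="pt_day"):
    """Append df (a pyspark.sql.DataFrame) so that each partition value is written by one task."""
    (df.repartition(partition_col)
       .write.saveAsTable(table, mode="append", partitionBy=[partition_col]))
```

For example, 5 tasks and 30 distinct pt_day values give an upper bound of 150 files with repartition(5), compared with roughly 30 when repartitioning by pt_day.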
    

     


Posted on Mon, 11 May 2020 01:18:45 -0700 by [xNet]DrDre