Note:
This topic has been translated from a Chinese forum by GPT and might contain errors.
Original topic: 大佬们!sparksql将1.7亿hive表数据写入tidb表中,写入速度可以优化吗 (Experts! SparkSQL writes 170 million rows from a Hive table into a TiDB table; can the write speed be optimized?)
username: w958045214
Spark code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

object Xunzhan2ToTidb {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("Xunzhan2ToTidb")
      //.master("local[*]")
      .config("spark.driver.allowMultipleContexts", true)
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()
    //spark.sparkContext.setLogLevel("ERROR")

    val part = args(0)
    val customer = spark.sql(
      s"""
         |select
         |   master_user_id as user_id                -- user ID
         |  ,story_id                                 -- story ID
         |  ,100 as ks_play_end_time                  -- play end time
         |  ,100 as ks_media_file_play_start_time     -- media file play start time
         |  ,1 as ks_media_finish_state               -- play finish state
         |  ,listen_times as ks_media_duration_info   -- play duration
         |  ,100 as ks_play_start_time                -- play start time
         |  ,100 as ks_media_file_play_end_time       -- media file play end time
         |  ,duration as ks_media_file_duration       -- file total duration
         |  ,100 as ks_media_current_progress         -- play percentage
         |  ,from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss') as create_time -- create time
         |  ,from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss') as update_time -- update time
         |  ,1 as media_type                          -- media type
         |  ,unix_timestamp() as local_time_ms        -- local time
         |  ,unix_timestamp() as server_time          -- server time
         |from tmp.tmp_xunzhan_history_play_2year_active_sum_a
         |where user_part = '${part}'
         |""".stripMargin)

    // note: count() forces a full scan of the source before the write starts
    println("Partition " + part + ", data volume to be written: " + customer.count())

    // repartition the source so the write is balanced across nodes
    // and runs with higher concurrency
    val df1 = customer.repartition(32)
    df1.write
      .mode("append")
      .format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      // replace host and port with your own; keep rewriteBatchedStatements enabled
      .option("url", "jdbc:mysql://:4000/ks_mission_data?rewriteBatchedStatements=true")
      .option("useSSL", "false")
      // TiDB docs cite ~150 as a tested batch size; 1000 is used here
      .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 1000)
      .option("dbtable", "") // database and table name here
      // keep transactions off; isolationLevel NONE is recommended when loading a large DataFrame
      .option("isolationLevel", "NONE")
      .option("user", "") // TiDB user here
      .option("password", "")
      .save()
    println("Write successful")
    spark.stop()
  }
}
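One knob worth checking first: Spark's JDBC sink opens one connection per DataFrame partition, so repartition(32) caps the write at 32 concurrent INSERT streams. A minimal sketch of raising that cap, replacing the val df1 = customer.repartition(32) line above (the value 100 is an illustrative assumption, not a tested recommendation):

// Hypothetical tuning sketch: write concurrency equals the number of
// DataFrame partitions, one JDBC connection per partition.
// 100 is an illustrative value, not a tested figure; raise or lower it
// based on how the TiDB side handles the write load.
val writeParallelism = 100
val df1 = customer.repartition(writeParallelism)

If raising Spark-side concurrency does not help, the bottleneck may be on the TiDB side (for example, write hot-spotting on the target table's primary key), which Spark tuning alone cannot fix.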
Parameters:
--master yarn --deploy-mode client --queue hive2 --driver-memory 2g --executor-memory 30g --num-executors 20 --executor-cores 20
--class com.kaishu.warehouse.service.Xunzhan2ToTidb ossref://bigdata-mapreduce/res/spark/data_market_test-3c22fb43fb8.jar 8
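As a quick sanity check on these settings: 20 executors x 20 cores gives 400 task slots, while the job repartitions to only 32 partitions, so at most 32 tasks write to TiDB at once and the remaining slots sit idle during the write phase. Commonly cited Spark guidance also favors around 5 cores per executor rather than 20 to reduce contention, though that is a general rule of thumb rather than something measured here.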
The fastest runs finish in 45 minutes; slower ones take more than an hour. If the 45 minutes covers the full 170 million rows, that works out to roughly 63,000 rows per second.
Yarn UI: (screenshot)