package cn.net.dfo.dw.driver
import cn.net.dfo.dw.utils.DateHelper
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.storage.StorageLevel
import org.slf4j.{Logger, LoggerFactory}
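/**
 * Batch job that builds "upcar" (vehicle loading / departure) monitoring snapshots from
 * TiDB via TiSpark, joined with the org dimension from Hive. Summary inferred from the
 * code below.
 *
 * Usage: BatchCollectTotal [yyyy-MM-dd] [HH:mm:ss]
 * Without arguments, the current 10-minute time segment is used as the batch time.
 */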
object BatchCollectTotal {
val log: Logger = LoggerFactory.getLogger(this.getClass.getName)
def main(args: Array[String]): Unit = {
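// TiSpark configuration: TiExtensions plus the PD / TiDB addresses let Spark SQL read
// TiDB tables directly; db_prefix "tidb_" distinguishes TiDB databases from same-named
// Hive databases.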
val sparkConf = new SparkConf().
//setIfMissing("spark.master", "local[*]").
setIfMissing("spark.app.name", getClass.getName).
setIfMissing("spark.driver.allowMultipleContexts", "true").
setIfMissing("spark.sql.extensions", "org.apache.spark.sql.TiExtensions").
// production
setIfMissing("spark.tispark.pd.addresses", "ip:2379").
// test
// setIfMissing("spark.tispark.pd.addresses", "ip:2379").
setIfMissing("spark.driver.maxResultSize", "16g").
setIfMissing("spark.debug.maxToStringFields", "150").
// production
setIfMissing("spark.tispark.tidb.addr", "tidb1.prd.db").
setIfMissing("spark.tispark.tidb.port", "3390").
// test
// setIfMissing("spark.tispark.tidb.addr", "ip.122").
// setIfMissing("spark.tispark.tidb.port", "4000").
// To work with both Hive and TiDB in one session, also call enableHiveSupport().
// When database names collide, TiDB tables must carry the prefix (db_prefix below) to tell them apart from Hive tables.
setIfMissing("spark.tispark.write.allow_spark_sql", "true").
setIfMissing("spark.tispark.db_prefix", "tidb_").
setIfMissing("spark.sql.crossJoin.enabled", "true").
setIfMissing("spark.tispark.use.tiflash", "true")
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
var batchTime = ""
if (args.length == 2) {
batchTime = args(0) + " " + args(1) // yyyy-MM-dd HH:mm:ss
println("Passed-in batch time: " + batchTime)
} else {
batchTime = DateHelper.getCurrTimeSegment(10) // yyyy-MM-dd HH:mm:ss
println("Default batch time: " + batchTime)
}
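// historyTime is the previous batch boundary, 10 minutes before batchTime
// (assuming DateHelper.beforeMinute subtracts the given number of minutes).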
val historyTime = DateHelper.beforeMinute(batchTime, "yyyy-MM-dd HH:mm:ss", 10)
// day T
val dateStr = batchTime.substring(0, 10)
// day T-5
val date_t_5 = DateHelper.defineDayBefore(dateStr, -5, "yyyy-MM-dd")
println("batchTime => " + batchTime)
println("historyTime => " + historyTime)
try {
// deleteData(historyTime)
// (mdm_org_temp)
println(">>>>>>>>>>>>>>>>>>>>>>>>>>> ")
basicData(spark, batchTime)
getCarPeriod(spark, batchTime)
} catch {
case e: Exception =>
// log the full stack trace through the logger instead of stderr
log.error(e.getMessage, e)
} finally {
spark.close()
System.exit(0)
}
}
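/**
 * Caches the two dimension views used by later stages: the de-duplicated org dimension
 * (latest partition per org_code from Hive) and the TiDB car-number dimension.
 */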
def basicData(spark: SparkSession, batchTime: String): Unit = {
val firstThreeDays = DateHelper.defineDayBefore(batchTime.substring(0, 10).replaceAll("-", ""), -3, "yyyyMMdd")
// dimension table
var sql =
s"""
|select org_code,
| org_name,
| region_code,
| region_name,
| transfer_code,
| transfer_name,
| transfer_prov_code,
| transfer_prov_name
| from (select org_code,
| org_name,
| region_code,
| -- NOTE: the CASE head was lost from the source; the condition below is an
| -- assumed reconstruction (blank region_name falls back to org_name)
| case when region_name is null or region_name = '' then org_name
| else region_name
| end region_name,
| transfer_code,
| transfer_name,
| transfer_prov_code,
| transfer_prov_name,
| row_number() over(partition by org_code order by dt desc) rk
| from dim.t03_user_org_mdm_org
| where dt >= '${firstThreeDays}') m
| where rk = 1
|""".stripMargin
/*var sql =
"""
|select sub_department_code org_code,
| sub_department_name org_name,
| region_code region_code,
| region_name region_name,
| transfer_code transfer_code,
| transfer_name transfer_name
| from default.dw_dim_dfo_org_trans
|""".stripMargin*/
println(sql)
spark.sql(sql).persist(StorageLevel.MEMORY_ONLY_SER).createOrReplaceTempView("temp_upcar_monitor_orgdim")
// spark.sql("select * from temp_upcar_monitor_orgdim limit 10").show()
sql =
s"""
|select /*+ BROADCAST(t) */ * from tidb_rpt.upcar_monitor_carno t
|""".stripMargin
spark.sql(sql).persist(StorageLevel.MEMORY_ONLY_SER).createOrReplaceTempView("temp_upcar_monitor_carno")
// spark.sql("select * from temp_upcar_monitor_carno limit 10").show()
}
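/** True when the previous batch boundary is 09:00:00, i.e. this is the first batch of the business day. */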
def firstBatchFlag(historyTime: String): Boolean = {
val timeOfDay = historyTime.split(" ")(1)
"09:00:00".equals(timeOfDay)
}
/*def deleteData(historyTime: String) = {
if ("00:00:00".equals(historyTime.split(" ")(1))) {
val date = DateHelper.defineDayBefore(historyTime, -30, "yyyy-MM-dd")
@transient val conn = TidbClusterPool.getConnection
val stmt = conn.createStatement()
val sql = s"delete from tidb_ytrpt.upcar_monitor_carno where create_time<'${date}'"
println(sql)
stmt.execute(sql)
}
}*/
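/**
 * Builds the per-batch detail snapshot: on the 09:00 batch it first persists the car-status
 * dimension to TiDB, then assembles the detail window [initDate, batchTime) and keeps the
 * latest record per (waybillNo, orgCode, containerNo), excluding invalidated cars.
 */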
def getTotalDetail(spark: SparkSession, batchTime: String): Unit = {
val dateStr = batchTime.substring(0, 10)
var ctStartDate = ""
// var ctEndDate = ""
var initDate = ""
if (DateHelper.theNextDay(batchTime) > 0) {
initDate = DateHelper.defineDayBefore(dateStr, -2, "yyyy-MM-dd") + " 09:00:00"
ctStartDate = DateHelper.defineDayBefore(dateStr, -1, "yyyy-MM-dd") + " 09:00:00"
// ctEndDate = dateStr + " 09:00:00"
} else {
initDate = DateHelper.defineDayBefore(dateStr, -1, "yyyy-MM-dd") + " 09:00:00"
ctStartDate = dateStr + " 09:00:00"
// ctEndDate = DateHelper.defineDayBefore(dateStr, 1, "yyyy-MM-dd") + " 09:00:00"
}
/*val sql =
s"""
|select t.waybillNo waybill_No,
| t.bagNo bag_No,
| t.containerNo container_No,
| t.expType exp_Type,
| t.orgCode org_Code,
| t.createTime create_Time,
| t.weight weight,
| t.createUserCode create_User_Code,
| t.createUserName create_User_Name,
| case when c.line_Name is null or c.line_name ='' then first_value(t.lineName) over(order by t.lineName) else c.line_name end line_Name,
| t.truckNo truck_No,
| t.truckTypeNew truck_Type_New,
| t.transCodeLink trans_Code_Link,
| t.predictDepartTime predict_Depart_Time,
| t.returnFlag return_Flag,
| t.datoubi datoubi,
| t.desRoute des_Route,
| t.desOrgCode des_Org_Code,
| t.masterNodeCode master_Node_Code,
| t.targetWeight target_Weight,
| t.startRegionCode start_Region_Code,
| t.desRegionCode des_Region_Code,
| t.peizaiseoncdOrgCode peizaiseoncd_Org_Code,
| t.peizaisecondRegionCode peizaisecond_Region_Code,
| t.peizaistartOrgCode peizaistart_Org_Code,
| t.peizaistartRegionCode peizaistart_Region_Code,
| t.startProvCode start_Prov_Code,
| t.waybillNob waybill_Nob,
| t.firstNodeCode first_Node_Code,
| t.centerCarType center_Car_Type,
| t.routeLineName route_Line_Name,
| c.car_Type car_Type,
| c.fache_Time fache_Time,
| t.batchTime batch_Time,
| '${batchTime}' rpt_date
| from (
| select *,
| row_number() over(partition by waybillNo,orgCode,containerNo order by createtime desc) rn
| from tidb_ytrpt.upcar_monitor_detail
| where createTime>= '${ctStartDate}' and createTime < '${batchTime}'
|) t left join temp_upcar_monitor_carno c on c.car_no = t.containerNo
|where t.rn = 1
|""".stripMargin*/
/*val sql =
s"""
|select t.waybillNo waybill_No,
| t.bagNo bag_No,
| t.containerNo container_No,
| t.expType exp_Type,
| t.orgCode org_Code,
| t.createTime create_Time,
| t.weight weight,
| t.createUserCode create_User_Code,
| t.createUserName create_User_Name,
| t.line_name line_Name,
| t.truckNo truck_No,
| t.truckTypeNew truck_Type_New,
| t.transCodeLink trans_Code_Link,
| t.predictDepartTime predict_Depart_Time,
| t.returnFlag return_Flag,
| t.datoubi datoubi,
| t.desRoute des_Route,
| t.desOrgCode des_Org_Code,
| t.masterNodeCode master_Node_Code,
| t.targetWeight target_Weight,
| t.startRegionCode start_Region_Code,
| t.desRegionCode des_Region_Code,
| t.peizaiseoncdOrgCode peizaiseoncd_Org_Code,
| t.peizaisecondRegionCode peizaisecond_Region_Code,
| t.peizaistartOrgCode peizaistart_Org_Code,
| t.peizaistartRegionCode peizaistart_Region_Code,
| t.startProvCode start_Prov_Code,
| t.waybillNob waybill_Nob,
| t.firstNodeCode first_Node_Code,
| t.centerCarType center_Car_Type,
| t.routeLineName route_Line_Name,
| t.car_Type car_Type,
| t.fache_Time fache_Time,
| t.batchTime batch_Time,
| '${batchTime}' rpt_date
| from (
| select * from (
| select *,
| row_number() over(partition by waybillNo,orgCode,containerNo order by createtime desc) rn
| from (
| select t1.*, c.*
| from tidb_ytrpt.upcar_monitor_detail t1
| left join (select * from temp_upcar_monitor_carno where create_time<='${batchTime}') c on c.car_no = t1.containerno
| where t1.createTime>= '${ctStartDate}' and t1.createTime < '${batchTime}'
| union all
| select t2.*, c.*
| from tidb_ytrpt.upcar_monitor_detail t2
| join (select * from temp_upcar_monitor_carno where status = 'INVALID' and car_type='待发车') c on c.car_no = t2.containerno
| where t2.createTime>= '${initDate}' and t2.createTime < '${ctStartDate}'
| ) tab
| ) temp where temp.rn = 1 and not exists (select 1 from temp_upcar_monitor_carno c where status = 'INVALID' and c.car_no=temp.containerNo)
|) t
|""".stripMargin
println(sql)
spark.sql(sql).repartition(100).persist(StorageLevel.MEMORY_ONLY_SER).createOrReplaceTempView("upcar_monitor_detail_temp")*/
// The 09:00 batch persists (snapshots) the day's departure status
if ("09:00:00".equals(batchTime.split(" ", -1)(1))) {
val sql =
s"""
|select '${DateHelper.convertDate(batchTime, "yyyy-MM-dd")}' rpt_date
| ,t.car_no
| ,t.create_time
| ,t.car_type
| ,t.status
| ,t.fache_time
| ,t.line_name
| from temp_upcar_monitor_carno t
|""".stripMargin
spark.sql(sql).write.mode(saveMode = "append").format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://tidb1.prd.db:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
.option("useSSL", false)
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
.option("dbtable", "upcar_monitor_carno_day") // database name and table name here
.option("isolationLevel", "NONE") // set isolationLevel to NONE
.option("user", "root") // TiDB user here
.save()
}
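// Load the full [initDate, batchTime) detail window once and cache it, so both branches
// of the UNION ALL below reuse the same scan instead of hitting TiDB twice.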
val sql0 =
s"""
|select * from tidb_ytrpt.upcar_monitor_detail
|where createTime>= '${initDate}' and createTime < '${batchTime}'
|""".stripMargin
println(sql0)
// spark.sql(sql).createOrReplaceTempView("upcar_monitor_detail_incomplete")
spark.sql(sql0)
.repartition(256).persist(StorageLevel.MEMORY_ONLY_SER)
.createOrReplaceTempView("upcar_monitor_detail_base_temp")
val sql =
s"""
|select t.waybillNo waybill_No,
| t.bagNo bag_No,
| t.containerNo container_No,
| t.expType exp_Type,
| t.orgCode org_Code,
| t.createTime create_Time,
| t.weight weight,
| t.createUserCode create_User_Code,
| t.createUserName create_User_Name,
| t.line_name line_Name,
| t.truckNo truck_No,
| t.truckTypeNew truck_Type_New,
| t.transCodeLink trans_Code_Link,
| t.predictDepartTime predict_Depart_Time,
| t.returnFlag return_Flag,
| t.datoubi datoubi,
| t.desRoute des_Route,
| t.desOrgCode des_Org_Code,
| t.masterNodeCode master_Node_Code,
| t.startRegionCode start_Region_Code,
| t.desRegionCode des_Region_Code,
| t.peizaiseoncdOrgCode peizaiseoncd_Org_Code,
| t.peizaisecondRegionCode peizaisecond_Region_Code,
| t.peizaistartOrgCode peizaistart_Org_Code,
| t.peizaistartRegionCode peizaistart_Region_Code,
| t.startProvCode start_Prov_Code,
| t.waybillNob waybill_Nob,
| t.firstNodeCode first_Node_Code,
| t.centerCarType center_Car_Type,
| t.routeLineName route_Line_Name,
| case when t.car_Type is null or t.car_Type='' then t.carType else t.car_Type end as car_Type,
| t.run_mode run_mode,
| t.fache_Time fache_Time,
| t.batchTime batch_Time,
| '${batchTime}' rpt_date
| from (
| select tab.*, c.*,
| row_number() over(partition by tab.waybillNo,tab.orgCode,tab.containerNo order by tab.createtime desc) rn
| from (
| select t1.*
| from upcar_monitor_detail_base_temp t1
| where t1.createTime>= '${ctStartDate}' and t1.createTime < '${batchTime}'
| union all
| select t2.*
| from upcar_monitor_detail_base_temp t2
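| -- car_type '待发车' ("awaiting departure") is a stored data value and must remain in Chinese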
| join (select car_no from tidb_ytrpt.upcar_monitor_carno_day where rpt_date = '${DateHelper.convertDate(batchTime, "yyyy-MM-dd")}' and car_type='待发车') c on c.car_no = t2.containerno
| where t2.createTime>= '${initDate}' and t2.createTime < '${ctStartDate}'
| ) tab left join (select * from temp_upcar_monitor_carno where create_time<='${batchTime}') c on c.car_no = tab.containerno
|) t where rn = 1 and not exists (select 1 from temp_upcar_monitor_carno c where status = 'INVALID' and c.car_no=t.containerno)
|""".stripMargin
println(sql)
// spark.sql(sql).createOrReplaceTempView("upcar_monitor_detail_incomplete")
spark.sql(sql)
.repartition(256).persist(StorageLevel.MEMORY_ONLY_SER)