Commit 5ab38e6c by WangJinfeng

Update RTDmp: add Lazada audience ETL/merge/process jobs; enlarge the fetch thread pool and the ClickHouse write batch size

parent fcb21923
......@@ -86,7 +86,7 @@ public class RTDmpFetch {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("RTDmpFetch-%d").build();
ExecutorService pool = new ThreadPoolExecutor(5, 10,
ExecutorService pool = new ThreadPoolExecutor(10, 20,
120L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(64), threadFactory, new ThreadPoolExecutor.AbortPolicy());
......
......@@ -80,7 +80,7 @@ class RTDmpMergeCK extends CommonSparkJob with Serializable {
Thread.sleep(120000)
df.saveToClickHouse(database, table, Seq(dt, hour_part), Seq("dt", "hour"), clusterName, batchSize = 100000)
df.saveToClickHouse(database, table, Seq(dt, hour_part), Seq("dt", "hour"), clusterName, batchSize = 200000)
MySQLUtil.update(database, table, date_time)
......
package mobvista.dmp.datasource.rtdmp.lazada
/**
* @package: mobvista.dmp.datasource.rtdmp.lazada
* @author: wangjf
* @date: 2021/8/5
* @time: 6:13 下午
* @email: jinfeng.wang@mobvista.com
*/
object Constant {
// Hive SQL templates for the Lazada (com.lazada.android) RTDMP audience pipeline.
// The @dt / @new_date / @update_date placeholders are substituted with
// String.replace by ETLJob, MergeInstallJob and ProcessRTJob before execution.

// Distinct (campaign_id, gaid) install pairs for the Lazada package in ID/PH on @dt.
// NOTE(review): @dt is substituted unquoted here (CONCAT(...) = @dt) but quoted
// ('@dt') in the merge/process templates below — this relies on Hive's implicit
// type coercion; confirm the partition filter still prunes as intended.
val etl_install_sql: String =
"""
|SELECT campaign_id,gaid
| FROM dwh.ods_adn_trackingnew_install
| WHERE CONCAT(yyyy,mm,dd) = @dt AND UPPER(country_code) IN ('ID','PH')
| AND ext_campaignpackagename = 'com.lazada.android'
| GROUP BY campaign_id,gaid
|""".stripMargin
// Distinct (campaign_id, gaid) REGISTRATION event pairs in ID/PH on @dt.
val etl_event_sql: String =
"""
|SELECT campaign_id,gaid
| FROM dwh.ods_adn_tracking_ss_event
| WHERE CONCAT(yyyy,mm,dd) = @dt AND country in('ID','PH') AND event_name = 'REGISTRATION'
| GROUP BY campaign_id,gaid
|""".stripMargin
// Rolling merge: today's installs tagged with @new_date, unioned with prior
// merged rows whose update_date is still newer than the @update_date cutoff
// (the caller passes dt - 30 days), i.e. a 30-day retention window.
val merge_install_sql: String =
"""
|SELECT campaign_id, gaid, '@new_date' update_date
| FROM dwh.etl_adn_tracking_install
| WHERE dt = '@dt'
| UNION ALL
|SELECT campaign_id, gaid, update_date
| FROM dwh.merge_adn_tracking_install
| WHERE dt = '@dt' AND update_date > '@update_date'
|""".stripMargin
// Anti-join: gaids present in the merged install set (t2) with NO matching
// (campaign_id, gaid) event row (t1) — i.e. installed but never registered.
val process_rtdmp_audience_sql: String =
"""
|SELECT t2.gaid
| FROM
| (SELECT campaign_id,gaid
| FROM dwh.etl_adn_tracking_event WHERE dt = '@dt'
| ) t1
| RIGHT JOIN
| (SELECT campaign_id,gaid
| FROM dwh.merge_adn_tracking_install WHERE dt = '@dt'
| ) t2
| ON t1.campaign_id = t2.campaign_id AND t1.gaid = t2.gaid
| WHERE t1.gaid IS NULL
| GROUP BY t2.gaid
|""".stripMargin
}
package mobvista.dmp.datasource.rtdmp.lazada
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SaveMode
/**
* @package: mobvista.dmp.datasource.rtdmp.lazada
* @author: wangjf
* @date: 2021/8/5
* @time: 7:23 下午
* @email: jinfeng.wang@mobvista.com
*/
/**
 * Spark driver that extracts Lazada (campaign_id, gaid) pairs for one day
 * and writes them as zlib-compressed ORC to the given output path.
 *
 * CLI: -dt yyyymmdd -tb_type install|event -output <path>
 */
class ETLJob extends CommonSparkJob with Serializable {

  /** Declares the command-line switches accepted by this job. */
  def commandOptions(): Options = {
    val opts = new Options()
    Seq("dt", "tb_type", "output").foreach(name => opts.addOption(name, true, name))
    opts
  }

  override protected def run(args: Array[String]): Int = {
    val cmd = new BasicParser().parse(commandOptions(), args)
    val dt = cmd.getOptionValue("dt")
    val tbType = cmd.getOptionValue("tb_type")
    val output = cmd.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession(s"ETLJob.${tbType}.${dt}")
    val sc = spark.sparkContext
    try {
      // "install" selects the tracking-install template; anything else the event one.
      val template =
        if (tbType.equals("install")) Constant.etl_install_sql else Constant.etl_event_sql
      spark.sql(template.replace("@dt", dt))
        .coalesce(20)
        .write
        .option("orc.compress", "zlib")
        .mode(SaveMode.Overwrite)
        .orc(output)
    } finally {
      if (sc != null) sc.stop()
      if (spark != null) spark.stop()
    }
    0
  }
}
/** Command-line entry point; delegates to [[ETLJob]]. */
object ETLJob {
  def main(args: Array[String]): Unit = {
    val job = new ETLJob
    job.run(args)
  }
}
\ No newline at end of file
package mobvista.dmp.datasource.rtdmp.lazada
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.SaveMode
/**
* @package: mobvista.dmp.datasource.rtdmp.lazada
* @author: wangjf
* @date: 2021/8/5
* @time: 7:23 下午
* @email: jinfeng.wang@mobvista.com
*/
/**
 * Spark driver that merges today's Lazada installs into the rolling install
 * table, keeping rows updated within the last 30 days, and writes the result
 * as zlib-compressed ORC.
 *
 * CLI: -dt yyyymmdd -output <path> (-tb_type accepted but unused)
 */
class MergeInstallJob extends CommonSparkJob with Serializable {

  /**
   * Declares the command-line switches. tb_type is declared for CLI parity
   * with the other lazada jobs but is not consumed here.
   */
  def commandOptions(): Options = {
    val opts = new Options()
    Seq("dt", "tb_type", "output").foreach(name => opts.addOption(name, true, name))
    opts
  }

  override protected def run(args: Array[String]): Int = {
    val cmd = new BasicParser().parse(commandOptions(), args)
    val dt = cmd.getOptionValue("dt")
    val output = cmd.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession(s"MergeInstallJob.${dt}")
    val sc = spark.sparkContext
    try {
      // Reformat dt for tagging today's rows (presumably yyyyMMdd -> yyyy-MM-dd
      // via sdf2/sdf1 — confirm against MobvistaConstant).
      val newDate = MobvistaConstant.sdf1.format(MobvistaConstant.sdf2.parse(dt))
      // Retention cutoff: only keep previously merged rows updated after dt - 30 days.
      val updateDate = DateUtil.getDay(DateUtil.parse(dt, "yyyyMMdd"), "yyyy-MM-dd", -30)
      val query = Constant.merge_install_sql
        .replace("@dt", dt)
        .replace("@new_date", newDate)
        .replace("@update_date", updateDate)
      spark.sql(query)
        .coalesce(100)
        .write
        .option("orc.compress", "zlib")
        .mode(SaveMode.Overwrite)
        .orc(output)
    } finally {
      if (sc != null) sc.stop()
      if (spark != null) spark.stop()
    }
    0
  }
}
/** Command-line entry point; delegates to [[MergeInstallJob]]. */
object MergeInstallJob {
  def main(args: Array[String]): Unit = {
    val job = new MergeInstallJob
    job.run(args)
  }
}
\ No newline at end of file
package mobvista.dmp.datasource.rtdmp.lazada
import com.alibaba.fastjson.{JSONArray, JSONObject}
import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.datasource.rtdmp.{Logic, ServerUtil}
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.io.compress.GzipCodec
import scala.collection.mutable
/**
* @package: mobvista.dmp.datasource.rtdmp.lazada
* @author: wangjf
* @date: 2021/8/5
* @time: 7:23 下午
* @email: jinfeng.wang@mobvista.com
*/
/**
 * Spark driver that materializes the Lazada "installed but not registered"
 * audience: writes the gaid list as gzip text to the output path, refreshes
 * the local audience registry, and notifies the RTDMP server.
 *
 * CLI: -dt yyyymmdd -output <path> (-tb_type accepted but unused)
 */
class ProcessRTJob extends CommonSparkJob with Serializable {

  /** Declares the command-line switches accepted by this job. */
  def commandOptions(): Options = {
    val opts = new Options()
    Seq("dt", "tb_type", "output").foreach(name => opts.addOption(name, true, name))
    opts
  }

  override protected def run(args: Array[String]): Int = {
    val cmd = new BasicParser().parse(commandOptions(), args)
    val dt = cmd.getOptionValue("dt")
    val output = cmd.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession(s"ProcessRTJob.${dt}")
    val sc = spark.sparkContext
    try {
      // gaids present in the merged install set with no matching event (anti-join).
      val query = Constant.process_rtdmp_audience_sql.replace("@dt", dt)
      spark.sql(query)
        .rdd
        .map(_.getAs[String]("gaid"))
        .coalesce(20)
        .saveAsTextFile(output, classOf[GzipCodec])

      val packageName = "com.lazada.android"
      // Read the existing audience registry BEFORE it is overwritten just below;
      // the branch decision depends on the pre-refresh state.
      val audienceIds = Logic.getAudienceInfo("lazada")
      Logic.writeAudienceInfo("lazada", Logic.getAudienceMap(mutable.HashSet(packageName)))

      if (audienceIds.contains(packageName)) {
        // Known audience: push the new device file against the existing id.
        val payload = new JSONObject()
        payload.put("id", audienceIds(packageName))
        payload.put("s3_path", s"$output/${packageName}/gaid/")
        payload.put("status", 1)
        payload.put("audience_data_status", 1)
        val batch = new JSONArray()
        batch.add(payload)
        ServerUtil.upload(batch)
      } else {
        // Unknown audience: register it with full metadata.
        // NOTE(review): creation goes through ServerUtil.update while the data
        // refresh goes through ServerUtil.upload — confirm this naming matches
        // the ServerUtil API contract.
        val payload = new JSONObject()
        payload.put("s3_path", s"$output/${packageName}/gaid/")
        payload.put("status", 1)
        payload.put("audience_data_status", 1)
        payload.put("platform", 1)
        payload.put("match_device_type", 3)
        payload.put("audience_type", 2)
        payload.put("data_update_method", 1)
        payload.put("audience_name", packageName)
        payload.put("audience_gender", 3)
        payload.put("audience_count", 1)
        payload.put("is_sync_dmpserver", 1)
        val batch = new JSONArray()
        batch.add(payload)
        ServerUtil.update(batch)
      }
    } finally {
      if (sc != null) sc.stop()
      if (spark != null) spark.stop()
    }
    0
  }
}
/** Command-line entry point; delegates to [[ProcessRTJob]]. */
object ProcessRTJob {
  def main(args: Array[String]): Unit = {
    val job = new ProcessRTJob
    job.run(args)
  }
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment