package mobvista.dmp.datasource.retargeting

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Locale}

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.DateUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * Spark job that collects package names produced by `Constant.package_sql`,
  * removes those already present in MySQL table `mob_adn.dmp_app_map`, and
  * appends the remainder to that table via `Constant.writeMySQL`.
  *
  * @package: mobvista.dmp.datasource.retargeting
  * @author: wangjf
  * @date: 2019-06-06
  * @time: 17:29
  * @email: jinfeng.wang@mobvista.com
  * @phone: 152-1062-7698
  */
class GetPackage extends CommonSparkJob {

  /** Command-line options: `-date <yyyyMMdd>`, the date substituted into the query. */
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options
  }

  val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
  val sdf2 = new SimpleDateFormat("yyyyMMdd")

  /**
    * Runs the job.
    *
    * @param args command-line arguments; `-date <yyyyMMdd>` is required
    * @return 0 on completion (exceptions propagate to the caller)
    * @throws IllegalArgumentException if `-date` is missing or empty
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = new BasicParser().parse(commandOptions(), args)
    val date = commandLine.getOptionValue("date")
    // Fail fast, before starting Spark, instead of an NPE inside String.replace later.
    require(date != null && date.nonEmpty, "missing required option: -date <yyyyMMdd>")

    val spark = SparkSession
      .builder()
      .appName("GetPackage")
      .config("spark.rdd.compress", "true")
      .config("spark.shuffle.compress", "true")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.io.compression.codec", "lz4")
      .config("spark.io.compression.lz4.blockSize", "64k")
      .config("spark.sql.autoBroadcastJoinThreshold", "209715200")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    try {
      import spark.implicits._

      // Package names already stored in MySQL, lower-cased (locale-independently)
      // so they compare equal to the normalized names below. Null values are skipped.
      val mysqlDF = Constant.jdbcConnection(spark, "mob_adn", "dmp_app_map").rdd
        .flatMap(r => Option(r.getAs[Any]("app_package_name")).map(_.toString.toLowerCase(Locale.ROOT)).toList)
        .toDF("app_package_name")

      // Lower bound substituted for @update_date, shifted -6 by DateUtil
      // (presumably six days before `date` — confirm) and reformatted as yyyy-MM-dd.
      val updateDate = DateUtil.getDayByString(date, "yyyyMMdd", -6)
      val update_date = sdf1.format(sdf2.parse(updateDate))
      val sql = Constant.package_sql.replace("@date", date).replace("@update_date", update_date)

      // Normalized, pattern-valid, de-duplicated package names from the query.
      // Rows with a null package_name are skipped rather than crashing the job.
      val installDF = spark.sql(sql)
        .flatMap(r => Option(r.getAs[Any]("package_name")).map(v => GetPackage.normalizePackageName(v.toString)).toList)
        .filter(name =>
          MobvistaConstant.adrPkgPtn.matcher(name).matches() ||
            MobvistaConstant.iosPkgPtn.matcher(name).matches())
        .dropDuplicates()
        .toDF("app_package_name")

      // One insertion timestamp shared by every new row, computed once on the driver.
      // NOTE(review): the +8h shift presumably converts cluster time to UTC+8 — confirm.
      // The format/parse round-trip intentionally truncates to whole seconds.
      val cal = Calendar.getInstance()
      cal.setTime(new Date())
      cal.add(Calendar.HOUR, 8)
      val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val now = Timestamp.valueOf(fm.format(cal.getTime))

      val df = installDF.except(mysqlDF)
        .map(r => Result(0, r.getAs("app_package_name"), now, now))
        .toDF()
        .repartition(200)

      Constant.writeMySQL(df, "mob_adn", "dmp_app_map", SaveMode.Append)
    } finally {
      spark.stop()
    }
    0
  }
}

object GetPackage {

  /** Maximum stored package-name length (presumably the MySQL column width — confirm). */
  private val MaxPackageNameLength = 255

  /**
    * Lower-cases `raw` using `Locale.ROOT` and truncates it to at most
    * `MaxPackageNameLength` characters.
    *
    * Kept on the companion object so Spark closures calling it do not capture
    * the (possibly non-serializable) job instance.
    */
  private[retargeting] def normalizePackageName(raw: String): String = {
    val lower = raw.toLowerCase(Locale.ROOT)
    if (lower.length > MaxPackageNameLength) lower.substring(0, MaxPackageNameLength) else lower
  }

  def main(args: Array[String]): Unit = {
    new GetPackage().run(args)
  }
}