package mobvista.dmp.datasource.setting

import java.net.URI
import java.util.Properties

import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * Spark batch job that joins one day of adn setting data
  * (dwh.ods_adn_ngix_setting_global) with the MySQL publisher_channel table,
  * preferring the channel's package_name (with the iOS "id" prefix stripped)
  * over the setting table's own package_name, and writes the result as
  * comma-separated text to the given S3 path.
  *
  * Required CLI options: -outputtotal <s3 path>, -coalesce <partitions>, -today <yyyymmdd>.
  */
class SettingTotal extends CommonSparkJob {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("outputtotal", true, "[must] outputtotal")
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("today", true, "[must] today")
    options
  }

  /**
    * Runs the job.
    *
    * @param args raw command-line arguments, parsed against [[buildOptions]]
    * @return 0 on success, -1 when a mandatory option is missing
    */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    }
    printOptions(commandLine)

    val outputtotal = commandLine.getOptionValue("outputtotal")
    val coalesce = commandLine.getOptionValue("coalesce")
    val today = commandLine.getOptionValue("today")

    val spark = SparkSession.builder()
      // was "AppsFlyerTotal" — copy-paste from another job; name the app after this job
      .appName("SettingTotal")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // NOTE(review): the previous output is deleted before the new data is read,
    // so a failure later in the job leaves no output at all — confirm this
    // "overwrite by pre-delete" behavior is intended.
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
      .delete(new Path(outputtotal), true)

    try {
      val sqlContext = spark.sqlContext
      val properties = new Properties()
      // FIXME(security): database credentials are hardcoded in source control;
      // move them to a configuration file or secret store.
      properties.put("user", "adnro")
      properties.put("password", "YcM123glh")
      val url = "jdbc:mysql://adn-mysql-external.mobvista.com:3306/mob_adn"

      // Pull the publisher_channel dimension from MySQL and expose it to Spark SQL.
      sqlContext.read.jdbc(url, "publisher_channel", properties)
        .select("id", "package_name", "platform")
        .toDF("id", "package_name", "platform")
        .createOrReplaceTempView("publisher_channel")

      // One day of settings, left-joined to the channel table; for iOS
      // (platform = '2') the channel package_name has its "id" prefix removed
      // and, when present, wins over the setting table's package_name.
      val sqlDaily =
        s"""
           | select t1.app_id,coalesce(t2.package_name,t1.package_name) package_name,t1.platform
           | from (select app_id,package_name,platform
           | from dwh.ods_adn_ngix_setting_global where concat(yyyy,mm,dd) = '${today}') t1
           | left join (select id appid,replace(package_name,"id", "") package_name,platform
           | from publisher_channel where platform = '2') t2
           | on(t1.app_id = t2.appid)
        """.stripMargin

      spark.sql(sqlDaily)
        .coalesce(coalesce.toInt).rdd
        .map(_.mkString(","))
        .saveAsTextFile(outputtotal)
    } finally {
      // Always release the Spark session, even when the read/write fails.
      spark.stop()
    }
    0
  }
}

object SettingTotal {
  def main(args: Array[String]): Unit = {
    // Propagate the job's status code; previously a failed run (-1) still
    // exited the JVM with 0, so schedulers could not detect the failure.
    System.exit(new SettingTotal().run(args))
  }
}