Commit 69acc18a by fan.jiang

com.eg.android.AlipayGphone_reyun

parent 1e4fa068
type=command
command=sh -x com_eg_android_AlipayGphone_reyun.sh
#!/usr/bin/env bash
# Pull the com.eg.android.AlipayGphone devices in the reyun partition of the dm_install_list_v2 table out to a dedicated path (updated daily); the resulting audience package is later loaded into rtdmp under the name com.eg.android.AlipayGphone_reyun.
source ../../dmp_env.sh
dt_today=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt_dash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")
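# Example (assumption: ScheduleTime is injected by the scheduler as a date such as "2021-08-19"):
# dt_today=20210818, dt_dash_today=2021/08/18.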
business="reyun"
INPUT_PATH="${DM_INSTALL_LIST}_v2/${dt_dash_today}/${business}"
check_await ${INPUT_PATH}/_SUCCESS
OUTPUT_PATH="${RTDMP_COM_EG_ANDROID_ALIPAYGPHONE_REYUN_PATH}/${dt_dash_today}/"
hadoop fs -rm -r "${OUTPUT_PATH}"
spark-submit --class mobvista.dmp.datasource.dm.ComEgAndroidAlipayGphoneReyun \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.default.parallelism=3000 \
--conf spark.sql.shuffle.partitions=3000 \
--conf spark.driver.maxResultSize=4g \
--conf spark.network.timeout=720s \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 6 --num-executors 20 \
../../${JAR} -dt_today ${dt_today} -output ${OUTPUT_PATH} -coalesce 200
if [[ $? -ne 0 ]]; then
exit 255
fi
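
check_await comes from dmp_env.sh and is not part of this commit. A minimal sketch of what such a helper usually does, assuming it simply polls for the upstream _SUCCESS flag; the body and the polling interval below are assumptions, not the real implementation:

check_await() {
  local flag_path="$1"
  # Block until the upstream dm_install_list_v2 job has written its _SUCCESS marker.
  while ! hadoop fs -test -e "${flag_path}"; do
    echo "$(date '+%F %T') waiting for ${flag_path} ..."
    sleep 300  # polling interval is a guess
  done
}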
package mobvista.dmp.datasource.dm

import java.net.URI

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.TextMultipleOutputFormat
import org.apache.commons.cli.Options
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.mutable.ArrayBuffer
/**
* @author jiangfan
* @date 2021/8/19 14:17
*/
class ComEgAndroidAlipayGphoneReyun extends CommonSparkJob with Serializable {

  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output", true, "[must] output")
    options.addOption("dt_today", true, "[must] dt_today")
    options
  }
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output = commandLine.getOptionValue("output")
    val dt_today = commandLine.getOptionValue("dt_today")

    val spark = SparkSession.builder()
      .appName("ComEgAndroidAlipayGphoneReyun")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    // Delete any existing output for this date so the job can be rerun idempotently.
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)
    try {
      // Select the device ids that the reyun business reported for this date
      // partition with com.eg.android.AlipayGphone installed.
      val sql1 =
        s"""
           |select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and business='reyun' and package_name='com.eg.android.AlipayGphone'
        """.stripMargin

      spark.sql(sql1).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
        .saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
    } finally {
      spark.stop()
    }
    0
  }
  // Map a (device_id, device_type) row to a (key, value) pair for
  // TextMultipleOutputFormat; rows with a blank device_type are dropped.
  def buildRes(row: Row): Array[(Text, Text)] = {
    val buffer = new ArrayBuffer[(Text, Text)]()
    val device_id = row.getAs[String]("device_id")
    val device_type = row.getAs[String]("device_type")
    if (StringUtils.isNotBlank(device_type)) {
      // The key keeps the trailing ", " so that key + value renders as
      // "<device_type>, <device_id>" in the output.
      buffer += Tuple2(new Text(s"${device_type}, "), new Text(device_id))
    }
    buffer.toArray
  }
}
object ComEgAndroidAlipayGphoneReyun {
  def main(args: Array[String]): Unit = {
    new ComEgAndroidAlipayGphoneReyun().run(args)
  }
}
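
Assuming TextMultipleOutputFormat routes each record by its Text key (the "<device_type>, " prefix emitted by buildRes), one day's output can be spot-checked from the shell. The date and the flat part-* file layout below are assumptions:

# Inspect the audience package written for 2021/08/19 (hypothetical date).
OUTPUT_PATH="${RTDMP_COM_EG_ANDROID_ALIPAYGPHONE_REYUN_PATH}/2021/08/19/"
hadoop fs -ls "${OUTPUT_PATH}"
# Each record is expected to read "<device_type>, <device_id>".
hadoop fs -text "${OUTPUT_PATH}"part-* | head -n 5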