Commit c5e8abd9 by fan.jiang

id1142110895

parent 48220007
type=command
command=sh -x rtdmp_tmp_id1142110895.sh
\ No newline at end of file
#!/usr/bin/env bash
# Driver for the RtdmpTmpId1142110895 Spark job: waits for the upstream
# install-list partition, clears yesterday's output directory, then submits
# the job that extracts device ids for package id1142110895.
source ../../dmp_env.sh

# Job operates on the day before $ScheduleTime (upstream data lands with a 1-day lag).
dt_today=$(date -d "$ScheduleTime 1 days ago" +"%Y%m%d")
dt_slash_today=$(date -d "$ScheduleTime 1 days ago" +"%Y/%m/%d")

# Block until the upstream dm_install_list_v2 dsp_req partition is published.
check_await "${DM_INSTALL_LIST}_v2/${dt_slash_today}/dsp_req/_SUCCESS"

OUTPUT_PATH1="${RTDMP_TMP_PACKAGE_NAME_PATH}/id1142110895/${dt_slash_today}/"

# Remove any stale output from a previous run so the job can recreate the path.
# (Failure here is tolerated: the path may simply not exist yet.)
hadoop fs -rm -r "${OUTPUT_PATH1}"

# All expansions are quoted to survive word splitting / globbing should any
# env-provided path ever contain spaces or metacharacters.
spark-submit --class mobvista.dmp.datasource.dm.RtdmpTmpId1142110895 \
 --conf spark.yarn.executor.memoryOverhead=2048 \
 --conf spark.default.parallelism=3000 \
 --conf spark.sql.shuffle.partitions=3000 \
 --conf spark.driver.maxResultSize=4g \
 --conf spark.network.timeout=720s \
 --files "${HIVE_SITE_PATH}" \
 --master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 6 --num-executors 70 \
 "../../${JAR}" -dt_today "${dt_today}" -output1 "${OUTPUT_PATH1}" \
 -coalesce 420

# Propagate a non-zero spark-submit exit as 255 so the scheduler marks the task failed.
if [[ $? -ne 0 ]]; then
  exit 255
fi
\ No newline at end of file
......@@ -265,6 +265,7 @@ RTDMP_COM_TAOBAO_LITETAO_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHous
RTDMP_COM_SS_ANDROID_UGC_AWEME_REYUN_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_ss_android_ugc_aweme_reyun"
RTDMP_COM_TAOBAO_LITETAO_BTOP_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_taobao_litetao_btop"
RTDMP_COM_SS_ANDROID_UGC_AWEME_BTOP_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_com_ss_android_ugc_aweme_btop"
RTDMP_TMP_PACKAGE_NAME_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_tmp_package_name"
RTDMP_NORMAL_PATH="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_normal"
RTDMP_NORMAL_COUNT_RESULT="s3://mob-emr-test/dataplatform/DataWareHouse/data/dwh/tmp/rtdmp_normal_count_result"
......
package mobvista.dmp.datasource.dm
import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.format.TextMultipleOutputFormat
import org.apache.commons.cli.Options
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.spark.sql.{Row, SparkSession}
import java.net.URI
import scala.collection.mutable.ArrayBuffer
/**
* @author jiangfan
* @date 2021/8/25 11:47
*/
/**
 * Spark job that pulls device ids from `dwh.dm_install_list_v2` for
 * package id `1142110895` / `id1142110895` (excluding androidid/android_id/ruid
 * device types) and writes them out keyed by device type via
 * [[TextMultipleOutputFormat]].
 */
class RtdmpTmpId1142110895 extends CommonSparkJob with Serializable {

  /**
   * Declares the command-line options; all three are mandatory.
   *
   * - coalesce: number of output partitions
   * - output1:  S3 output directory
   * - dt_today: partition date (yyyyMMdd) to read
   */
  override protected def buildOptions(): Options = {
    val options = new Options
    options.addOption("coalesce", true, "[must] coalesce")
    options.addOption("output1", true, "[must] output1")
    options.addOption("dt_today", true, "[must] dt_today")
    options
  }

  /**
   * Entry point called by the framework.
   *
   * @param args raw CLI arguments parsed against [[buildOptions]]
   * @return 0 on success, -1 when a required option is missing
   */
  override protected def run(args: Array[String]): Int = {
    val commandLine = commParser.parse(options, args)
    if (!checkMustOption(commandLine)) {
      printUsage(options)
      return -1
    } else printOptions(commandLine)

    val coalesce = commandLine.getOptionValue("coalesce")
    val output1 = commandLine.getOptionValue("output1")
    val dt_today = commandLine.getOptionValue("dt_today")

    val spark = SparkSession.builder()
      .appName("RtdmpTmpId1142110895")
      .config("spark.rdd.compress", "true")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.orc.filterPushdown", "true")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // Delete any previous output so saveAsNewAPIHadoopFile does not fail on
    // an existing directory. (The driver shell script also attempts this, but
    // doing it here keeps the job safe to rerun standalone.)
    FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output1), true)

    try {
      val sql1 =
        s"""
           |select device_id, device_type from dwh.dm_install_list_v2 where dt='${dt_today}' and package_name in ('1142110895','id1142110895') and device_type not in ('androidid','android_id','ruid')
        """.stripMargin
      spark.sql(sql1).rdd.flatMap(buildRes(_)).coalesce(coalesce.toInt)
        .saveAsNewAPIHadoopFile(s"${output1}", classOf[Text], classOf[Text], classOf[TextMultipleOutputFormat], spark.sparkContext.hadoopConfiguration)
    } finally {
      // Always release the session, even when the SQL or write fails.
      spark.stop()
    }
    0
  }

  /**
   * Maps one result row to zero or one (key, value) output pairs.
   *
   * Rows with a blank device_type are dropped. The key keeps the historical
   * trailing ", " suffix, which TextMultipleOutputFormat uses when routing
   * records to per-device-type files — do not change it.
   *
   * @param row a row with `device_id` and `device_type` string columns
   * @return an array with a single (device_type-key, device_id) pair, or empty
   */
  def buildRes(row: Row): Array[Tuple2[Text, Text]] = {
    val device_id = row.getAs[String]("device_id")
    val device_type = row.getAs[String]("device_type")
    if (StringUtils.isNotBlank(device_type)) {
      Array(Tuple2(new Text(s"${device_type}, "), new Text(device_id)))
    } else {
      Array.empty[Tuple2[Text, Text]]
    }
  }
}
/** Companion launcher: constructs the job and hands over the CLI arguments. */
object RtdmpTmpId1142110895 {
  def main(args: Array[String]): Unit = {
    val job = new RtdmpTmpId1142110895
    job.run(args)
  }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment