Commit fd0560d0 by wang-jinfeng

optimize dmp

parent a355902f
@@ -34,7 +34,7 @@ if [[ $? -ne 0 ]]; then
   exit 255
 fi
-sleep $((fors * 40))
+sleep $((fors * 60))
 ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/workspace/check_process.sh '${shell}'"
...
@@ -34,7 +34,7 @@ if [[ $? -ne 0 ]]; then
   exit 255
 fi
-sleep $((fors * 40))
+sleep $((fors * 60))
 ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/workspace/check_process.sh '${shell}'"
...
 type=command
 dependencies=uc_lahuo_data_to_dmp
-command=sh -x uc_other_data_to_dmp.sh
+command=sh -x uc_other_data_to_dmp_v2.sh
\ No newline at end of file
@@ -22,7 +22,7 @@ EXPIRE_OUTPUT_PATH="${UC_LAHUO_TMP_DAILY_TO_S3}/${expire_date_path}/uc_activatio
 # OUTPUT_PATH03="${UC_LAHUO_TMP_DAILY_TO_S3}/${dt_slash_today}/uc_activation_other_data/4b5a58_ucoppo"
 # OUTPUT_PATH04="${UC_LAHUO_TMP_DAILY_TO_S3}/${dt_slash_today}/uc_activation_other_data/d3f521_ucoppo"
-OUTPUT_PATH="${UC_LAHUO_TMP_DAILY_TO_S3}/${dt_slash_today}/uc_activation_other_data/"
+OUTPUT_PATH="${UC_LAHUO_TMP_DAILY_TO_S3}/${dt_slash_today}/uc_activation_other_data"
 hadoop fs -rm -r "${OUTPUT_PATH}"
...
@@ -39,7 +39,7 @@ if [[ $? -ne 0 ]]; then
   exit 255
 fi
-sleep $((fors * 15))
+sleep $((fors * 25))
 ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/workspace/check_process.sh '${shell}'"
...
@@ -39,7 +39,7 @@ if [[ $? -ne 0 ]]; then
   exit 255
 fi
-sleep $((fors * 15))
+sleep $((fors * 25))
 ssh -o "StrictHostKeyChecking no" -i /home/hadoop/wangjf/dataplatform_cn.pem -l root 182.92.177.185 "sh -x /root/workspace/check_process.sh '${shell}'"
...
@@ -37,7 +37,7 @@ if [[ $? -ne 0 ]]; then
   exit 255
 fi
-sleep $((fors * 25))
+sleep $((fors * 50))
 shell=" -cp /root/workspace/DMP-1.0.3-jar-with-dependencies.jar mobvista.dmp.datasource.iqiyi.IQiYiRequest"
...
@@ -32,8 +32,8 @@ import java.util.concurrent.TimeUnit;
 public class BaiChuanMainV2 {
     private static String dt = DateUtil.format(new Date(), "yyyy-MM-dd");
-    static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(200, 400, 500, TimeUnit.MILLISECONDS,
-            new LinkedBlockingDeque<>(400), new CustomizableThreadFactory("BaiChuan"), new ThreadPoolExecutor.CallerRunsPolicy());
+    static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(100, 200, 500, TimeUnit.MILLISECONDS,
+            new LinkedBlockingDeque<>(200), new CustomizableThreadFactory("BaiChuan"), new ThreadPoolExecutor.CallerRunsPolicy());
     public static void main(String[] args) throws JoranException, InterruptedException {
         LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
...
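The pool change halves the core threads, maximum threads, and queue capacity while keeping CallerRunsPolicy. A minimal Scala sketch of that sizing follows; the object name and the dummy task are illustrative only, and the real class builds the pool in Java with Spring's CustomizableThreadFactory, which is omitted here to keep the sketch self-contained.

import java.util.concurrent.{LinkedBlockingDeque, ThreadPoolExecutor, TimeUnit}

object PoolSizingSketch {
  def main(args: Array[String]): Unit = {
    // Same shape as the new pool: 100 core threads, up to 200, 500 ms keep-alive,
    // a bounded work queue of 200, and CallerRunsPolicy for rejected tasks.
    val pool = new ThreadPoolExecutor(100, 200, 500, TimeUnit.MILLISECONDS,
      new LinkedBlockingDeque[Runnable](200), new ThreadPoolExecutor.CallerRunsPolicy())

    // Once all 200 workers are busy and the queue holds 200 tasks, execute()
    // runs the task on the submitting thread, which throttles the producer
    // instead of discarding work.
    (1 to 1000).foreach { _ =>
      pool.execute(new Runnable {
        override def run(): Unit = Thread.sleep(10) // stand-in for one request
      })
    }

    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
  }
}

Presumably the smaller pool reduces concurrent pressure on the downstream service, which would also fit the longer sleep multipliers in the wrapper scripts above.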
@@ -37,21 +37,16 @@ spark_clickhouse_url=jdbc:clickhouse://192.168.17.122:8123,192.168.17.123:8123,1
 mapping.se.host=3.34.241.249,13.209.48.228,3.35.218.108
-mapping.sg.host=18.136.203.33,18.139.0.13,18.139.160.145
+mapping.sg.host=18.139.0.13,18.139.160.145,54.251.75.86
-mapping.aliyun.host=10.22.2.232,10.22.2.231,10.22.1.97,10.22.1.98,10.22.1.100,10.22.0.24,10.22.0.23,10.22.0.22,10.22.3.126,10.22.0.40,10.22.0.39,\
-10.22.1.39,10.22.3.120,10.22.3.178,10.22.1.100,10.22.1.98,10.22.1.97
+mapping.aliyun.host=10.22.1.188,10.22.1.187,10.22.1.186
-mapping.fk.host=3.122.120.90,35.159.16.74,3.121.69.130,18.193.88.136,18.184.171.197,3.65.211.221,3.120.203.124,3.65.116.29,54.93.86.92,3.65.111.184,\
-18.195.203.195,18.192.124.88,18.196.157.191,3.65.136.90
+mapping.fk.host=18.159.113.192,3.126.116.164,3.126.249.98
 mapping.se.host_map=172.31.24.255:3.34.241.249,172.31.29.23:13.209.48.228,172.31.21.185:3.35.218.108
-mapping.sg.host_map=172.31.17.79:18.136.203.33,172.31.23.187:18.139.0.13,172.31.18.220:18.139.160.145
+mapping.sg.host_map=172.31.23.187:18.139.0.13,172.31.18.220:18.139.160.145,172.31.31.118:54.251.75.86
-mapping.aliyun.host_map=10.22.2.232:10.22.2.232,10.22.2.231:10.22.2.231,10.22.1.97:10.22.1.97,10.22.1.98:10.22.1.98,10.22.1.100:10.22.1.100,\
-10.22.0.24:10.22.0.24,10.22.0.23:10.22.0.23,10.22.0.22:10.22.0.22,10.22.3.126:10.22.3.126,10.22.0.40:10.22.0.40,10.22.0.39:10.22.0.39,\
-10.22.1.39:10.22.1.39,10.22.3.120:10.22.3.120,10.22.3.178:10.22.3.178,10.22.1.100:10.22.1.100,10.22.1.98:10.22.1.98,10.22.1.97:10.22.1.97
+mapping.aliyun.host_map=10.22.1.188:10.22.1.188,10.22.1.187:10.22.1.187,10.22.1.186:10.22.1.186
-mapping.fk.host_map=172.31.21.197:3.122.120.90,172.31.30.85:35.159.16.74,172.31.31.36:3.121.69.130,172.31.23.22:18.193.88.136,\
-172.31.31.83:18.184.171.197,172.31.23.232:3.65.211.221,172.31.3.81:3.120.203.124,172.31.29.20:3.65.116.29,172.31.30.96:54.93.86.92,\
-172.31.20.225:3.65.111.184,172.31.22.97:18.195.203.195,172.31.17.2:18.192.124.88,172.31.26.198:18.196.157.191,172.31.10.218:3.65.136.90
+mapping.fk.host_map=172.31.26.93:18.159.113.192,172.31.18.112:3.126.116.164,172.31.30.65:3.126.249.98
\ No newline at end of file
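The host and host_map lists shrink to three nodes per region, and each host_map entry pairs a private address with its public counterpart. The code that consumes these properties is not part of this diff, so the parse below is only an illustrative sketch assuming the "private:public,private:public,..." format shown above; the object name and printout are not from the repo.

object HostMapSketch {
  def main(args: Array[String]): Unit = {
    // Value copied from the new mapping.fk.host_map entry.
    val raw = "172.31.26.93:18.159.113.192,172.31.18.112:3.126.116.164,172.31.30.65:3.126.249.98"

    // Split the comma-separated list, then each "private:public" pair.
    val hostMap: Map[String, String] = raw.split(",").map { pair =>
      val Array(privateIp, publicIp) = pair.split(":", 2)
      privateIp -> publicIp
    }.toMap

    // Look up the public address for a private one.
    println(hostMap.getOrElse("172.31.26.93", "unknown")) // 18.159.113.192
  }
}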
 package mobvista.dmp.datasource.taobao

 import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
+import mobvista.dmp.format.RDDMultipleOutputFormat
+import mobvista.dmp.util.MRUtils
 import org.apache.commons.cli.Options
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.SequenceFile.CompressionType
+import org.apache.hadoop.io.Text
 import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
-import org.apache.spark.sql.SaveMode
+import org.apache.spark.storage.StorageLevel
 import java.net.URI
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer

 class UCOtherDataToDmpV2 extends CommonSparkJob with Serializable {
@@ -34,6 +39,7 @@ class UCOtherDataToDmpV2 extends CommonSparkJob with Serializable {
     val spark = MobvistaConstant.createSparkSession("UCOtherDataToDmp")
+    mutable.WrappedArray
     val sc = spark.sparkContext
     FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
@@ -64,42 +70,33 @@ class UCOtherDataToDmpV2 extends CommonSparkJob with Serializable {
         | GROUP BY device_id
         |""".stripMargin
-    /*
     val df = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK_SER)
     val rdd = df.rdd.map(r => {
+      val arrayBuffer = new ArrayBuffer[(Text, Text)]()
       val deviceId = r.getAs[String]("device_id")
       val deviceType = "imeimd5"
       val platform = "android"
       val installList = r.getAs[mutable.WrappedArray[String]]("install_list")
       if (installList.contains("com.uc.foractivation.4b5a58") && installList.contains("com.UCMobile_bes")) {
-        (new Text(s"$output/4b5a58_ucbes"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.4b5a58_ucbes", update)))
+        arrayBuffer += ((new Text(s"$output/4b5a58_ucbes"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.4b5a58_ucbes", update))))
       }
       if (installList.contains("com.uc.foractivation.d3f521") && installList.contains("com.UCMobile_bes")) {
-        (new Text(s"$output/d3f521_ucbes"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.d3f521_ucbes", update)))
+        arrayBuffer += ((new Text(s"$output/d3f521_ucbes"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.d3f521_ucbes", update))))
       }
       if (installList.contains("com.uc.foractivation.4b5a58") && installList.contains("com.ucmobile_oppo")) {
-        (new Text(s"$output/4b5a58_ucoppo"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.4b5a58_ucoppo", update)))
+        arrayBuffer += ((new Text(s"$output/4b5a58_ucoppo"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.4b5a58_ucoppo", update))))
       }
       if (installList.contains("com.uc.foractivation.d3f521") && installList.contains("com.ucmobile_oppo")) {
-        (new Text(s"$output/d3f521_ucoppo"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.d3f521_ucoppo", update)))
-      } else {
-        (new Text(""), new Text(""))
+        arrayBuffer += ((new Text(s"$output/d3f521_ucoppo"), new Text(MRUtils.JOINER.join(deviceId, deviceType, platform, "com.uc.foractivation.d3f521_ucoppo", update))))
       }
-    }).filter(t => {
-      StringUtils.isNotBlank(t._1.toString) && StringUtils.isNotBlank(t._2.toString)
+      arrayBuffer
+    }).flatMap(l => {
+      l
     })
-    println(s"count -->> ${rdd.count()}")
     rdd.coalesce(50)
       .saveAsNewAPIHadoopFile(output, classOf[Text], classOf[Text], classOf[RDDMultipleOutputFormat[_, _]], conf)
-    */
-    val df = spark.sql(sql)
-    df.write
-      .mode(SaveMode.Overwrite)
-      .option("orc.compress", "zlib")
-      .orc(output)
     } finally {
       spark.stop()
...
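The substance of the Scala change is that one device row can now land in more than one output directory: the previously commented-out map body returned a single (path, record) tuple per row (only the value of the last if/else was actually returned, since a Scala block yields its final expression) and blank tuples had to be filtered out, while the re-enabled body collects every match in an ArrayBuffer and flattens it with flatMap. A small standalone sketch of that pattern, without Spark, is below; the object name, sample rows, and tab-joined record format are illustrative only.

import scala.collection.mutable.ArrayBuffer

object FlatMapPatternSketch {
  def main(args: Array[String]): Unit = {
    val output = "s3://mob-emr-test/example" // placeholder output root

    // Two fake devices: the first matches two package combinations, the second none.
    val rows = Seq(
      ("device-1", Seq("com.uc.foractivation.4b5a58", "com.UCMobile_bes", "com.ucmobile_oppo")),
      ("device-2", Seq("com.some.other.app"))
    )

    // Same shape as the new map body: collect zero or more (path, record) pairs
    // per row, then flatten, so one device can feed several output directories.
    val records = rows.flatMap { case (deviceId, installList) =>
      val buffer = new ArrayBuffer[(String, String)]()
      if (installList.contains("com.uc.foractivation.4b5a58") && installList.contains("com.UCMobile_bes")) {
        buffer += ((s"$output/4b5a58_ucbes", s"$deviceId\tcom.uc.foractivation.4b5a58_ucbes"))
      }
      if (installList.contains("com.uc.foractivation.4b5a58") && installList.contains("com.ucmobile_oppo")) {
        buffer += ((s"$output/4b5a58_ucoppo", s"$deviceId\tcom.uc.foractivation.4b5a58_ucoppo"))
      }
      buffer // empty buffers simply disappear after flattening
    }

    records.foreach(println) // device-1 yields two records, device-2 none
  }
}

In the job itself the pairs are Hadoop Text values and the RDD is written with RDDMultipleOutputFormat, which, judging by its name and the path-valued keys, routes each record to the directory named in its key.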