Commit b1f36887 by WangJinfeng

update partition filter: replace CONCAT(yyyy,mm,dd) predicates with per-column equality filters (yyyy/mm/dd, plus hh for hourly jobs) so Hive/Spark can prune partitions

parent 719ef9a5
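Every hunk below makes the same change: a predicate like CONCAT(yyyy,mm,dd) = '$datetime' wraps the partition columns in a function, so it cannot be pushed to the Hive metastore for partition pruning and the engine may enumerate every partition, while plain per-column equality prunes cheaply. A minimal sketch of the pattern, with an illustrative helper name that is not part of the commit:

object PartitionFilter {
  // Split a yyyyMMdd stamp into the values of the yyyy/mm/dd partition columns.
  def ymd(datetime: String): (String, String, String) =
    (datetime.substring(0, 4), datetime.substring(4, 6), datetime.substring(6, 8))

  def main(args: Array[String]): Unit = {
    val (year, month, day) = ymd("20211207")
    // Equality on each partition column is what the optimizer can prune on.
    println(s"WHERE yyyy = '$year' AND mm = '$month' AND dd = '$day'")
  }
}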
......@@ -33,6 +33,7 @@ hadoop jar ../../${JAR} mobvista.dmp.datasource.adn.mapreduce.MdsAdnRequestDaily
-Dmapreduce.reduce.java.opts=-Xmx2458m \
-Dmapreduce.job.reduces=200 \
-Dmapreduce.fileoutputcommitter.algorithm.version=2 \
+ -Dmapreduce.input.fileinputformat.input.dir.recursive=true \
"$INPUT_ADN_SDK_PKG_DAILY" "$OUTPUT_PATH" || exit 1
mount_partition "mds_adn_request_daily" "\`date\`='$LOG_TIME'" "$OUTPUT_PATH" || exit 1
......
......@@ -60,7 +60,7 @@ spark-submit --class mobvista.dmp.datasource.dm.DmInterestTagAllV2 \
--conf spark.sql.adaptive.enabled=true \
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=536870912 \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
- --master yarn --deploy-mode cluster --executor-memory 10g --driver-memory 6g --executor-cores 3 --num-executors 256 \
+ --master yarn --deploy-mode cluster --executor-memory 12g --driver-memory 6g --executor-cores 4 --num-executors 256 \
../${JAR} \
-output ${OUTPUT_PATH} -date ${date} -ga_date ${ga_date} -coalesce 5000
......
......@@ -49,8 +49,6 @@ public class MD5Util {
}
// 32-character digest
return buf.toString();
- // 16-character digest
- // return buf.toString().substring(8, 24);
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
return "";
......
......@@ -336,6 +336,8 @@ object MobvistaConstant {
val oaidPtb = """^[0-9A-Za-z-]{16,64}$"""
// IP
val ipPtn = """^(25[0-5]|2[0-4][0-9]|[0-1]?[0-9]?[0-9])(\.(25[0-5]|2[0-4][0-9]|[0-1]?[0-9]?[0-9])){3}$"""
+ // Date
+ val datePtn = """^\d{4}-\d{2}-\d{2}"""
def checkDeviceId(device_id: String): Boolean = {
StringUtils.isNotBlank(device_id) &&
......
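The new datePtn anchors only the start of the string, so it matches anything that begins with a yyyy-MM-dd date. A quick illustrative check:

val datePtn = """^\d{4}-\d{2}-\d{2}""".r
println(datePtn.findFirstIn("2021-12-07 23:13:36")) // Some(2021-12-07)
println(datePtn.findFirstIn("20211207"))            // None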
......@@ -37,10 +37,14 @@ class AdnClickDaily extends CommonSparkJob with Serializable {
val spark = MobvistaConstant.createSparkSession(s"AdnClickDaily.$datetime")
+ val year = datetime.substring(0, 4)
+ val month = datetime.substring(4, 6)
+ val day = datetime.substring(6, 8)
val sql =
s"""
|SELECT request_id, campaign_id, platform, gaid, idfa, imei, dev_id android_id, getDevId(ext_sysid) sysid, getRuid(ext_algo) ruid, getDevId(cdn_ab) idfv, ext_oaid oaid
- |FROM dwh.ods_adn_trackingnew_click WHERE CONCAT(yyyy,mm,dd) = '$datetime' AND request_id != '0' AND LENGTH(campaign_id) >= 2
+ |FROM dwh.ods_adn_trackingnew_click WHERE yyyy = '$year' AND mm = '$month' AND dd = '$day' AND request_id != '0' AND LENGTH(campaign_id) >= 2
|""".stripMargin
try {
......
......@@ -34,6 +34,10 @@ class AdnInstallDaily extends CommonSparkJob with Serializable {
val spark = MobvistaConstant.createSparkSession(s"AdnInstallDaily.$datetime")
+ val year = datetime.substring(0, 4)
+ val month = datetime.substring(4, 6)
+ val day = datetime.substring(6, 8)
val sql =
s"""
|SELECT request_id, adn.campaign_id campaign_id, cmp_pkg.package_name package_name, platform, gaid, idfa, imei,
......@@ -41,9 +45,9 @@ class AdnInstallDaily extends CommonSparkJob with Serializable {
| FROM (
| SELECT request_id, campaign_id, platform, gaid, idfa, imei, dev_id android_id, getDevId(ext_sysid) sysid,
| getRuid(ext_algo) ruid, getDevId(cdn_ab) idfv, ext_oaid oaid
- | FROM dwh.ods_adn_trackingnew_install WHERE CONCAT(yyyy,mm,dd) = '$datetime'
+ | FROM dwh.ods_adn_trackingnew_install WHERE yyyy = '$year' AND mm = '$month' AND dd = '$day'
| ) adn LEFT JOIN
- | (SELECT campaign_id, package_name FROM dwh.dim_adn_campaign WHERE CONCAT(year,month,day) = '$datetime') cmp_pkg
+ | (SELECT campaign_id, package_name FROM dwh.dim_adn_campaign WHERE year = '$year' AND month = '$month' AND day = '$day') cmp_pkg
| ON adn.campaign_id = cmp_pkg.campaign_id
|""".stripMargin
......
......@@ -32,6 +32,11 @@ class AdnOrgLogEtlHours extends CommonSparkJob with Serializable {
val region = commandLine.getOptionValue("region")
val output = commandLine.getOptionValue("output")
+ val year = datetime.substring(0, 4)
+ val month = datetime.substring(4, 6)
+ val day = datetime.substring(6, 8)
+ val hh = datetime.substring(8, 10)
val spark = MobvistaConstant.createSparkSession(s"AdnOrgLogEtlHours.$datetime.$region")
// Also ingest hb request data into DMP. The source is s3://mob-ad/adn/hb-v1/request, which is already mounted as table dwh.ods_adn_trackingnew_hb_request, but that table's mount statement does not run at the same time as the one for dwh.ods_adn_trackingnew_request used below.
// The shell script that launches this job gates on the _SUCCESS file under the input path, so dwh.ods_adn_trackingnew_request may already be mounted while dwh.ods_adn_trackingnew_hb_request is not, in which case the latter table cannot be read yet.
......@@ -42,12 +47,12 @@ class AdnOrgLogEtlHours extends CommonSparkJob with Serializable {
|SELECT date, time, created timestamp, app_id, platform, os_version, sdk_version, device_model, screen_size, country_code,
| language, strategy, ip, imei, mac, dev_id android_id, gaid, idfa, device_brand, getDevId(cdn_ab) idfv, ext_packagename package_name,
| getDevId(ext_sysid) sysid, ext_oaid oaid, getRuid(ext_algo) ruid
- | FROM dwh.ods_adn_trackingnew_request WHERE CONCAT(yyyy,mm,dd,hh) = '$datetime' AND re = '$region'
+ | FROM dwh.ods_adn_trackingnew_request WHERE yyyy = '$year' and mm = '$month' and dd = '$day' and hh = '$hh' AND re = '$region'
| UNION
| SELECT date, time, created timestamp, app_id, platform, os_version, sdk_version, device_model, screen_size, country_code,
| language, strategy, ip, imei, mac, dev_id android_id, gaid, idfa, device_brand, getDevId(cdn_ab) idfv, ext_packagename package_name,
| getDevId(ext_sysid) sysid, ext_oaid oaid, getRuid(ext_algo) ruid
- | FROM dwh.ods_adn_trackingnew_request_tmp_hb_request WHERE CONCAT(yyyy,mm,dd,hh) = '$datetime' AND re = '${region}_hb_request'
+ | FROM dwh.ods_adn_trackingnew_request_tmp_hb_request WHERE yyyy = '$year' and mm = '$month' and dd = '$day' and hh = '$hh' AND re = '${region}_hb_request'
|""".stripMargin
try {
......
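Given the mount race described in the comments above, the launcher must confirm that both source tables are ready before this SQL runs. A hedged sketch of such a guard; the bucket, paths, and helper name are illustrative, not part of this commit:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI

// Proceed only once both source paths carry a _SUCCESS marker, so neither
// dwh.ods_adn_trackingnew_request nor the hb_request table is half-mounted.
def bothReady(requestPath: String, hbRequestPath: String): Boolean = {
  val fs = FileSystem.get(new URI("s3://mob-ad"), new Configuration())
  Seq(requestPath, hbRequestPath).forall(p => fs.exists(new Path(p, "_SUCCESS")))
}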
......@@ -37,10 +37,14 @@ class AdnPreClickDaily extends CommonSparkJob with Serializable {
val spark = MobvistaConstant.createSparkSession(s"AdnPreClickDaily.$datetime")
+ val year = datetime.substring(0, 4)
+ val month = datetime.substring(4, 6)
+ val day = datetime.substring(6, 8)
val sql =
s"""
|SELECT request_id, campaign_id, platform, gaid, idfa, imei, dev_id android_id, getDevId(ext_sysid) sysid, getRuid(ext_algo) ruid, getDevId(cdn_ab) idfv, ext_oaid oaid
- |FROM dwh.ods_adn_trackingnew_preclick WHERE CONCAT(yyyy,mm,dd) = '$datetime' AND request_id != '0' AND LENGTH(campaign_id) >= 2
+ |FROM dwh.ods_adn_trackingnew_preclick WHERE yyyy = '$year' AND mm = '$month' AND dd = '$day' AND request_id != '0' AND LENGTH(campaign_id) >= 2
|""".stripMargin
try {
......
......@@ -49,6 +49,10 @@ class AdnAdxDeviceTag extends CommonSparkJob {
try {
+ val year = today.substring(0, 4)
+ val month = today.substring(4, 6)
+ val day = today.substring(6, 8)
// The data volume is small, so write to the table directly
val dim_adn_adx_package=
......@@ -65,7 +69,7 @@ class AdnAdxDeviceTag extends CommonSparkJob {
| t2.price_raw price,
| t2.`date`
|from (select request_id,substr(package,3) package_name
- |from adn_report.adndata_midway_backend_v2 where concat(yyyy,mm,dd) = '${today}' and platform ='2' and backend_id = '17' ) t1
+ |from adn_report.adndata_midway_backend_v2 where yyyy = '$year' and mm = '$month' and dd = '$day' and platform ='2' and backend_id = '17' ) t1
|join (select * from dwh.ods_adn_adx_req_tmp where dt = '${today}' ) t2
|on(t1.request_id = t2.request_id)
|join (select package_name,platform,first_tag from dwh.dim_package_tags where first_tag in ('lightgame','moderategame','hardcoregame') ) t3
......
......@@ -54,6 +54,9 @@ class AdnTecentAdxDataMidWay extends CommonSparkJob {
try {
+ val year = today.substring(0, 4)
+ val month = today.substring(4, 6)
+ val day = today.substring(6, 8)
val ods_adn_adx_req_tmp_pre =
s"""
......@@ -67,7 +70,7 @@ class AdnTecentAdxDataMidWay extends CommonSparkJob {
| explode (dsp_response) adx_dsp,
| `date`
|from dwh.ods_adn_adx_v1_request
- |where concat(yyyy,mm,dd) = '${today}' and device_info.ifa rlike '${didPtn}') t
+ |where yyyy = '$year' and mm = '$month' and dd = '$day' and device_info.ifa rlike '${didPtn}') t
|where t.adx_dsp.id in (4,27) and t.os ='ios' and adx_dsp.sbid[0].bid[0].price_raw > 100
""".stripMargin
......@@ -85,14 +88,13 @@ class AdnTecentAdxDataMidWay extends CommonSparkJob {
.option("orc.compress", "zlib")
.orc(outputadxtmp)
// The data volume is small, so write to the table directly (cf. hive.exec.reducers.bytes.per.reducer)
val dim_adn_adx_package =
s"""
|insert overwrite table dwh.dim_adn_adx_package partition(dt='${today}')
|select substr(t1.package_name,3) package_name
|from (select request_id,package package_name,platform,backend_id
- |from adn_report.adndata_midway_backend_v2 where concat(yyyy,mm,dd) = '${today}' and platform ='2' and backend_id = '17' ) t1
+ |from adn_report.adndata_midway_backend_v2 where yyyy = '$year' and mm = '$month' and dd = '$day' and platform ='2' and backend_id = '17' ) t1
|join ods_adn_adx_req_tmp_pre t2
|on(t1.request_id = t2.request_id)
|where t1.package_name like 'id%'
......
......@@ -66,11 +66,15 @@ class AdnRequestSdkEtlDaily extends CommonSparkJob with java.io.Serializable {
(r(0), r(1))
}).collectAsMap())
+ val year = date.substring(0, 4)
+ val month = date.substring(4, 6)
+ val day = date.substring(6, 8)
val sql =
s"""
|SELECT gaid, idfa, imei, androidid, extsysid, platform, appid, devicemodel, devicebrand, countrycode, strategy, oaid, idfv, ruid, osversion, rg
| FROM dwh.etl_adn_org_request_daily_hours
- | WHERE CONCAT(yt,mt,dt) = '$date'
+ | WHERE yt = '$year' AND mt = '$month' AND dt = '$day'
|""".stripMargin
val rdd = spark.sql(sql).coalesce(coalesce * 2).rdd.map(row => {
......
......@@ -141,12 +141,16 @@ object AdnSdkDaily extends Serializable {
row.getString(3).contains("key=2000047")
})
+ val year = loadTime.substring(0, 4)
+ val month = loadTime.substring(4, 6)
+ val day = loadTime.substring(6, 8)
spark.createDataFrame(rdd_2000047, input_schema).filter("device_id!=''").createOrReplaceTempView("adn_sdk_input_2000047")
import spark.implicits._
val sql01 =
s"""select requestid,campaignid from dwh.ods_adn_trackingnew_request
| LATERAL VIEW explode(split(extra2,',')) test_tmp_table AS campaignid
- | where concat(yyyy,mm,dd)='${loadTime}' and requestid!='' and campaignid!=''
+ | where yyyy = '$year' and mm = '$month' and dd = '$day' and requestid!='' and campaignid!=''
| """.stripMargin
val Requestid_Campaignid_df = spark.sql(sql01).select($"requestid".cast("string"), $"campaignid".cast("string"))
Requestid_Campaignid_df.createOrReplaceTempView("Requestid_Campaignid")
......@@ -177,7 +181,8 @@ object AdnSdkDaily extends Serializable {
|join Campaignid_Packagename t3 on t2.campaignid=t3.id
|union
|select t1.device_id,t1.device_type,t1.platform,t1.data,t1.model,t1.country,t1.sdk_version,t1.brand,t2.campaign_id,t2.package_name from (select * from adn_sdk_input_2000047 where hb='1') t1
- |join (select get_json_object(ext_dsp, '$$.bd') package_name,bidid rid_n,get_json_object(ext_dsp, '$$.cid') campaign_id from dwh.ods_adn_hb_v1_bid where concat(yyyy,mm,dd)='${loadTime}') t2 on t1.rid_n=t2.rid_n
+ |join (select get_json_object(ext_dsp, '$$.bd') package_name,bidid rid_n,get_json_object(ext_dsp, '$$.cid') campaign_id from dwh.ods_adn_hb_v1_bid
+ | where yyyy = '$year' and mm = '$month' and dd = '$day') t2 on t1.rid_n=t2.rid_n
|""".stripMargin
).rdd.map(contents => {
parseKey2000047Data(contents, loadTime)
......
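One detail in the query above: inside a Scala s-interpolated string, $$ escapes to a literal $, so the engine receives the JSON path '$.bd'. A one-line check:

println(s"get_json_object(ext_dsp, '$$.bd')") // prints get_json_object(ext_dsp, '$.bd')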
......@@ -78,6 +78,10 @@ class MergeAppTagID extends CommonSparkJob with java.io.Serializable {
(packageName, Integer.parseInt(r.getAs("id").toString))
}).collectAsMap())
+ val year = date.substring(0, 4)
+ val month = date.substring(4, 6)
+ val day = date.substring(6, 8)
var sql =
s"""
|SELECT
......@@ -85,7 +89,7 @@ class MergeAppTagID extends CommonSparkJob with java.io.Serializable {
| CASE WHEN platform = 'ios' THEN 'ios' ELSE 'android' END AS platform,
| tag
| FROM dwh.dim_app_tag
- | WHERE concat(year, month, day) = '${date}'
+ | WHERE year = '$year' and month = '$month' and day = '$day'
""".stripMargin
import spark.implicits._
......
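As elsewhere in this commit, a small dimension table is collected to the driver and broadcast as a lookup map before the main SQL runs. A minimal sketch of that collectAsMap pattern; the table and column names are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("broadcast-lookup-sketch").getOrCreate()
// Collect the small packageName -> id mapping and broadcast it to executors.
val idMap = spark.sparkContext.broadcast(
  spark.sql("SELECT package_name, id FROM some_small_dim_table").rdd
    .map(r => (r.getString(0), Integer.parseInt(r.get(1).toString)))
    .collectAsMap())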
package mobvista.dmp.datasource.btop
- import java.net.URI
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}
+ import java.net.URI
class BtopDaily extends CommonSparkJob with Serializable {
override protected def buildOptions(): Options = {
val options = new Options
......@@ -48,12 +48,15 @@ class BtopDaily extends CommonSparkJob with Serializable {
.getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)
// idfv,imei,gaid,idfa,android_id
// case when name='11' then 'android' when name='aa' then 'ios' end as os_platform,
try {
+ val year = dt_today.substring(0, 4)
+ val month = dt_today.substring(4, 6)
+ val day = dt_today.substring(6, 8)
// Devices newly added on the day
val append_sql=
s"""
......@@ -68,42 +71,42 @@ class BtopDaily extends CommonSparkJob with Serializable {
| os_platform,
| qcc_package,
| country_code
- | from uparpu_main.uparpu_plugin_qcc_package where concat(yyyy,mm,dd)='${dt_today}' and imei!='' and qcc_package_install_status='1'
+ | from uparpu_main.uparpu_plugin_qcc_package where yyyy = '$year' and mm = '$month' and dd = '$day' and imei!='' and qcc_package_install_status='1'
| union
| select gaid device_id,
| 'gaid' device_type,
| os_platform,
| qcc_package,
| country_code
- | from uparpu_main.uparpu_plugin_qcc_package where concat(yyyy,mm,dd)='${dt_today}' and gaid!='' and qcc_package_install_status='1'
+ | from uparpu_main.uparpu_plugin_qcc_package where yyyy = '$year' and mm = '$month' and dd = '$day' and gaid!='' and qcc_package_install_status='1'
| union
| select idfa device_id,
| 'idfa' device_type,
| os_platform,
| qcc_package,
| country_code
- | from uparpu_main.uparpu_plugin_qcc_package where concat(yyyy,mm,dd)='${dt_today}' and idfa!='' and qcc_package_install_status='1'
+ | from uparpu_main.uparpu_plugin_qcc_package where yyyy = '$year' and mm = '$month' and dd = '$day' and idfa!='' and qcc_package_install_status='1'
| union
| select android_id device_id,
| 'android_id' device_type,
| os_platform,
| qcc_package,
| country_code
- | from uparpu_main.uparpu_plugin_qcc_package where concat(yyyy,mm,dd)='${dt_today}' and android_id!='' and qcc_package_install_status='1'
+ | from uparpu_main.uparpu_plugin_qcc_package where yyyy = '$year' and mm = '$month' and dd = '$day' and android_id!='' and qcc_package_install_status='1'
| union
| select idfv device_id,
| 'idfv' device_type,
| os_platform,
| qcc_package,
| country_code
- | from uparpu_main.uparpu_plugin_qcc_package where concat(yyyy,mm,dd)='${dt_today}' and idfv!='' and qcc_package_install_status='1'
+ | from uparpu_main.uparpu_plugin_qcc_package where yyyy = '$year' and mm = '$month' and dd = '$day' and idfv!='' and qcc_package_install_status='1'
| union
| select oaid device_id,
| 'oaid' device_type,
| os_platform,
| qcc_package,
| country_code
- | from uparpu_main.uparpu_plugin_qcc_package where concat(yyyy,mm,dd)='${dt_today}' and oaid!='' and qcc_package_install_status='1'
+ | from uparpu_main.uparpu_plugin_qcc_package where yyyy = '$year' and mm = '$month' and dd = '$day' and oaid!='' and qcc_package_install_status='1'
| ) t1
|join
|(select id,package_name from uparpu_main.uparpu_qcc_package ) t2
......@@ -127,42 +130,42 @@ class BtopDaily extends CommonSparkJob with Serializable {
| os_platform,
| qcc_package,
| country_code
- | from uparpu_main.uparpu_plugin_qcc_package where concat(yyyy,mm,dd)='${dt_today}' and imei!='' and qcc_package_install_status='2'
+ | from uparpu_main.uparpu_plugin_qcc_package where yyyy = '$year' and mm = '$month' and dd = '$day' and imei!='' and qcc_package_install_status='2'
| union
| select gaid device_id,
| 'gaid' device_type,
| os_platform,
| qcc_package,
| country_code
- | from uparpu_main.uparpu_plugin_qcc_package where concat(yyyy,mm,dd)='${dt_today}' and gaid!='' and qcc_package_install_status='2'
+ | from uparpu_main.uparpu_plugin_qcc_package where yyyy = '$year' and mm = '$month' and dd = '$day' and gaid!='' and qcc_package_install_status='2'
| union
| select idfa device_id,
| 'idfa' device_type,
| os_platform,
| qcc_package,
| country_code
- | from uparpu_main.uparpu_plugin_qcc_package where concat(yyyy,mm,dd)='${dt_today}' and idfa!='' and qcc_package_install_status='2'
+ | from uparpu_main.uparpu_plugin_qcc_package where yyyy = '$year' and mm = '$month' and dd = '$day' and idfa!='' and qcc_package_install_status='2'
| union
| select android_id device_id,
| 'android_id' device_type,
| os_platform,
| qcc_package,
| country_code
- | from uparpu_main.uparpu_plugin_qcc_package where concat(yyyy,mm,dd)='${dt_today}' and android_id!='' and qcc_package_install_status='2'
+ | from uparpu_main.uparpu_plugin_qcc_package where yyyy = '$year' and mm = '$month' and dd = '$day' and android_id!='' and qcc_package_install_status='2'
| union
| select idfv device_id,
| 'idfv' device_type,
| os_platform,
| qcc_package,
| country_code
- | from uparpu_main.uparpu_plugin_qcc_package where concat(yyyy,mm,dd)='${dt_today}' and idfv!='' and qcc_package_install_status='2'
+ | from uparpu_main.uparpu_plugin_qcc_package where yyyy = '$year' and mm = '$month' and dd = '$day' and idfv!='' and qcc_package_install_status='2'
| union
| select oaid device_id,
| 'oaid' device_type,
| os_platform,
| qcc_package,
| country_code
- | from uparpu_main.uparpu_plugin_qcc_package where concat(yyyy,mm,dd)='${dt_today}' and oaid!='' and qcc_package_install_status='2'
+ | from uparpu_main.uparpu_plugin_qcc_package where yyyy = '$year' and mm = '$month' and dd = '$day' and oaid!='' and qcc_package_install_status='2'
| ) t1
|join
|(select id,package_name from uparpu_main.uparpu_qcc_package ) t2
......
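The six UNION branches in each query above differ only in the device-id column they select and test. Purely as an illustration of that structure (not part of the commit), the same SQL could be assembled from a column list:

// Illustrative: generate the six UNION branches from the device-id columns.
def qccUnionSql(year: String, month: String, day: String, status: String): String =
  Seq("imei", "gaid", "idfa", "android_id", "idfv", "oaid").map { col =>
    s"""select $col device_id,
       | '$col' device_type,
       | os_platform,
       | qcc_package,
       | country_code
       | from uparpu_main.uparpu_plugin_qcc_package where yyyy = '$year' and mm = '$month' and dd = '$day' and $col != '' and qcc_package_install_status = '$status'""".stripMargin
  }.mkString("\n union \n")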
......@@ -47,7 +47,8 @@ object Constant {
val tracking_install_sql: String =
"""
|SELECT b.device_id, device_model, os_version, UPPER(country) country, city, CAST(b.offer_id AS string) offer_id, CAST(COALESCE(a.id,'') AS string) id, COALESCE(a.event_name,'') event_name, a.event_type FROM
- | (SELECT devid device_id, MAX(device) device_model, MAX(os_version) os_version, MAX(country) country, MAX(city) city, uuid offer_id FROM dwh.ods_3s_trackingnew_install WHERE CONCAT(yyyy,mm,dd) = '@date' GROUP BY devid, uuid) b
+ | (SELECT devid device_id, MAX(device) device_model, MAX(os_version) os_version, MAX(country) country, MAX(city) city, uuid offer_id FROM dwh.ods_3s_trackingnew_install
+ |  WHERE yyyy = '@year' and mm = '@month' and dd = '@day' GROUP BY devid, uuid) b
| LEFT JOIN
| (SELECT id, event_name, event_type, offer_id FROM dwh.ods_3s_trackingcsv_event_define WHERE yyyymmdd = '@date') a
| ON a.offer_id = b.offer_id
......@@ -57,7 +58,8 @@ object Constant {
val tracking_event_sql: String =
"""
|SELECT b.device_id, UPPER(country) country, CAST(b.offer_id AS string) offer_id, COALESCE(a.id, b.event_name) id, COALESCE(a.event_name, b.event_name) event_name, COALESCE(a.event_type,'') event_type FROM
- | (SELECT devid device_id, MAX(country) country, event_name, uuid offer_id FROM dwh.ods_3s_trackingcsv_event_info WHERE yyyymmdd = '@date' AND devid IS NOT NULL AND devid <> '' GROUP BY devid, event_name, uuid) b
+ | (SELECT devid device_id, MAX(country) country, event_name, uuid offer_id FROM dwh.ods_3s_trackingcsv_event_info
+ |  WHERE yyyy = '@year' and mm = '@month' and dd = '@day' AND devid IS NOT NULL AND devid <> '' GROUP BY devid, event_name, uuid) b
| LEFT JOIN
| (SELECT CAST(id AS string) id, event_name, event_type, offer_id FROM dwh.ods_3s_trackingcsv_event_define WHERE yyyymmdd = '@date') a
| ON a.offer_id = b.offer_id
......
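These templates are filled by plain string substitution in the callers (TrackingEventDaily and TrackingInstallDaily below). Note the event_define subqueries still filter on yyyymmdd = '@date', so '@date' must keep being replaced alongside the new placeholders. A hedged usage sketch, assuming this object's tracking_install_sql:

val date = "20211207"
val sql = Constant.tracking_install_sql
  .replace("@year", date.substring(0, 4))
  .replace("@month", date.substring(4, 6))
  .replace("@day", date.substring(6, 8))
  .replace("@date", date) // still needed by the event_define subquery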
package mobvista.dmp.datasource.datatory
- import java.net.URI
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.commons.lang3.StringUtils
......@@ -10,14 +8,16 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel
+ import java.net.URI
/**
 * @package: mobvista.dmp.datasource.datatory
 * @author: wangjf
 * @date: 2019/4/25
 * @time: 3:19 PM
 * @email: jinfeng.wang@mobvista.com
 * @phone: 152-1062-7698
 */
class TrackingEventDaily extends CommonSparkJob with java.io.Serializable {
def commandOptions(): Options = {
val options = new Options()
......@@ -75,7 +75,13 @@ class TrackingEventDaily extends CommonSparkJob with java.io.Serializable {
FileSystem.get(new URI(s"s3://mob-emr-test"), sc.hadoopConfiguration).delete(new Path(output), true)
- var sql = Constant.tracking_event_sql.replace("@date", date)
+ val year = date.substring(0, 4)
+ val month = date.substring(4, 6)
+ val day = date.substring(6, 8)
+ var sql = Constant.tracking_event_sql.replace("@year", year)
+   .replace("@month", month)
+   .replace("@day", day)
+   .replace("@date", date) // the event_define subquery still filters on yyyymmdd = '@date'
spark.sql(sql)
.filter(r => {
......
......@@ -45,7 +45,13 @@ class TrackingInstallDaily extends CommonSparkJob with java.io.Serializable {
try {
// spark.udf.register("check_deviceId", Constant.check_deviceId _)
- val sql = Constant.tracking_install_sql.replace("@date", date)
+ val year = date.substring(0, 4)
+ val month = date.substring(4, 6)
+ val day = date.substring(6, 8)
+ val sql = Constant.tracking_install_sql.replace("@year", year)
+   .replace("@month", month)
+   .replace("@day", day)
+   .replace("@date", date) // the event_define subquery still filters on yyyymmdd = '@date'
// .replace("@check_deviceId", "check_deviceId(devid)")
spark.sql(sql)
......
......@@ -49,6 +49,9 @@ class OdsDmpUserInfoDaily extends CommonSparkJob with Serializable {
val yesBef1Part = DateUtil.getDayByString(scheduleTime, "yyyy-MM-dd", -1)
val yesBef1Str = sdf2.format(sdf1.parse(yesBef1Part)) // 20181121
+ val year1 = yesBef1Str.substring(0, 4)
+ val month1 = yesBef1Str.substring(4, 6)
+ val day1 = yesBef1Str.substring(6, 8)
// Time two days ago
// 2018-11-20
......@@ -202,7 +205,8 @@ class OdsDmpUserInfoDaily extends CommonSparkJob with Serializable {
|max(country) as country,
|max(os_v) as os_v
|from dwh.dm_profile_total
- |where type='dsp' and concat(year,month,day) = '${yesBef1Str}' and dmp_time='${yesBef1Part}' group by device_id) b
+ |where type='dsp' and year = '$year1' and month = '$month1' and day = '$day1'
+ | and dmp_time='${yesBef1Part}' group by device_id) b
|on a.dev_id=b.device_id left join
| (select device_id,
|max(id_type) as id_type,
......@@ -211,13 +215,13 @@ class OdsDmpUserInfoDaily extends CommonSparkJob with Serializable {
|max(country_code) as country_code,
|max(os_version) as os_version
|from dwh.ods_adn_device_total
- |where concat(year,month,day) = '${yesBef1Str}' and to_date(update_time)='${yesBef1Part}' group by device_id) c
+ |where year = '$year1' and month = '$month1' and day = '$day1' and to_date(update_time)='${yesBef1Part}' group by device_id) c
|on a.dev_id=c.device_id left join
| (select devid,
| max(device) as device,
| max(country) as country,
| max(os_version) as os_version
- |from dwh.ods_3s_trackingnew_install sss where concat(sss.yyyy,sss.mm,sss.dd)='${yesBef1Str}' group by devid) d
+ |from dwh.ods_3s_trackingnew_install sss where sss.yyyy = '$year1' and sss.mm = '$month1' and sss.dd = '$day1' group by devid) d
|on a.dev_id=d.devid left join dwh.ga_device_add_tmp e
|on a.dev_id=e.dev_id
""".stripMargin
......
......@@ -53,6 +53,11 @@ class DspOrgLogEtlHoursDemo extends CommonSparkJob with Serializable {
import spark.implicits._
+ val year = yyyymmddhh.substring(0, 4)
+ val month = yyyymmddhh.substring(4, 6)
+ val day = yyyymmddhh.substring(6, 8)
+ val hh = yyyymmddhh.substring(8, 10)
val sql =
s"""
select idfa,googleadid gaid,
......@@ -61,7 +66,7 @@ class DspOrgLogEtlHoursDemo extends CommonSparkJob with Serializable {
| make,model,osv osversion,ext6 ua,
| appid packagename,ext3,exchanges,ext5 exitid,`time`,
| body json_msg,rg region
- | from adn_dsp.log_adn_dsp_request_orc_hour where concat(yr,mt,dt,hh)='${yyyymmddhh}'
+ | from adn_dsp.log_adn_dsp_request_orc_hour where yr = '$year' and mt = '$month' and dt = '$day' and hh = '$hh'
""".stripMargin
val df = spark.sql(sql).filter(filterData _)
.rdd
......
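The hourly jobs split the stamp one step further to reach the hour partition; a tiny illustrative check:

val yyyymmddhh = "2021120723"
val parts = (yyyymmddhh.substring(0, 4), yyyymmddhh.substring(4, 6),
  yyyymmddhh.substring(6, 8), yyyymmddhh.substring(8, 10))
assert(parts == ("2021", "12", "07", "23"))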
......@@ -53,6 +53,10 @@ class PostBackEvent extends CommonSparkJob with java.io.Serializable {
try {
+ val year = today.substring(0, 4)
+ val month = today.substring(4, 6)
+ val day = today.substring(6, 8)
val sql1 =
s"""
|select UPPER(idfa) idfa,
......@@ -69,7 +73,7 @@ class PostBackEvent extends CommonSparkJob with java.io.Serializable {
|install_time,
|cast(`date` as string) update_date,
|type
- |from dwh.ods_adn_trackingnew_postback_event where concat(yyyy,mm,dd) = '${today}' and type in ('appsflyer','min_appsflyer','tenjin','adjust')
+ |from dwh.ods_adn_trackingnew_postback_event where yyyy = '$year' and mm = '$month' and dd = '$day' and type in ('appsflyer','min_appsflyer','tenjin','adjust')
|group by
|UPPER(idfa),
|md5_idfa,
......
......@@ -44,12 +44,18 @@ class PostBackDaily extends CommonSparkJob {
FileSystem.get(new URI(s"s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration).delete(new Path(output), true)
+ val year = today.substring(0, 4)
+ val month = today.substring(4, 6)
+ val day = today.substring(6, 8)
try {
val sql1 =
s"""
- |select UPPER(idfa) idfa,md5_idfa,LOWER(gaid) gaid,md5_gaid,lower(pl) platform,lower(app_id) package_name,country,install_time,cast(`date` as string) update_date,type from dwh.ods_adn_trackingnew_postback_install where concat(yyyy,mm,dd) = '${today}' and type in ('appsflyer','min_appsflyer','tenjin','adjust','reyun') and app_id!="0"
+ |select UPPER(idfa) idfa,md5_idfa,LOWER(gaid) gaid,md5_gaid,lower(pl) platform,lower(app_id) package_name,country,install_time,cast(`date` as string) update_date,type from dwh.ods_adn_trackingnew_postback_install
+ | where yyyy = '$year' and mm = '$month' and dd = '$day' and type in ('appsflyer','min_appsflyer','tenjin','adjust','reyun') and app_id!="0"
|union
- |select UPPER(idfa) idfa,md5_idfa,LOWER(gaid) gaid,md5_gaid,lower(pl) platform,lower(app_id) package_name,country,install_time,cast(`date` as string) update_date,type from dwh.ods_adn_trackingnew_postback_event where concat(yyyy,mm,dd) = '${today}' and type in ('appsflyer','min_appsflyer','tenjin','adjust','reyun') and app_id!="0"
+ |select UPPER(idfa) idfa,md5_idfa,LOWER(gaid) gaid,md5_gaid,lower(pl) platform,lower(app_id) package_name,country,install_time,cast(`date` as string) update_date,type from dwh.ods_adn_trackingnew_postback_event
+ | where yyyy = '$year' and mm = '$month' and dd = '$day' and type in ('appsflyer','min_appsflyer','tenjin','adjust','reyun') and app_id!="0"
""".stripMargin
spark.sql(sql1).createOrReplaceTempView("etl_3s_postback_daily")
......
......@@ -14,7 +14,7 @@ object Constant {
| mac, dev_id, idfa, ext_packagename, ext_finalpackagename, ext_channel, ext_oaid, ext_advinstalltime, ext_eventtime,
| ext_campaignpackagename, ext_deviceid
| FROM dwh.ods_adn_trackingnew_impression
- | WHERE CONCAT(yyyy,mm,dd) = '@date' AND UPPER(country_code) = 'CN'
+ | WHERE yyyy = '@year' and mm = '@month' and dd = '@day' AND UPPER(country_code) = 'CN'
|""".stripMargin
val tracking_install =
......@@ -22,7 +22,7 @@ object Constant {
|SELECT created, app_id, advertiser_id, creative_id, platform, os_version, device_brand, device_model, country_code, network_type,
| ip, imei, mac, dev_id, idfa, ext_campaignpackagename, ext_finalpackagename, ext_advinstalltime, ext_oaid, ext_eventtime, ext_deviceid
| FROM dwh.ods_adn_trackingnew_install
- | WHERE CONCAT(yyyy,mm,dd) = '@date'
+ | WHERE yyyy = '@year' and mm = '@month' and dd = '@day'
|""".stripMargin
val tracking_click =
......@@ -31,7 +31,7 @@ object Constant {
| mac, dev_id, idfa, ext_packagename, ext_finalpackagename, ext_channel, ext_oaid, ext_advinstalltime, ext_eventtime,
| ext_campaignpackagename, ext_deviceid
| FROM dwh.ods_adn_trackingnew_click
- | WHERE CONCAT(yyyy,mm,dd) = '@date'
+ | WHERE yyyy = '@year' and mm = '@month' and dd = '@day'
|""".stripMargin
val user_info =
......
......@@ -39,6 +39,10 @@ class TrackingLog extends CommonSparkJob {
try {
+ val year = date.substring(0, 4)
+ val month = date.substring(4, 6)
+ val day = date.substring(6, 8)
val sql =
log_type match {
case "impression" =>
......@@ -48,7 +52,9 @@ class TrackingLog extends CommonSparkJob {
case "click" =>
tracking_click
}
- val df = spark.sql(sql.replace("@date", date))
+ val df = spark.sql(sql.replace("@year", year)
+   .replace("@month", month)
+   .replace("@day", day))
df.repartition(coalesce)
.write
......
package mobvista.dmp.utils.common
- import mobvista.dmp.common.MobvistaConstant
+ import mobvista.dmp.util.MD5Util.getMD5Str
+ import java.security.MessageDigest
import java.io.UnsupportedEncodingException
import java.math.BigInteger
......@@ -36,12 +36,17 @@ object MD5Util {
strm.toLong
}
- def main(args: Array[String]): Unit = {
-   val data =
-     """
-       |{ "@timestamp": "07/Dec/2021:23:13:36 +0800", "@fields": { "remote_addr": "100.122.18.27", "remote_user": "-", "body_bytes_sent": "0", "status": "200", "http_host": "analytics-http-ab.mintegral.net", "request": "POST / HTTP/1.1", "request_method": "POST", "request_body": "platform=1&package_name=com.Upperpik.HairChallenge&os_version=11&brand=samsung&model=SM-T510&gaid=6b4e83cb-b21b-41e4-a187-ed1c2c56197e&network_type=9&network_str=&language=es-ES&timezone=GMT%252B01%253A00&ua=Mozilla%252F5.0%2B%2528Linux%253B%2BAndroid%2B11%253B%2BSM-T510
-       |""".stripMargin
-   println(MobvistaConstant.String2JSONObject(data))
- }
+ def hashMD5(content: String): String = {
+   val md5 = MessageDigest.getInstance("MD5")
+   val encoded = md5.digest((content).getBytes)
+   encoded.map("%02x".format(_)).mkString
+ }
+
+ def main(args: Array[String]): Unit = {
+   val data = "abc"
+   println(hashMD5(data))
+   println(hashMD5(data).length)
+   println(getMD5Str(data))
+   println(getMD5Str(data).length)
+ }
}
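For reference, both helpers should agree on the RFC 1321 test vector: MD5("abc") is the 32-character hex digest 900150983cd24fb0d6963f7d28e17f72, so both length printouts above should read 32. A self-contained check:

import java.security.MessageDigest

// RFC 1321 test vector for the hashMD5 logic added above.
def md5Hex(s: String): String =
  MessageDigest.getInstance("MD5").digest(s.getBytes).map("%02x".format(_)).mkString
assert(md5Hex("abc") == "900150983cd24fb0d6963f7d28e17f72")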