package mobvista.dmp.datasource.device

import java.net.URI
import java.text.SimpleDateFormat

import mobvista.dmp.common.CommonSparkJob
import mobvista.dmp.util.DateUtil
import mobvista.prd.datasource.util.GsonUtil
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.storage.StorageLevel

class OdsDmpUserInfoDaily extends CommonSparkJob with Serializable {

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)
    // e.g. 2018-11-22 00:00:01
    val scheduleTime = commandLine.getOptionValue("cur_day")
    val output = commandLine.getOptionValue("output")
    val coalesce = commandLine.getOptionValue("coalesce")

    val sdf1 = new SimpleDateFormat("yyyy-MM-dd")
    val sdf2 = new SimpleDateFormat("yyyyMMdd")

    val spark = SparkSession
      .builder()
      .appName("OdsDmpUserInfoDaily_job")
      .config("spark.rdd.compress", "true")
      .config("spark.speculation", "false")
      .config("spark.speculation.quantile", "0.9")
      .config("spark.speculation.multiplier", "1")
      .config("spark.io.compression.codec", "snappy")
      .config("spark.sql.warehouse.dir", "s3://mob-emr-test/spark-warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // Remove any previous output so reruns of the job are idempotent.
    FileSystem.get(new URI("s3://mob-emr-test"), spark.sparkContext.hadoopConfiguration)
      .delete(new Path(output), true)

    try {
      // Yesterday, e.g. 2018-11-21 / 20181121.
      val yesBef1Part = DateUtil.getDayByString(scheduleTime, "yyyy-MM-dd", -1)
      val yesBef1Str = sdf2.format(sdf1.parse(yesBef1Part))
      // Two days ago, e.g. 2018-11-20 / 20181120.
      val yesBef2Part = DateUtil.getDayByString(scheduleTime, "yyyy-MM-dd", -2)
      val yesBef2Str = sdf2.format(sdf1.parse(yesBef2Part))
      println(s"yesBef1Part: $yesBef1Part; yesBef1Str: $yesBef1Str; yesBef2Part: $yesBef2Part; yesBef2Str: $yesBef2Str")

      var hql =
        s"""
           |drop table if exists dwh.ga_device_add_tmp
        """.stripMargin
      spark.sql(hql)

      // Create a Hive table over the GA device data for two days ago.
      hql =
        s"""
           |create table dwh.ga_device_add_tmp (
           |dev_id string,
           |model string,
           |country string,
           |osversion string
           |) row format delimited fields terminated by '\t'
           |location 's3://mob-emr-test/dataplatform/datawarehourse/real-time-query/ga_add/${yesBef2Str}'
        """.stripMargin
      spark.sql(hql)
      println("hql1: " + hql)

      // Daily device set: devices seen yesterday for non-GA business, plus
      // devices seen two days ago for GA/other business, deduplicated.
      hql =
        s"""
           |select X.device_id
           |from (
           |select device_id
           |from dwh.ods_dmp_user_info
           |where dt='${yesBef1Str}' and last_req_day='${yesBef1Part}' and business not in ('ga', 'other')
           |union all
           |select device_id
           |from dwh.ods_dmp_user_info
           |where dt='${yesBef2Str}' and last_req_day='${yesBef2Part}' and business in ('ga', 'other') ) X
           |group by X.device_id
        """.stripMargin
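      // The device list below is reused by several joins, so it is cached.
      // MEMORY_AND_DISK_SER stores the cached partitions in serialized form
      // (compact, and Kryo is configured above) and spills to disk under
      // memory pressure. As a sketch of the pattern (the count() is only an
      // illustration, not part of this job):
      //
      //   val df = spark.sql(hql).persist(StorageLevel.MEMORY_AND_DISK_SER)
      //   df.count() // the first action materializes the cache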
      val device_daily_df = spark.sql(hql).persist(StorageLevel.MEMORY_AND_DISK_SER)
      device_daily_df.createOrReplaceTempView("devices_daily")
      println("hql2: " + hql)

      // Per-device install list and interest tags, restricted to the daily
      // device set via left semi join. Each package_name carries its
      // update_date ("pkg|yyyy-MM-dd") so consumers can tell how fresh the
      // install is.
      hql =
        s"""
           |select a.device_id as dev_id,
           |dwh.md5(a.device_id) as dev_id_md5,
           |a.device_type,
           |a.platform,
           |concat_ws(',',collect_set(a.package_name)) as install,
           |concat_ws('#',collect_set(concat_ws(',',a.tag_type,a.first_tag,a.second_tag,a.business))) as interest
           |from (
           |select device_id,device_type,platform,concat(package_name,"|",update_date) package_name,tag_type,first_tag,second_tag,
           |case when business='userdesk' then 'userdesk'
           |     when business='talkingdata' then 'talkingdata'
           |     when business='yunhai' then 'yunhai' else 'mv' end as business
           |from dwh.dm_device_tag
           |where dt='${yesBef1Str}' and business not in ('ga', 'other')
           |union all
           |select device_id,device_type,platform,concat(package_name,"|",update_date) package_name,tag_type,first_tag,second_tag,
           |case when business='userdesk' then 'userdesk'
           |     when business='talkingdata' then 'talkingdata'
           |     when business='yunhai' then 'yunhai' else 'mv' end as business
           |from dwh.dm_device_tag
           |where dt='${yesBef2Str}' and business in ('ga', 'other')
           |) a
           |left semi join devices_daily b on(a.device_id = b.device_id)
           |group by a.device_id,a.device_type,a.platform
        """.stripMargin
      spark.sql(hql).createOrReplaceTempView("dm_pkg_insterest")
      println("hql3: " + hql)
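      // Enrich each device with model / country / osversion, taking the first
      // non-null value across sources in priority order: dsp profile (b), adn
      // device totals (c), 3s tracking installs (d), then the GA table (e).
      // coalesce() returns its first non-null argument, e.g.
      //
      //   coalesce(null, 'IN', 'US')  -- yields 'IN'
      //
      // so the argument order in the query below encodes the source priority.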
      // Incremental fields: model, country, osversion.
      hql =
        s"""
           |select a.dev_id as dev_id,
           |a.dev_id_md5 as dev_id_md5,
           |coalesce(a.device_type,b.device_type,c.id_type) as dev_type,
           |coalesce(a.platform,b.platform,c.platform) as platform,
           |a.install,
           |a.interest,
           |coalesce(b.model,c.device_model,d.device,e.model) as model,
           |coalesce(b.country,c.country_code,d.country,e.country) as country,
           |coalesce(b.os_v,c.os_version,d.os_version,e.osversion) as osversion
           |from dm_pkg_insterest a left join
           | (select device_id,
           |max(device_type) as device_type,
           |max(platform) as platform,
           |max(model) as model,
           |max(country) as country,
           |max(os_v) as os_v
           |from dwh.dm_profile_total
           |where type='dsp' and concat(year,month,day) = '${yesBef1Str}' and dmp_time='${yesBef1Part}' group by device_id) b
           |on a.dev_id=b.device_id left join
           | (select device_id,
           |max(id_type) as id_type,
           |max(platform) as platform,
           |max(device_model) as device_model,
           |max(country_code) as country_code,
           |max(os_version) as os_version
           |from dwh.ods_adn_device_total
           |where concat(year,month,day) = '${yesBef1Str}' and to_date(update_time)='${yesBef1Part}' group by device_id) c
           |on a.dev_id=c.device_id left join
           | (select devid,
           | max(device) as device,
           | max(country) as country,
           | max(os_version) as os_version
           |from dwh.ods_3s_trackingnew_install sss where concat(sss.yyyy,sss.mm,sss.dd)='${yesBef1Str}' group by devid) d
           |on a.dev_id=d.devid left join dwh.ga_device_add_tmp e
           |on a.dev_id=e.dev_id
        """.stripMargin
      spark.sql(hql).createOrReplaceTempView("dm_pkg_insterest_model_os_country")
      println("hql4: " + hql)

      // Age distributions for the daily device set.
      hql =
        s"""
           |select a.device_id,b.age,b.tag
           |from devices_daily a
           |join ( select t.device_id, t.age, t.tag
           |       from dwh.dm_device_age t
           |       where concat(t.year, t.month, t.day)='${yesBef1Str}'
           |     ) b on a.device_id=b.device_id
        """.stripMargin
      spark.sql(hql)
        .rdd
        .map(parseAge)
        .toDF("device_id", "age", "ratio")
        .createOrReplaceTempView("tmp_age")
      println("hql5: " + hql)

      // Keep the highest-ratio age per device. Note: partitioning by
      // device_id only; partitioning by (device_id, age) would rank every
      // age first and keep them all.
      hql =
        """
          |select t.device_id, t.age
          |from (
          |  select t.device_id, t.age, row_number() over(partition by t.device_id order by t.ratio desc) as rk
          |  from tmp_age t
          |  where t.age is not null and t.ratio is not null
          |) t
          |where t.rk=1
        """.stripMargin
      spark.sql(hql).createOrReplaceTempView("tmp_age_daily")
      println("hql6: " + hql)

      // Highest-ratio gender per device: m -> 1, f -> 2, otherwise 10.
      hql =
        s"""
           |select t.device_id, t.gender
           |from (
           |  select a.device_id, b.gender,
           |  row_number() over(partition by a.device_id order by b.ratio desc) as rk
           |  from devices_daily a
           |  join (
           |    select t.device_id,
           |    case when t.gender = 'm' then 1
           |         when t.gender = 'f' then 2
           |         else 10 end as gender, t.ratio
           |    from dwh.dm_device_gender t
           |    where concat(t.year, t.month, t.day)='${yesBef1Str}'
           |  ) b on a.device_id=b.device_id
           |) t
           |where t.rk=1
        """.stripMargin
      spark.sql(hql).createOrReplaceTempView("tmp_gender_daily")
      println("hql7: " + hql)

      // Join age, gender, and behavior tags onto the enriched profiles to
      // produce the final daily record; age and gender already hold the
      // max-ratio row per device.
      hql =
        s"""
           |select a.dev_id,
           |       a.dev_id_md5,
           |       a.dev_type,
           |       a.platform,
           |       a.install,
           |       a.interest,
           |       a.model,
           |       a.country,
           |       a.osversion,
           |       b.age,
           |       c.gender,
           |       d.behavior,
           |       "${yesBef1Part}" update_date
           |from dm_pkg_insterest_model_os_country a
           |left outer join tmp_age_daily b on a.dev_id = b.device_id
           |left outer join tmp_gender_daily c on a.dev_id = c.device_id
           |left outer join (select device_id,concat_ws(',',collect_set(tag_name)) as behavior
           |from ( select device_id,tag_name
           |from dwh.dmp_event_tag_daily
           |where day='${yesBef1Str}' and tag_source='3s'
           |union all
           |select device_id,tag_name
           |from dwh.dmp_event_tag_daily
           |where day='${yesBef2Str}' and tag_source='ga' ) X
           |group by X.device_id
           |) d on a.dev_id = d.device_id
        """.stripMargin
      spark.sql(hql)
        .repartition(coalesce.toInt)
        .write
        .option("orc.compress", "zlib")
        .orc(output)
      println("hql8: " + hql)
    } finally {
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
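  // Expected CLI arguments (commons-cli BasicParser, single-dash options).
  // The output path below is illustrative, not a real location:
  //
  //   -cur_day  2018-11-22
  //   -output   s3://some-bucket/path/to/ods_dmp_user_info_daily
  //   -coalesce 100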
  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("cur_day", true, "schedule time")
    options.addOption("output", true, "output dir")
    options.addOption("coalesce", true, "coalesce")
    options
  }

  // Maps an age range to its numeric code; 10 means "unknown".
  def ageCode(range: String): Int = range match {
    case "0-17" => 1
    case "18-24" => 2
    case "25-44" => 3
    case "45-59" => 4
    case "60+" => 5
    case _ => 10
  }

  // Parses one (device_id, age json, tag) row into (device_id, age code, ratio).
  def parseAge(row: Row): (String, Int, Double) = {
    val deviceId = row.getString(0)
    val age = row.getString(1)
    val json = GsonUtil.String2JsonObject(age)
    val ageJson = json.get("age_and_proportion").getAsJsonObject
    val ageSource = json.get("age_and_source").getAsJsonObject
    if (ageJson.toString.contains("0-17")) {
      // Pick the age range with the largest proportion (first one wins on ties).
      var max = 0.0d
      var maxRange = ""
      for (range <- Seq("0-17", "18-24", "25-44", "45-59", "60+")) {
        val ratio = ageJson.get(range).getAsDouble
        if (ratio > max) {
          max = ratio
          maxRange = range
        }
      }
      (deviceId, ageCode(maxRange), max)
    } else if (!"{\"null\":\"null\"}".equals(ageSource.toString)) {
      // Fall back to the single sourced range, e.g. {"18-24":"ga"} -> 18-24.
      val ageRange = ageSource.toString.replaceAll("[\\{\\}\"]", "").split(":", -1)(0)
      (deviceId, ageCode(ageRange), 1.0)
    } else {
      (deviceId, 10, 0.0)
    }
  }
}

object OdsDmpUserInfoDaily {
  def main(args: Array[String]): Unit = {
    new OdsDmpUserInfoDaily().run(args)
  }
}
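// A sketch of the age payload parseAge expects (the values are made up):
//
//   {"age_and_proportion":{"0-17":0.05,"18-24":0.60,"25-44":0.25,"45-59":0.07,"60+":0.03},
//    "age_and_source":{"18-24":"ga"}}
//
// For that input parseAge returns ("<device_id>", 2, 0.6): the 18-24 range
// has the largest proportion and maps to age code 2.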