package mobvista.dmp.datasource.retargeting

import java.util.Properties

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql._

/**
  * SQL templates and MySQL JDBC helpers shared by the retargeting jobs.
  *
  * The SQL templates contain literal placeholders (`@date`, `@update_date`,
  * `@activeDate`, `@last_req_day`, `@region`) that are not substituted here;
  * presumably callers replace them before running the query (verify against callers).
  * Several templates also reference names that are not defined in this object
  * (`dm_active`, `dm_region`, and the SQL functions `checkDevice`,
  * `getInstallList`, `getInterestList`); they are assumed to be registered as
  * temp views / UDFs by the calling job.
  */
object Constant {

  /** Maps the upper-cased legacy tag code (type-first-second) to the new second-level id. */
  val old2new_sql: String =
    """
      |SELECT UPPER(CONCAT(tag_type, '-', first_tag, '-', second_tag)) tag_code, new_second_id FROM
      | dwh.dm_old2new_tag WHERE tag_id != '' AND tag_id IS NOT NULL
    """.stripMargin

  /** Selects the (new_first_id, new_second_id) pairs from the tag mapping table. */
  val second2first_sql: String =
    """
      |SELECT new_first_id, new_second_id FROM dwh.dm_old2new_tag
    """.stripMargin

  /** Maps the new second-level id (exposed as `tag_code`) to `tag_id`, skipping empty tag ids. */
  val id_old2new_sql: String =
    """
      |SELECT new_second_id tag_code, tag_id FROM
      | dwh.dm_old2new_tag WHERE tag_id != '' AND tag_id IS NOT NULL
    """.stripMargin

  /**
    * Per-device profile aggregated from `dwh.ods_dmp_user_info_all` (grouped by upper-cased
    * device id) and left-joined with `dm_active` for the frequency column.
    */
  val ods_dmp_user_info_all_sql: String =
    """
      |SELECT dm_device.device_id, dm_device.platform, dm_device.country, CAST(COALESCE(dm_device.age,0) AS INT) age, CAST(COALESCE(dm_device.gender,0) AS INT) gender, dm_device.install, dm_device.interest, dm_active.frequency, dm_device.update_date
      | FROM
      | (SELECT UPPER(dev_id) device_id, MAX(platform) platform, MAX(country) country, MAX(age) age, MAX(gender) gender, getInstallList(CONCAT_WS(',',COLLECT_SET(install))) install, getInterestList(CONCAT_WS('#',COLLECT_SET(interest))) interest, MAX(update_date) update_date
      | FROM dwh.ods_dmp_user_info_all WHERE dt = '@date' AND update_date >= '@update_date' AND checkDevice(dev_id)
      | GROUP BY UPPER(dev_id)
      | ) dm_device
      | LEFT JOIN dm_active
      | ON dm_device.device_id = dm_active.device_id
    """.stripMargin

  /**
    * Per-device profile from `dwh.ods_dmp_user_info_all_v2` (lower-cased device id, 'GB'
    * rewritten to 'UK'), left-joined with `dm_active`, the weekly and monthly active tags,
    * and the set of regions ('cn', 'virginia', 'tokyo') seen for the device.
    */
  val ods_dmp_user_info_all_sql_distinct: String =
    """
      |SELECT dm_device.device_id, dm_device.platform, dm_device.model, dm_device.osversion os_version, dm_device.country, CAST(COALESCE(dm_device.age,0) AS INT) age, CAST(COALESCE(dm_device.gender,0) AS INT) gender, dm_device.install, dm_device.interest,
      | dm_device.behavior, dm_active.frequency, active_week.tags tag_week, active_month.tags tag_month, COALESCE(dm_region.region,ARRAY()) region, dm_device.update_date, dm_device.publish_date
      | FROM
      | (SELECT LOWER(dev_id) device_id, platform, model, osversion, CASE WHEN country = 'GB' THEN 'UK' ELSE country END AS country, age, gender, getInstallList(install) install, getInterestList(interest) interest, behavior, update_date, publish_date
      | FROM dwh.ods_dmp_user_info_all_v2 WHERE dt = '@date' AND checkDevice(dev_id)
      | ) dm_device
      | LEFT JOIN dm_active ON dm_device.device_id = dm_active.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(tags) tags FROM dwh.dm_active_tag WHERE dt = '@activeDate' AND part = 'week' GROUP BY lower(device_id)) active_week ON dm_device.device_id = active_week.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(tags) tags FROM dwh.dm_active_tag WHERE dt = '@activeDate' AND part = 'month' GROUP BY lower(device_id)) active_month ON dm_device.device_id = active_month.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, COLLECT_SET(region) region FROM dev.dm_device_region WHERE dt = '@date' AND region IN ('cn','virginia','tokyo') GROUP BY lower(device_id)) dm_region ON dm_device.device_id = dm_region.device_id
    """.stripMargin

  /**
    * Variant of the distinct profile query that reads `dwh.ods_dmp_user_info_all`, filters by
    * `update_date`, and attaches a single region per device (MAX(region) among rows whose
    * `last_req_day` is on or after the `@last_req_day` placeholder).
    */
  val ods_dmp_user_info_all_sql_distinct_region: String =
    """
      |SELECT dm_device.device_id, dm_device.platform, device_region.region, dm_device.model, dm_device.osversion os_version, dm_device.country, CAST(COALESCE(dm_device.age,0) AS INT) age, CAST(COALESCE(dm_device.gender,0) AS INT) gender, dm_device.install, dm_device.interest,
      | dm_device.behavior, dm_active.frequency, active_week.tags tag_week, active_month.tags tag_month, dm_device.update_date
      | FROM
      | (SELECT LOWER(dev_id) device_id, platform, model, osversion, CASE WHEN country = 'GB' THEN 'UK' ELSE country END AS country, age, gender, getInstallList(install) install, getInterestList(interest) interest, behavior, update_date
      | FROM dwh.ods_dmp_user_info_all WHERE dt = '@date' AND update_date >= '@update_date' AND checkDevice(dev_id)
      | ) dm_device
      | LEFT JOIN dm_active ON dm_device.device_id = dm_active.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(tags) tags FROM dwh.dm_active_tag WHERE dt = '@activeDate' AND part = 'week' GROUP BY lower(device_id)) active_week ON dm_device.device_id = active_week.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(tags) tags FROM dwh.dm_active_tag WHERE dt = '@activeDate' AND part = 'month' GROUP BY lower(device_id)) active_month ON dm_device.device_id = active_month.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(region) region FROM dev.dm_device_region WHERE dt = '@date' AND last_req_day >= '@last_req_day' GROUP BY lower(device_id)) device_region ON dm_device.device_id = device_region.device_id
    """.stripMargin

  /** Selects the lower-cased device id and its tags (exposed as `frequency`) for one partition. */
  val statistics_sql: String =
    """
      |SELECT LOWER(device_id) device_id,tags frequency
      | FROM dwh.dmp_device_tag_statistics
      | WHERE dt = '@date'
    """.stripMargin

  /**
    * Returns true when the device id is not null, not empty and not whitespace-only.
    *
    * @param device_id raw device identifier, may be null
    */
  def checkDevice(device_id: String): Boolean = StringUtils.isNotBlank(device_id)

  /** Selects package names from the install list updated on or after `@update_date`. */
  val package_sql: String =
    """
      |SELECT package_name FROM dwh.dm_install_list_v2 WHERE dt = '@date' AND update_date >= '@update_date'
    """.stripMargin

  //  import com.datastax.spark.connector._
  //  val deviceTagColumn = SomeColumns("device_id", "age", "gender", "install_apps", "interest", "frequency" overwrite)

  /** Returns the value of environment variable `name`, or `default` when unset or empty. */
  private def envOrElse(name: String, default: String): String =
    sys.env.get(name).filter(_.nonEmpty).getOrElse(default)

  /** Builds JDBC properties with the MySQL driver, the given credentials and any extra entries. */
  private def mysqlProperties(user: String, password: String, extra: (String, String)*): Properties = {
    val props = new Properties()
    props.setProperty("driver", "com.mysql.jdbc.Driver")
    props.setProperty("user", user)
    props.setProperty("password", password)
    extra.foreach { case (key, value) => props.setProperty(key, value) }
    props
  }

  /**
    * Builds JDBC properties for the production MySQL instance.
    *
    * Credentials are taken from `DMP_MYSQL_USER` / `DMP_MYSQL_PASSWORD` when set; otherwise the
    * historical built-in values are used so existing deployments keep working.
    * NOTE(review): secrets should not live in source control; drop the fallbacks once the
    * deployment injects the environment variables.
    */
  private def prodMysqlProperties(extra: (String, String)*): Properties =
    mysqlProperties(
      envOrElse("DMP_MYSQL_USER", "mob_mfordmp"),
      envOrElse("DMP_MYSQL_PASSWORD", "3xqbE7MNX8WHf2H"),
      extra: _*
    )

  /**
    * JDBC URL of the production MySQL instance for `database`.
    * Previous endpoint: dataplatform-app-tag.c5yzcdreb1xr.us-east-1.rds.amazonaws.com:3306
    */
  private def prodMysqlUrl(database: String): String =
    s"jdbc:mysql://adn-mysql-internal.mobvista.com:3306/${database}"

  /**
    * Reads a MySQL table from the production instance as a DataFrame.
    *
    * @param spark    active Spark session
    * @param database MySQL database (schema) name
    * @param table    table name (passed to Spark's JDBC reader as-is)
    * @return the table contents as a DataFrame
    */
  def jdbcConnection(spark: SparkSession, database: String, table: String): DataFrame = {
    val properties = prodMysqlProperties(
      "characterEncoding" -> "utf8",
      "zeroDateTimeBehavior" -> "convertToNull"
    )
    spark.read.jdbc(url = prodMysqlUrl(database), table = table, properties = properties)
  }

  /**
    * Writes a DataFrame to a table on the production MySQL instance.
    *
    * @param df       rows to write
    * @param database MySQL database (schema) name
    * @param table    target table name
    * @param saveMode Spark save mode (append, overwrite, ...)
    */
  def writeMySQL(df: Dataset[Row], database: String, table: String, saveMode: SaveMode): Unit = {
    val properties = prodMysqlProperties(
      "characterEncoding" -> "utf8",
      "batchsize" -> "5000",
      // NOTE(review): innodb_lock_wait_timeout is a MySQL server/session variable, not a
      // documented Connector/J connection property, so it probably has no effect when passed
      // like this (sessionVariables would be the usual route) - confirm before relying on it.
      "innodb_lock_wait_timeout" -> "600"
    )
    df.write.mode(saveMode).jdbc(prodMysqlUrl(database), table, properties)
  }

  /**
    * Writes a DataFrame to a table on a MySQL instance running on localhost:3306.
    *
    * @param df       rows to write
    * @param database MySQL database (schema) name
    * @param table    target table name
    * @param saveMode Spark save mode (append, overwrite, ...)
    */
  def writeMySQLTest(df: Dataset[Row], database: String, table: String, saveMode: SaveMode): Unit = {
    // The original code set user/password twice; only the last pair ever took effect,
    // so only that pair is kept.
    val properties = mysqlProperties("apptag_rw", "7gyLEVtkER3u8c9")
    val url = s"jdbc:mysql://localhost:3306/$database"
    df.write.mode(saveMode).jdbc(url, table, properties)
  }

  //  ('cn','virginia','tokyo')
  /**
    * Selects user features from `dwh.dm_user_info` for devices that have at least one
    * non-empty attribute (country, age, gender, interest or install), restricted via a
    * LEFT SEMI JOIN to devices present in `dm_region` for the `@region` placeholder.
    */
  val user_feature_sql: String =
    """
      |SELECT u.*
      | FROM
      | (SELECT LOWER(device_id) device_id, age, gender, interest, concat_ws(',',install) install_apps, frequency FROM dwh.dm_user_info a WHERE dt = '@date'
      | AND update_date >= '@update_date' AND ((country != '' AND country IS NOT NULL) OR (age != '' AND age IS NOT NULL) OR (gender != '' AND gender IS NOT NULL) OR
      | (size(interest) != 0 AND interest IS NOT NULL) OR (size(install) != 0 AND install IS NOT NULL))
      | ) u
      | LEFT SEMI JOIN
      | (SELECT device_id FROM dm_region WHERE region = '@region') r
      | ON u.device_id = r.device_id
    """.stripMargin

  /** Distinct (lower-cased device id, region) pairs for the regions 'cn', 'virginia', 'tokyo'. */
  val region_sql: String =
    """
      |SELECT LOWER(device_id) device_id, region FROM dev.dm_device_region WHERE dt = '@date' AND last_req_day >= '@last_req_day' AND region IN ('cn','virginia','tokyo')
      | GROUP BY LOWER(device_id), region
    """.stripMargin
}