Constant.scala 8.49 KB
Newer Older
wang-jinfeng committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
package mobvista.dmp.datasource.retargeting

import java.util.Properties

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql._

/**
  * SQL templates and MySQL/JDBC helpers for the retargeting data source.
  *
  * The SQL strings contain `@name` tokens (`@date`, `@update_date`, `@activeDate`,
  * `@last_req_day`, `@region`). Nothing in this object substitutes them; presumably
  * callers replace them with concrete values before running a query — confirm
  * against the call sites.
  *
  * Several queries call SQL functions (`getInstallList`, `getInterestList`,
  * `checkDevice`) and read an unqualified relation `dm_active` / `dm_region`.
  * None of these are registered here, so the caller must make them resolvable
  * in the Spark session beforehand.
  */
object Constant {

  /**
    * Selects an upper-cased `tag_type-first_tag-second_tag` code together with
    * `new_second_id` from `dwh.dm_old2new_tag`, keeping rows with a non-empty `tag_id`.
    */
  val old2new_sql: String =
    """
      |SELECT UPPER(CONCAT(tag_type, '-', first_tag, '-', second_tag)) tag_code, new_second_id FROM
      | dwh.dm_old2new_tag WHERE tag_id != '' AND tag_id IS NOT NULL
    """.stripMargin

  /** Selects every (`new_first_id`, `new_second_id`) pair from `dwh.dm_old2new_tag`. */
  val second2first_sql: String =
    """
      |SELECT new_first_id, new_second_id FROM dwh.dm_old2new_tag
    """.stripMargin

  /**
    * Selects `new_second_id` (aliased `tag_code`) and `tag_id` from
    * `dwh.dm_old2new_tag`, keeping rows with a non-empty `tag_id`.
    */
  val id_old2new_sql: String =
    """
      |SELECT new_second_id tag_code, tag_id FROM
      | dwh.dm_old2new_tag WHERE tag_id != '' AND tag_id IS NOT NULL
    """.stripMargin

  /**
    * Aggregates `dwh.ods_dmp_user_info_all` per upper-cased `dev_id` for partition
    * `@date` (rows with `update_date >= @update_date`) and left-joins `dm_active`
    * to attach `frequency`.
    *
    * NOTE(review): device ids are upper-cased here, while `statistics_sql` lower-cases
    * them. If `dm_active` is built from `statistics_sql`, the join will not match ids
    * that contain letters — confirm how `dm_active` is produced.
    */
  val ods_dmp_user_info_all_sql: String =
    """
      |SELECT dm_device.device_id, dm_device.platform, dm_device.country, CAST(COALESCE(dm_device.age,0) AS INT) age, CAST(COALESCE(dm_device.gender,0) AS INT) gender, dm_device.install, dm_device.interest, dm_active.frequency, dm_device.update_date
      | FROM
      | (SELECT UPPER(dev_id) device_id, MAX(platform) platform, MAX(country) country, MAX(age) age, MAX(gender) gender, getInstallList(CONCAT_WS(',',COLLECT_SET(install))) install, getInterestList(CONCAT_WS('#',COLLECT_SET(interest))) interest, MAX(update_date) update_date
      |   FROM dwh.ods_dmp_user_info_all WHERE dt = '@date' AND update_date >= '@update_date' AND checkDevice(dev_id)
      |   GROUP BY UPPER(dev_id)
      | ) dm_device
      | LEFT JOIN dm_active
      | ON dm_device.device_id = dm_active.device_id
    """.stripMargin


  /**
    * Reads `dwh.ods_dmp_user_info_all_v2` for partition `@date` (lower-cased device id,
    * country `GB` rewritten to `UK`) and left-joins:
    *  - `dm_active` for `frequency`;
    *  - `dwh.dm_active_tag` (`dt = @activeDate`) twice, for the `week` and `month` parts;
    *  - `dev.dm_device_region` (`dt = @date`) for the set of regions among
    *    `cn`, `virginia`, `tokyo`; devices without a region row get an empty array.
    */
  val ods_dmp_user_info_all_sql_distinct: String =
    """
      |SELECT dm_device.device_id, dm_device.platform, dm_device.model, dm_device.osversion os_version, dm_device.country, CAST(COALESCE(dm_device.age,0) AS INT) age, CAST(COALESCE(dm_device.gender,0) AS INT) gender, dm_device.install, dm_device.interest,
      | dm_device.behavior, dm_active.frequency, active_week.tags tag_week, active_month.tags tag_month, COALESCE(dm_region.region,ARRAY()) region, dm_device.update_date, dm_device.publish_date
      | FROM
      | (SELECT LOWER(dev_id) device_id, platform, model, osversion, CASE WHEN country = 'GB' THEN 'UK' ELSE country END AS country, age, gender, getInstallList(install) install, getInterestList(interest) interest, behavior, update_date, publish_date
      |    FROM dwh.ods_dmp_user_info_all_v2 WHERE dt = '@date' AND checkDevice(dev_id)
      | ) dm_device
      | LEFT JOIN dm_active ON dm_device.device_id = dm_active.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(tags) tags FROM dwh.dm_active_tag WHERE dt = '@activeDate' AND part = 'week' GROUP BY lower(device_id)) active_week ON dm_device.device_id = active_week.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(tags) tags FROM dwh.dm_active_tag WHERE dt = '@activeDate' AND part = 'month' GROUP BY lower(device_id)) active_month ON dm_device.device_id = active_month.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, COLLECT_SET(region) region FROM dev.dm_device_region WHERE dt = '@date' AND region IN ('cn','virginia','tokyo') GROUP BY lower(device_id)) dm_region ON dm_device.device_id = dm_region.device_id
    """.stripMargin

  /**
    * Variant of the distinct query that reads `dwh.ods_dmp_user_info_all`
    * (`dt = @date`, `update_date >= @update_date`) and attaches a single region per
    * device: `MAX(region)` from `dev.dm_device_region` where
    * `last_req_day >= @last_req_day`. Also joins `dm_active` and the weekly/monthly
    * `dwh.dm_active_tag` parts.
    */
  val ods_dmp_user_info_all_sql_distinct_region: String =
    """
      |SELECT dm_device.device_id, dm_device.platform, device_region.region, dm_device.model, dm_device.osversion os_version, dm_device.country, CAST(COALESCE(dm_device.age,0) AS INT) age, CAST(COALESCE(dm_device.gender,0) AS INT) gender, dm_device.install, dm_device.interest,
      | dm_device.behavior, dm_active.frequency, active_week.tags tag_week, active_month.tags tag_month, dm_device.update_date
      | FROM
      | (SELECT LOWER(dev_id) device_id, platform, model, osversion, CASE WHEN country = 'GB' THEN 'UK' ELSE country END AS country, age, gender, getInstallList(install) install, getInterestList(interest) interest, behavior, update_date
      |    FROM dwh.ods_dmp_user_info_all WHERE dt = '@date' AND update_date >= '@update_date' AND checkDevice(dev_id)
      | ) dm_device
      | LEFT JOIN dm_active ON dm_device.device_id = dm_active.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(tags) tags FROM dwh.dm_active_tag WHERE dt = '@activeDate' AND part = 'week' GROUP BY lower(device_id)) active_week ON dm_device.device_id = active_week.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(tags) tags FROM dwh.dm_active_tag WHERE dt = '@activeDate' AND part = 'month' GROUP BY lower(device_id)) active_month ON dm_device.device_id = active_month.device_id
      | LEFT JOIN (SELECT LOWER(device_id) device_id, MAX(region) region FROM dev.dm_device_region WHERE dt = '@date' AND last_req_day >= '@last_req_day' GROUP BY lower(device_id)) device_region ON dm_device.device_id = device_region.device_id
    """.stripMargin

  /**
    * Selects the lower-cased `device_id` and `tags` (aliased `frequency`) from
    * `dwh.dmp_device_tag_statistics` for partition `@date`.
    */
  val statistics_sql: String =
    """
      |SELECT LOWER(device_id) device_id,tags frequency
      | FROM dwh.dmp_device_tag_statistics
      | WHERE dt = '@date'
    """.stripMargin

  /**
    * Tells whether a device id is usable.
    *
    * @param device_id the raw device id; may be `null`
    * @return `true` when `device_id` is non-null and contains at least one
    *         non-whitespace character
    */
  def checkDevice(device_id: String): Boolean =
    StringUtils.isNotBlank(device_id)

  /**
    * Selects `package_name` from `dwh.dm_install_list_v2` for partition `@date`,
    * restricted to rows with `update_date >= @update_date`.
    */
  val package_sql: String =
    """
      |SELECT package_name FROM dwh.dm_install_list_v2 WHERE dt = '@date' AND update_date >= '@update_date'
    """.stripMargin

  //  import com.datastax.spark.connector._

  //  val deviceTagColumn = SomeColumns("device_id", "age", "gender", "install_apps", "interest", "frequency" overwrite)

  /** JDBC driver class shared by every MySQL helper below. */
  private val MysqlDriver = "com.mysql.jdbc.Driver"

  // Production ("online") MySQL endpoint and account used by jdbcConnection and writeMySQL.
  // NOTE(review): these credentials are hard-coded in source. They are kept so that
  // behaviour is unchanged, but they should be moved to a secret store or external
  // configuration and rotated.
  private val ProdMysqlHost = "adn-mysql-internal.mobvista.com"
  private val ProdMysqlUser = "mob_mfordmp"
  private val ProdMysqlPassword = "3xqbE7MNX8WHf2H"

  /** Builds the `jdbc:mysql://host:3306/database` URL used by the helpers. */
  private def mysqlUrl(host: String, database: String): String =
    s"jdbc:mysql://$host:3306/$database"

  /** Builds connection properties: driver, credentials, then any extra key/value pairs. */
  private def mysqlProperties(user: String, password: String, extras: (String, String)*): Properties = {
    val props = new Properties()
    props.setProperty("driver", MysqlDriver)
    props.setProperty("user", user)
    props.setProperty("password", password)
    extras.foreach { case (key, value) => props.setProperty(key, value) }
    props
  }

  /**
    * Reads a table from the production MySQL instance through Spark's JDBC source.
    *
    * @param spark    session used to create the reader
    * @param database database (schema) name appended to the JDBC URL
    * @param table    table name passed to `spark.read.jdbc`
    * @return the DataFrame returned by `spark.read.jdbc`
    */
  def jdbcConnection(spark: SparkSession, database: String, table: String): DataFrame = {
    val properties = mysqlProperties(
      ProdMysqlUser,
      ProdMysqlPassword,
      "characterEncoding" -> "utf8",
      "zeroDateTimeBehavior" -> "convertToNull"
    )

    spark.read.jdbc(url = mysqlUrl(ProdMysqlHost, database), table = table, properties = properties)
  }

  /**
    * Writes a DataFrame to a table on the production MySQL instance.
    *
    * @param df       rows to write
    * @param database database (schema) name appended to the JDBC URL
    * @param table    destination table
    * @param saveMode Spark save mode applied to the write
    */
  def writeMySQL(df: Dataset[Row], database: String, table: String, saveMode: SaveMode): Unit = {
    // NOTE(review): `innodb_lock_wait_timeout` is a MySQL server/session variable; it may
    // not take effect when passed as a plain connection property (Connector/J exposes
    // `sessionVariables` for that) — confirm before relying on the 600s timeout.
    val properties = mysqlProperties(
      ProdMysqlUser,
      ProdMysqlPassword,
      "characterEncoding" -> "utf8",
      "batchsize" -> "5000",
      "innodb_lock_wait_timeout" -> "600"
    )

    df.write.mode(saveMode).jdbc(mysqlUrl(ProdMysqlHost, database), table, properties)
  }

  /**
    * Writes a DataFrame to a MySQL instance on `localhost:3306`.
    *
    * The original body set user/password twice; only the second pair
    * (`apptag_rw`) ever took effect, so only that pair is kept.
    *
    * @param df       rows to write
    * @param database database (schema) name appended to the JDBC URL
    * @param table    destination table
    * @param saveMode Spark save mode applied to the write
    */
  def writeMySQLTest(df: Dataset[Row], database: String, table: String, saveMode: SaveMode): Unit = {
    // NOTE(review): hard-coded test credentials; move to local configuration.
    val properties = mysqlProperties("apptag_rw", "7gyLEVtkER3u8c9")
    df.write.mode(saveMode).jdbc(mysqlUrl("localhost", database), table, properties)
  }

  /**
    * Selects user features from `dwh.dm_user_info` (`dt = @date`,
    * `update_date >= @update_date`) for rows that have at least one of country, age,
    * gender, interest or install populated, keeping only devices present in
    * `dm_region` with `region = @region` (LEFT SEMI JOIN).
    *
    * `@region` is presumably one of 'cn', 'virginia', 'tokyo' — the values
    * `region_sql` filters on; confirm against the caller.
    */
  val user_feature_sql: String =
    """
      |SELECT u.*
      |  FROM
      |    (SELECT LOWER(device_id) device_id, age, gender, interest, concat_ws(',',install) install_apps, frequency FROM dwh.dm_user_info a WHERE dt = '@date'
      |      AND update_date >= '@update_date' AND ((country != '' AND country IS NOT NULL) OR (age != '' AND age IS NOT NULL) OR (gender != '' AND gender IS NOT NULL) OR
      |      (size(interest) != 0 AND interest IS NOT NULL) OR (size(install) != 0 AND install IS NOT NULL))
      |    ) u
      |  LEFT SEMI JOIN
      |    (SELECT device_id FROM dm_region WHERE region = '@region') r
      |  ON u.device_id = r.device_id
    """.stripMargin

  /**
    * Selects the distinct (lower-cased `device_id`, `region`) pairs from
    * `dev.dm_device_region` for partition `@date`, with
    * `last_req_day >= @last_req_day` and region in 'cn', 'virginia', 'tokyo'.
    */
  val region_sql: String =
    """
      |SELECT LOWER(device_id) device_id, region FROM dev.dm_device_region WHERE dt = '@date' AND last_req_day >= '@last_req_day' AND region IN ('cn','virginia','tokyo')
      |  GROUP BY LOWER(device_id), region
    """.stripMargin
}