package mobvista.dmp.datasource.ga

import java.net.URI
import java.util.regex.Pattern

import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.{BasicParser, CommandLine, Options}
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.collection.mutable.ArrayBuffer

/**
 * Spark job that normalises daily GA install records.
 *
 * Each input line is expected to hold exactly four `|`-separated fields, the second of which
 * is the platform (`ios` or `android`, case-insensitive). Matching lines are re-emitted with a
 * device-id type and a normalised platform code in place of the raw platform field; every
 * other line is dropped.
 */
object GaInstallDaily {

  /** Splits an input line on the literal `|` character. */
  private val splitPtn = Pattern.compile("\\|")

  /**
   * Job entry point.
   *
   * Required options: `input_path_install`, `output`, `coalesce` (see [[buildOptions]]).
   * Any existing data at the output path is deleted recursively before the job writes it.
   *
   * @throws IllegalArgumentException if a required option is missing
   */
  def main(args: Array[String]): Unit = {
    val commandLine = new BasicParser().parse(buildOptions(), args)
    val inputPathInstall = requiredOption(commandLine, "input_path_install")
    val output = requiredOption(commandLine, "output")
    val coalesce = requiredOption(commandLine, "coalesce").toInt

    val spark = mobvista.dmp.common.MobvistaConstant.createSparkSession("GaInstallDaily")
    val sc = spark.sparkContext

    try {
      // Resolve the filesystem from the output path itself (instead of a fixed bucket) so the
      // delete targets the same filesystem that saveAsTextFile writes to, for both fully
      // qualified and default-filesystem paths.
      val outputPath = new Path(output)
      val outputFs: FileSystem = outputPath.getFileSystem(sc.hadoopConfiguration)
      outputFs.delete(outputPath, true)

      sc.textFile(inputPathInstall)
        .flatMap(line => parseflatMapData(line))
        .coalesce(coalesce)
        .saveAsTextFile(output)
    } finally {
      sc.stop()
    }
  }

  /**
   * Converts one raw install line into zero or one output records.
   *
   * Lines that do not split into exactly four fields, or whose platform field is neither
   * `ios` nor `android` (case-insensitive), yield an empty array. Because `Pattern.split`
   * discards trailing empty strings, a line whose last field is empty is also dropped.
   *
   * @param line raw `|`-delimited input line
   * @return a single record (field 0, id type, platform code, field 2, field 3) joined with
   *         `MRUtils.JOINER`, or an empty array when the line is rejected
   */
  def parseflatMapData(line: String): Array[String] = {
    val fields = splitPtn.split(line)
    if (fields.length != 4) {
      Array.empty[String]
    } else {
      platformIds(fields(1)) match {
        case Some((idType, platform)) =>
          Array(MRUtils.JOINER.join(fields(0), idType, platform, fields(2), fields(3)))
        case None =>
          Array.empty[String]
      }
    }
  }

  /**
   * Maps a raw platform value to its (device-id type, normalised platform code) pair,
   * or `None` for unsupported platforms.
   */
  private def platformIds(rawPlatform: String): Option[(String, String)] =
    if (rawPlatform.equalsIgnoreCase("ios")) Some(("idfa", "ios"))
    else if (rawPlatform.equalsIgnoreCase("android")) Some(("gaid", "adr"))
    else None

  /** Returns the value of a mandatory command-line option, failing fast if it is absent. */
  private def requiredOption(commandLine: CommandLine, name: String): String =
    Option(commandLine.getOptionValue(name)).getOrElse(
      throw new IllegalArgumentException(s"missing required option: $name"))

  /** Declares the job's command-line options; all three are mandatory (validated in `main`). */
  def buildOptions(): Options = {
    val options = new Options
    options.addOption("input_path_install", true, "[must] input path")
    options.addOption("output", true, "[must] output path")
    options.addOption("coalesce", true, "[must] coalesce")
    options
  }
}