Commit f604014c by wang-jinfeng

fix bug: bump driver memory for BtopTiktokrv (4g -> 6g) and executor memory for RTDmpMerge (8g -> 18g), silence IOException logging in DMPServer and IQiYiRequest, adjust the ClickHouse insert/close logic in IQiYiRequest, extend dsp_req.package_name with the *_iqiyi packages, add BackFlowFilter, and replace the scala.tools.nsc InputStream import with java.io.InputStream in ImportCampaignTags and ImportPkgTags

parent 015a1724
......@@ -28,7 +28,7 @@ spark-submit --class mobvista.dmp.datasource.dm.BtopTiktokrv \
--conf spark.sql.shuffle.partitions=3000 \
--conf spark.network.timeout=720s \
--files ${HIVE_SITE_PATH} \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 4g --executor-cores 6 --num-executors 70 \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 6 --num-executors 70 \
../../${JAR} -begin_day ${begin_day} -begin_day02 ${begin_day02} -end_day ${end_day} -output01 ${OUTPUT_PATH01} -output02 ${OUTPUT_PATH02} -coalesce 200
......
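Note: because this script runs with --deploy-mode cluster, the 4g -> 6g driver bump above only takes effect when set at submit time, as it is here. For reference, the equivalent property form (not part of the commit) would be one extra flag on the spark-submit line:

--conf spark.driver.memory=6g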
......@@ -49,7 +49,7 @@ spark-submit --class mobvista.dmp.datasource.rtdmp.RTDmpMerge \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
--files ${HIVE_SITE_PATH} \
--jars ${SPARK_HOME}/auxlib/Common-SerDe-1.0-SNAPSHOT.jar \
--master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 6g --executor-cores 5 --num-executors 20 \
--master yarn --deploy-mode cluster --executor-memory 18g --driver-memory 6g --executor-cores 5 --num-executors 20 \
../${JAR} -date_time "${curr_time}" -old_time "${old_time}" -input ${INPUT} -output ${OUTPUT} -partition 100
if [[ $? -ne 0 ]]; then
......
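If the larger 18g executors for RTDmpMerge later get killed by YARN for exceeding container limits, the off-heap allowance can be raised as well; a hedged example, where the 4g value is an assumption and not part of this commit:

--conf spark.executor.memoryOverhead=4g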
......@@ -90,7 +90,7 @@ public class DMPServer {
}
jsonObject = JSON.parseObject(result.toString());
} catch (IOException ex) {
System.out.println("IOException -->> " + ex.getMessage());
// System.out.println("IOException -->> " + ex.getMessage());
jsonObject.put("status", -1);
}
} finally {
......
......@@ -121,26 +121,24 @@ public class IQiYiRequest {
}
long min_end = System.currentTimeMillis();
logger.info("Times -->> " + c + ", Runtime -->> " + (min_end - in_start));
if (c % 20 == 0) {
try {
String datetime = DateUtil.format(new Date(), "yyyy-MM-dd HH");
insertIQiYi(connection, futures, datetime.substring(0, 10), datetime.substring(11, 13));
logger.info("Times -->> " + c + ", ClickHouse Insert Success! Size -->> " + futures.size());
} catch (SQLException e) {
logger.info("ClickHouse Insert Failure!");
} finally {
try {
String datetime = DateUtil.format(new Date(), "yyyy-MM-dd HH");
insertIQiYi(connection, futures, datetime.substring(0, 10), datetime.substring(11, 13));
logger.info("Times -->> " + c / 20 + ", ClickHouse Insert Success! Size -->> " + futures.size());
} catch (SQLException e) {
logger.info("ClickHouse Insert Failure!");
}finally {
try {
resultSet.close();
connection.close();
} catch (SQLException throwables) {
logger.info("ClickHouse Connection Close!");
}
resultSet.close();
connection.close();
} catch (SQLException throwables) {
logger.info("ClickHouse Connection Close!");
}
futures = new CopyOnWriteArrayList<>();
min_end = System.currentTimeMillis();
logger.info("ClickHouse Insert Success! Times -->> " + c / 20 + ", Runtime -->> " + (min_end - min_start));
min_start = System.currentTimeMillis();
}
futures = new CopyOnWriteArrayList<>();
min_end = System.currentTimeMillis();
logger.info("ClickHouse Insert Success! Times -->> " + c + ", Runtime -->> " + (min_end - min_start));
min_start = System.currentTimeMillis();
}
poolExecutor.shutdown();
long end = System.currentTimeMillis();
......@@ -291,14 +289,14 @@ public class IQiYiRequest {
// logger.info("result -->> " + result.toString());
jsonObject = Constants.String2JSONObject(result.toString());
} catch (IOException e) {
logger.info("IOException -->> " + e.getMessage());
// logger.info("IOException -->> " + e.getMessage());
} finally {
post.abort();
client.getConnectionManager().shutdown();
try {
client.close();
} catch (IOException e) {
logger.info("IOException -->> " + e.getMessage());
// logger.info("IOException -->> " + e.getMessage());
}
}
return jsonObject;
......
......@@ -94,7 +94,9 @@ ali_activation.package_name=com.taobao.foractivation.172393,com.taobao.foractiva
dsp_req.package_name=com.taobao.taobao_oppo,com.eg.android.AlipayGphone_oppo,com.ucmobile_oppo,com.qiyi.video_oppo,com.taobao.taobao_notinstall_oppo,\
com.eg.android.AlipayGphone_bes,com.youku.phone_notinstall_oppo,com.sankuai.meituan_oppo,com.meituan.itakeaway_oppo,com.taobao.idlefish_bes,\
com.taobao.idlefish_oppo,com.UCMobile_bes,com.taobao.taobao_bes,com.tencent.news_fromtencent
com.taobao.idlefish_oppo,com.UCMobile_bes,com.taobao.taobao_bes,com.tencent.news_fromtencent,com.taobao.taobao_iqiyi,com.taobao.taobao,com.UCMobile_iqiyi,\
com.UCMobile,com.eg.android.AlipayGphone_iqiyi,com.eg.android.AlipayGphone,com.taobao.idlefish_iqiyi,com.taobao.idlefish,com.sankuai.meituan_iqiyi,com.sankuai.meituan,\
com.tencent.news_iqiyi,com.tencent.news
btop.package_name=com.taobao.taobao,com.taobao.taobao_btop7,com.taobao.taobao_btop15,com.eg.android.AlipayGphone_btop,com.taobao.taobao_btop7_imei,\
com.taobao.taobao_btop15_imei,com.taobao.taobao_btop7_oaid,com.taobao.taobao_btop15_oaid,com.taobao.idlefish_btop,com.UCMobile_btop,com.qiyi.video_btop,\
......
package mobvista.dmp.datasource.backflow

import mobvista.dmp.common.{CommonSparkJob, MobvistaConstant}
import mobvista.dmp.util.MRUtils
import org.apache.commons.cli.{BasicParser, Options}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec

import java.net.URI

/**
 * @package: mobvista.dmp.datasource.backflow
 * @author: wangjf
 * @date: 2021/4/25
 * @time: 18:35 PM
 * @email: jinfeng.wang@mobvista.com
 */
class BackFlowFilter extends CommonSparkJob with Serializable {

  def commandOptions(): Options = {
    val options = new Options()
    options.addOption("date", true, "date")
    options.addOption("output", true, "output")
    options
  }

  override protected def run(args: Array[String]): Int = {
    val parser = new BasicParser()
    val options = commandOptions()
    val commandLine = parser.parse(options, args)

    val date = commandLine.getOptionValue("date")
    val output = commandLine.getOptionValue("output")

    val spark = MobvistaConstant.createSparkSession("BackFlowFilter")
    val sc = spark.sparkContext
    try {
      val df = spark.sql(BackFlowConstant.mapping_sql.replace("@dt", date))
      val pathUri = new URI(output)
      FileSystem.get(new URI(s"${pathUri.getScheme}://${pathUri.getHost}"), sc.hadoopConfiguration).delete(new Path(output), true)
      // region, devid_key, devid_type, devid_value
      df.rdd.map(r => {
        MRUtils.JOINER.join(r.getAs[String]("region"), r.getAs[String]("devid_key"),
          r.getAs[String]("devid_type"), r.getAs[String]("devid_value"))
      }).repartition(50)
        .saveAsTextFile(output, classOf[GzipCodec])
    } finally {
      if (sc != null) {
        sc.stop()
      }
      if (spark != null) {
        spark.stop()
      }
    }
    0
  }
}

object BackFlowFilter {
  def main(args: Array[String]): Unit = {
    new BackFlowFilter().run(args)
  }
}
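For context, the new BackFlowFilter job would be launched like the other jobs touched in this commit. A minimal wrapper sketch follows; the date format, output location, and resource sizes are assumptions, and only the class name and the -date/-output options come from the file above:

#!/usr/bin/env bash
# Sketch only: reuses the ${JAR} and ${HIVE_SITE_PATH} conventions of the other scripts in this repo.
date=$(date -d "1 days ago" +"%Y%m%d")          # assumed date format
OUTPUT_PATH="${BACKFLOW_OUTPUT_ROOT}/${date}"   # assumed output location

spark-submit --class mobvista.dmp.datasource.backflow.BackFlowFilter \
 --conf spark.network.timeout=720s \
 --files ${HIVE_SITE_PATH} \
 --master yarn --deploy-mode cluster --executor-memory 8g --driver-memory 4g --executor-cores 4 --num-executors 10 \
 ../${JAR} -date ${date} -output ${OUTPUT_PATH}

if [[ $? -ne 0 ]]; then
 exit 255
fi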
package mobvista.dmp.datasource.newtag
import java.net.URI
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.commons.lang.StringUtils
......@@ -10,8 +9,9 @@ import org.apache.hadoop.io.compress.GzipCodec
import org.apache.poi.ss.usermodel.{Cell, WorkbookFactory}
import org.apache.spark.sql.SparkSession
import java.io.InputStream
import scala.collection.mutable.ArrayBuffer
import scala.tools.nsc.interpreter.InputStream
// import scala.tools.nsc.interpreter.InputStream
class ImportCampaignTags extends CommonSparkJob with Serializable {
override protected def run(args: Array[String]): Int = {
......
package mobvista.dmp.datasource.newtag
import java.net.URI
import mobvista.dmp.common.CommonSparkJob
import org.apache.commons.cli.Options
import org.apache.commons.lang.StringUtils
......@@ -10,8 +9,9 @@ import org.apache.hadoop.io.compress.GzipCodec
import org.apache.poi.ss.usermodel.{Cell, WorkbookFactory}
import org.apache.spark.sql.SparkSession
import java.io.InputStream
import scala.collection.mutable.ArrayBuffer
import scala.tools.nsc.interpreter.InputStream
// import scala.tools.nsc.interpreter.InputStream
class ImportPkgTags extends CommonSparkJob with Serializable {
override protected def run(args: Array[String]): Int = {
......