Commit 705f3251 by lining

submit project

parents
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lining.code</groupId>
    <artifactId>spark-project</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.12</scala.version>
        <spark.version>2.1.1</spark.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Spark Streaming (was declared twice; duplicate removed) -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <!-- shared property instead of the previous hard-coded 2.1.1 -->
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>7.14.2</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4" />
\ No newline at end of file
package com.lining.code.constant;
/**
 * @ Author :lining.
 * @ Date :Created in 21:25 2022/5/10
 * @ Description: Elasticsearch index-name catalog. Every value is a
 * {@link java.text.MessageFormat} pattern: {0} is the data type name
 * ("game"/"app"), {1} the day window ("03"/"07"/"30") where present,
 * and {2} a date suffix where present.
 */
public final class ESIndexConstant {
    /**
     * Material detail index.
     */
    public static final String MATERIAL_INDEX = "material_model_{0}/_doc";
    /**
     * Playable-ad material index.
     */
    public static final String PREPLAY_MATERIAL_INDEX = "preplay_material_model_{0}/_doc";
    /**
     * Ad-copy (document) index.
     */
    public static final String DOCUMENT_INDEX = "document_model_{0}/_doc";
    /**
     * Product list index (base and date-suffixed variants).
     */
    public static final String PRODUCT_LIST_INDEX = "product_list_model_{0}_{1}/_doc";
    public static final String PRODUCT_LIST_DATE_INDEX = "product_list_model_{0}_{1}_{2}/_doc";
    /**
     * Product detail index.
     */
    public static final String PRODUCT_INDEX = "product_model_{0}/_doc";
    /**
     * Company list index (base and date-suffixed variants).
     */
    public static final String COMPANY_LIST_INDEX = "company_list_model_{0}_{1}/_doc";
    public static final String COMPANY_LIST_DATE_INDEX = "company_list_model_{0}_{1}_{2}/_doc";
    /**
     * Company detail index.
     */
    public static final String COMPANY_INDEX = "company_model_{0}/_doc";
    /**
     * Ad monetization list index (base and date-suffixed variants).
     */
    public static final String AD_LIST_INDEX = "ad_monetization_list_model_{0}_{1}/_doc";
    public static final String AD_LIST_DATE_INDEX = "ad_monetization_list_model_{0}_{1}_{2}/_doc";
    /**
     * Country list index (base and date-suffixed variants).
     */
    public static final String COUNTRY_LIST_INDEX = "country_list_model_{0}_{1}/_doc";
    public static final String COUNTRY_LIST_DATE_INDEX = "country_list_model_{0}_{1}_{2}/_doc";
    /**
     * Material trend index.
     */
    public static final String MATERIAL_TREND_INDEX = "material_trend_model_{0}/_doc";
    /**
     * Creative trend index.
     */
    public static final String CREATIVE_TREND_INDEX = "creative_trend_model_{0}/_doc";

    /**
     * Non-instantiable constant holder.
     */
    private ESIndexConstant() {
    }
}
package com.lining.code.constant;
/**
 * @ Author :lining.
 * @ Date :Created in 9:49 2022/5/16
 * @ Description: table-name catalog.
 */
public final class TableConstant {
    // NOTE(review): value is empty in the original source — presumably a
    // placeholder; confirm the intended table name before relying on it.
    public static final String PRODUCT_LIST_TABLE = "";

    /**
     * Non-instantiable constant holder.
     */
    private TableConstant() {
    }
}
package com.lining.code.enums;
/**
 * @ Author :lining.
 * @ Date :Created in 08:52 2022/6/25
 * @ Description: data flow type — real: realtime data; history: full
 * historical data.
 */
public enum DataFlowEnum {
    HISTORY("history"),
    REAL("real");

    /** String code used on the command line / in SQL parameters. */
    private String type;

    DataFlowEnum(String type) {
        this.type = type;
    }

    /**
     * Resolves a flow from its string code.
     *
     * @param type flow code; may be null
     * @return the matching flow, or {@link #HISTORY} when the code is
     *         null or unknown (equals(null) is false, so null falls through)
     */
    public static DataFlowEnum getDataFlow(String type) {
        for (DataFlowEnum candidate : values()) {
            if (candidate.type.equals(type)) {
                return candidate;
            }
        }
        return HISTORY;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }
}
package com.lining.code.enums;
/**
 * @ Author :lining.
 * @ Date :Created in 21:06 2022/5/10
 * @ Description: data category — GAME (1, "game") and APP (2, "app").
 */
public enum DataTypeEnum {
    GAME(1, "game"),
    APP(2, "app");

    /** Numeric code used as a SQL parameter. */
    private int type;
    /** Lowercase name used in index-name patterns. */
    private String name;

    DataTypeEnum(int typeCode, String displayName) {
        this.type = typeCode;
        this.name = displayName;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getType() {
        return type;
    }

    public void setType(int type) {
        this.type = type;
    }
}
package com.lining.code.enums;
/**
 * @ Author :lining.
 * @ Date :Created in 21:06 2022/5/10
 * @ Description: day-window category (3, 7, and 30 days).
 */
public enum DayEnum {
    DAY_03("03"),
    DAY_07("07"),
    DAY_30("30");

    /** Zero-padded window code used in SQL parameters and index names. */
    private String name;

    DayEnum(String windowCode) {
        this.name = windowCode;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
package com.lining.code.spark.ad.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.enums.DayEnum;
import com.lining.code.util.DateUtil;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * @ Author :lining.
 * @ Date :Created in 18:12 2022/5/11
 * @ Description: syncs the 3-day ad monetization list model (game and app)
 * from Spark SQL into Elasticsearch.
 */
public class AdListModel03Action {

    /** Spark connector format used for every Elasticsearch write. */
    private static final String ES_FORMAT = "org.elasticsearch.spark.sql";

    /**
     * Entry point.
     *
     * @param args optional; args[0] is a target date — when present, data is
     *             written to the date-suffixed index, otherwise to the base
     *             index. (The original default from DateUtil.getCurrentDate()
     *             was dead code: it was only used after being overwritten.)
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSessionUtil.getSparkSession("AdListModel03Action");
        if (args != null && args.length > 0) {
            syncDataByDate(spark, args[0]);
        } else {
            syncData(spark);
        }
        spark.stop();
    }

    /** Writes game and app data to the date-suffixed indices. */
    private static void syncDataByDate(SparkSession spark, String date) {
        sync(spark, DataTypeEnum.GAME, date);
        sync(spark, DataTypeEnum.APP, date);
    }

    /** Writes game and app data to the base (undated) indices. */
    private static void syncData(SparkSession spark) {
        sync(spark, DataTypeEnum.GAME, null);
        sync(spark, DataTypeEnum.APP, null);
    }

    /**
     * Runs the 3-day list SQL for one data type and overwrites the matching
     * ES index, keyed by the row id. A null date targets the base index.
     */
    private static void sync(SparkSession spark, DataTypeEnum dataType, String date) {
        String sql = MessageFormat.format(SQLConstant.AD_LIST_MODEL_SQL,
                DayEnum.DAY_03.getName(), dataType.getType());
        String index = date == null
                ? MessageFormat.format(ESIndexConstant.AD_LIST_INDEX,
                        dataType.getName(), DayEnum.DAY_03.getName())
                : MessageFormat.format(ESIndexConstant.AD_LIST_DATE_INDEX,
                        dataType.getName(), DayEnum.DAY_03.getName(), date);
        spark.sql(sql).write()
                .format(ES_FORMAT)
                .mode(SaveMode.Overwrite)
                .option("es.mapping.id", "id")
                .save(index);
    }
}
package com.lining.code.spark.ad.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.enums.DayEnum;
import com.lining.code.util.DateUtil;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * @ Author :lining.
 * @ Date :Created in 18:12 2022/5/11
 * @ Description: syncs the 7-day ad monetization list model (game and app)
 * from Spark SQL into Elasticsearch.
 */
public class AdListModel07Action {

    /** Spark connector format used for every Elasticsearch write. */
    private static final String ES_FORMAT = "org.elasticsearch.spark.sql";

    /**
     * Entry point.
     *
     * @param args optional; args[0] is a target date — when present, data is
     *             written to the date-suffixed index, otherwise to the base
     *             index. (The original default from DateUtil.getCurrentDate()
     *             was dead code: it was only used after being overwritten.)
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSessionUtil.getSparkSession("AdListModel07Action");
        if (args != null && args.length > 0) {
            syncDataByDate(spark, args[0]);
        } else {
            syncData(spark);
        }
        spark.stop();
    }

    /** Writes game and app data to the date-suffixed indices. */
    private static void syncDataByDate(SparkSession spark, String date) {
        sync(spark, DataTypeEnum.GAME, date);
        sync(spark, DataTypeEnum.APP, date);
    }

    /** Writes game and app data to the base (undated) indices. */
    private static void syncData(SparkSession spark) {
        sync(spark, DataTypeEnum.GAME, null);
        sync(spark, DataTypeEnum.APP, null);
    }

    /**
     * Runs the 7-day list SQL for one data type and overwrites the matching
     * ES index, keyed by the row id. A null date targets the base index.
     */
    private static void sync(SparkSession spark, DataTypeEnum dataType, String date) {
        String sql = MessageFormat.format(SQLConstant.AD_LIST_MODEL_SQL,
                DayEnum.DAY_07.getName(), dataType.getType());
        String index = date == null
                ? MessageFormat.format(ESIndexConstant.AD_LIST_INDEX,
                        dataType.getName(), DayEnum.DAY_07.getName())
                : MessageFormat.format(ESIndexConstant.AD_LIST_DATE_INDEX,
                        dataType.getName(), DayEnum.DAY_07.getName(), date);
        spark.sql(sql).write()
                .format(ES_FORMAT)
                .mode(SaveMode.Overwrite)
                .option("es.mapping.id", "id")
                .save(index);
    }
}
package com.lining.code.spark.ad.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.enums.DayEnum;
import com.lining.code.util.DateUtil;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * @ Author :lining.
 * @ Date :Created in 18:12 2022/5/11
 * @ Description: syncs the 30-day ad monetization list model (game and app)
 * from Spark SQL into Elasticsearch.
 */
public class AdListModel30Action {

    /** Spark connector format used for every Elasticsearch write. */
    private static final String ES_FORMAT = "org.elasticsearch.spark.sql";

    /**
     * Entry point.
     *
     * @param args optional; args[0] is a target date — when present, data is
     *             written to the date-suffixed index, otherwise to the base
     *             index. (The original default from DateUtil.getCurrentDate()
     *             was dead code: it was only used after being overwritten.)
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSessionUtil.getSparkSession("AdListModel30Action");
        if (args != null && args.length > 0) {
            syncDataByDate(spark, args[0]);
        } else {
            syncData(spark);
        }
        spark.stop();
    }

    /** Writes game and app data to the date-suffixed indices. */
    private static void syncDataByDate(SparkSession spark, String date) {
        sync(spark, DataTypeEnum.GAME, date);
        sync(spark, DataTypeEnum.APP, date);
    }

    /** Writes game and app data to the base (undated) indices. */
    private static void syncData(SparkSession spark) {
        sync(spark, DataTypeEnum.GAME, null);
        sync(spark, DataTypeEnum.APP, null);
    }

    /**
     * Runs the 30-day list SQL for one data type and overwrites the matching
     * ES index, keyed by the row id. A null date targets the base index.
     */
    private static void sync(SparkSession spark, DataTypeEnum dataType, String date) {
        String sql = MessageFormat.format(SQLConstant.AD_LIST_MODEL_SQL,
                DayEnum.DAY_30.getName(), dataType.getType());
        String index = date == null
                ? MessageFormat.format(ESIndexConstant.AD_LIST_INDEX,
                        dataType.getName(), DayEnum.DAY_30.getName())
                : MessageFormat.format(ESIndexConstant.AD_LIST_DATE_INDEX,
                        dataType.getName(), DayEnum.DAY_30.getName(), date);
        spark.sql(sql).write()
                .format(ES_FORMAT)
                .mode(SaveMode.Overwrite)
                .option("es.mapping.id", "id")
                .save(index);
    }
}
package com.lining.code.spark.company.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.enums.DayEnum;
import com.lining.code.util.DateUtil;
import com.lining.code.util.PropertiesUtil;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * @ Author :lining.
 * @ Date :Created in 22:30 2022/5/9
 * @ Description: syncs the 3-day company list model (game and app) from
 * Spark SQL into Elasticsearch.
 */
public class CompanyListModel03Action {

    /** Spark connector format used for every Elasticsearch write. */
    private static final String ES_FORMAT = "org.elasticsearch.spark.sql";

    /**
     * Entry point.
     *
     * @param args optional; args[0] is a target date — when present, data is
     *             written to the date-suffixed index, otherwise to the base
     *             index. (The original default from DateUtil.getCurrentDate()
     *             was dead code: it was only used after being overwritten.)
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSessionUtil.getSparkSession("CompanyListModel03Action");
        if (args != null && args.length > 0) {
            syncDataByDate(spark, args[0]);
        } else {
            syncData(spark);
        }
        spark.stop();
    }

    /** Writes game and app data to the date-suffixed indices. */
    private static void syncDataByDate(SparkSession spark, String date) {
        sync(spark, DataTypeEnum.GAME, date);
        sync(spark, DataTypeEnum.APP, date);
    }

    /** Writes game and app data to the base (undated) indices. */
    private static void syncData(SparkSession spark) {
        sync(spark, DataTypeEnum.GAME, null);
        sync(spark, DataTypeEnum.APP, null);
    }

    /**
     * Runs the 3-day company list SQL for one data type and overwrites the
     * matching ES index, keyed by the row id. A null date targets the base index.
     */
    private static void sync(SparkSession spark, DataTypeEnum dataType, String date) {
        String sql = MessageFormat.format(SQLConstant.COMPANY_LIST_MODEL_SQL,
                DayEnum.DAY_03.getName(), dataType.getType());
        String index = date == null
                ? MessageFormat.format(ESIndexConstant.COMPANY_LIST_INDEX,
                        dataType.getName(), DayEnum.DAY_03.getName())
                : MessageFormat.format(ESIndexConstant.COMPANY_LIST_DATE_INDEX,
                        dataType.getName(), DayEnum.DAY_03.getName(), date);
        spark.sql(sql).write()
                .format(ES_FORMAT)
                .mode(SaveMode.Overwrite)
                .option("es.mapping.id", "id")
                .save(index);
    }
}
package com.lining.code.spark.company.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.enums.DayEnum;
import com.lining.code.util.DateUtil;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * @ Author :lining.
 * @ Date :Created in 22:30 2022/5/9
 * @ Description: syncs the 7-day company list model (game and app) from
 * Spark SQL into Elasticsearch.
 */
public class CompanyListModel07Action {

    /** Spark connector format used for every Elasticsearch write. */
    private static final String ES_FORMAT = "org.elasticsearch.spark.sql";

    /**
     * Entry point.
     *
     * @param args optional; args[0] is a target date — when present, data is
     *             written to the date-suffixed index, otherwise to the base
     *             index. (The original default from DateUtil.getCurrentDate()
     *             was dead code: it was only used after being overwritten.)
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSessionUtil.getSparkSession("CompanyListModel07Action");
        if (args != null && args.length > 0) {
            syncDataByDate(spark, args[0]);
        } else {
            syncData(spark);
        }
        spark.stop();
    }

    /** Writes game and app data to the date-suffixed indices. */
    private static void syncDataByDate(SparkSession spark, String date) {
        sync(spark, DataTypeEnum.GAME, date);
        sync(spark, DataTypeEnum.APP, date);
    }

    /** Writes game and app data to the base (undated) indices. */
    private static void syncData(SparkSession spark) {
        sync(spark, DataTypeEnum.GAME, null);
        sync(spark, DataTypeEnum.APP, null);
    }

    /**
     * Runs the 7-day company list SQL for one data type and overwrites the
     * matching ES index, keyed by the row id. A null date targets the base index.
     */
    private static void sync(SparkSession spark, DataTypeEnum dataType, String date) {
        String sql = MessageFormat.format(SQLConstant.COMPANY_LIST_MODEL_SQL,
                DayEnum.DAY_07.getName(), dataType.getType());
        String index = date == null
                ? MessageFormat.format(ESIndexConstant.COMPANY_LIST_INDEX,
                        dataType.getName(), DayEnum.DAY_07.getName())
                : MessageFormat.format(ESIndexConstant.COMPANY_LIST_DATE_INDEX,
                        dataType.getName(), DayEnum.DAY_07.getName(), date);
        spark.sql(sql).write()
                .format(ES_FORMAT)
                .mode(SaveMode.Overwrite)
                .option("es.mapping.id", "id")
                .save(index);
    }
}
package com.lining.code.spark.company.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.enums.DayEnum;
import com.lining.code.util.DateUtil;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * @ Author :lining.
 * @ Date :Created in 22:30 2022/5/9
 * @ Description: syncs the 30-day company list model (game and app) from
 * Spark SQL into Elasticsearch.
 */
public class CompanyListModel30Action {

    /** Spark connector format used for every Elasticsearch write. */
    private static final String ES_FORMAT = "org.elasticsearch.spark.sql";

    /**
     * Entry point.
     *
     * @param args optional; args[0] is a target date — when present, data is
     *             written to the date-suffixed index, otherwise to the base
     *             index. (The original default from DateUtil.getCurrentDate()
     *             was dead code: it was only used after being overwritten.)
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSessionUtil.getSparkSession("CompanyListModel30Action");
        if (args != null && args.length > 0) {
            syncDataByDate(spark, args[0]);
        } else {
            syncData(spark);
        }
        spark.stop();
    }

    /** Writes game and app data to the date-suffixed indices. */
    private static void syncDataByDate(SparkSession spark, String date) {
        sync(spark, DataTypeEnum.GAME, date);
        sync(spark, DataTypeEnum.APP, date);
    }

    /** Writes game and app data to the base (undated) indices. */
    private static void syncData(SparkSession spark) {
        sync(spark, DataTypeEnum.GAME, null);
        sync(spark, DataTypeEnum.APP, null);
    }

    /**
     * Runs the 30-day company list SQL for one data type and overwrites the
     * matching ES index, keyed by the row id. A null date targets the base index.
     */
    private static void sync(SparkSession spark, DataTypeEnum dataType, String date) {
        String sql = MessageFormat.format(SQLConstant.COMPANY_LIST_MODEL_SQL,
                DayEnum.DAY_30.getName(), dataType.getType());
        String index = date == null
                ? MessageFormat.format(ESIndexConstant.COMPANY_LIST_INDEX,
                        dataType.getName(), DayEnum.DAY_30.getName())
                : MessageFormat.format(ESIndexConstant.COMPANY_LIST_DATE_INDEX,
                        dataType.getName(), DayEnum.DAY_30.getName(), date);
        spark.sql(sql).write()
                .format(ES_FORMAT)
                .mode(SaveMode.Overwrite)
                .option("es.mapping.id", "id")
                .save(index);
    }
}
package com.lining.code.spark.company.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataFlowEnum;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * @ Author :lining.
 * @ Date :Created in 18:03 2022/5/11
 * @ Description: syncs company detail model data (game and app) from
 * Spark SQL into Elasticsearch.
 */
public class CompanyModelAction {

    /** Spark connector format used for every Elasticsearch write. */
    private static final String ES_FORMAT = "org.elasticsearch.spark.sql";

    /**
     * Entry point.
     *
     * @param args optional; args[0] selects the data flow ("real" or
     *             "history"). Missing or unknown values now fall back to
     *             HISTORY — the original read args[0] unguarded and threw
     *             ArrayIndexOutOfBoundsException when launched without args.
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSessionUtil.getSparkSession("CompanyModelAction");
        // getDataFlow(null) resolves to HISTORY, so null is a safe default.
        String type = (args != null && args.length > 0) ? args[0] : null;
        DataFlowEnum dataFlow = DataFlowEnum.getDataFlow(type);
        sync(spark, DataTypeEnum.GAME, dataFlow);
        sync(spark, DataTypeEnum.APP, dataFlow);
        spark.stop();
    }

    /**
     * Runs the company detail SQL for one data type / flow and appends the
     * rows to the matching ES index, keyed by the row id.
     */
    private static void sync(SparkSession spark, DataTypeEnum dataType, DataFlowEnum dataFlow) {
        String sql = MessageFormat.format(SQLConstant.COMPANY_MODEL_SQL,
                dataType.getType(), dataFlow.getType());
        String index = MessageFormat.format(ESIndexConstant.COMPANY_INDEX,
                dataType.getName());
        spark.sql(sql).write()
                .format(ES_FORMAT)
                .mode(SaveMode.Append)
                .option("es.mapping.id", "id")
                .save(index);
    }
}
package com.lining.code.spark.country.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.enums.DayEnum;
import com.lining.code.util.DateUtil;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * @ Author :lining.
 * @ Date :Created in 22:30 2022/5/9
 * @ Description: syncs the 3-day country list model (game and app) from
 * Spark SQL into Elasticsearch.
 */
public class CountryListModel03Action {

    /** Spark connector format used for every Elasticsearch write. */
    private static final String ES_FORMAT = "org.elasticsearch.spark.sql";

    /**
     * Entry point.
     *
     * @param args optional; args[0] is a target date — when present, data is
     *             written to the date-suffixed index, otherwise to the base
     *             index. (The original default from DateUtil.getCurrentDate()
     *             was dead code: it was only used after being overwritten.)
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSessionUtil.getSparkSession("CountryListModel03Action");
        if (args != null && args.length > 0) {
            syncDataByDate(spark, args[0]);
        } else {
            syncData(spark);
        }
        spark.stop();
    }

    /** Writes game and app data to the date-suffixed indices. */
    private static void syncDataByDate(SparkSession spark, String date) {
        sync(spark, DataTypeEnum.GAME, date);
        sync(spark, DataTypeEnum.APP, date);
    }

    /** Writes game and app data to the base (undated) indices. */
    private static void syncData(SparkSession spark) {
        sync(spark, DataTypeEnum.GAME, null);
        sync(spark, DataTypeEnum.APP, null);
    }

    /**
     * Runs the 3-day country list SQL for one data type and overwrites the
     * matching ES index, keyed by the row id. A null date targets the base index.
     */
    private static void sync(SparkSession spark, DataTypeEnum dataType, String date) {
        String sql = MessageFormat.format(SQLConstant.COUNTRY_LIST_MODEL_SQL,
                DayEnum.DAY_03.getName(), dataType.getType());
        String index = date == null
                ? MessageFormat.format(ESIndexConstant.COUNTRY_LIST_INDEX,
                        dataType.getName(), DayEnum.DAY_03.getName())
                : MessageFormat.format(ESIndexConstant.COUNTRY_LIST_DATE_INDEX,
                        dataType.getName(), DayEnum.DAY_03.getName(), date);
        spark.sql(sql).write()
                .format(ES_FORMAT)
                .mode(SaveMode.Overwrite)
                .option("es.mapping.id", "id")
                .save(index);
    }
}
package com.lining.code.spark.country.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.enums.DayEnum;
import com.lining.code.util.DateUtil;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * @ Author :lining.
 * @ Date :Created in 22:30 2022/5/9
 * @ Description: syncs the 7-day country list model (game and app) from
 * Spark SQL into Elasticsearch.
 */
public class CountryListModel07Action {

    /** Spark connector format used for every Elasticsearch write. */
    private static final String ES_FORMAT = "org.elasticsearch.spark.sql";

    /**
     * Entry point.
     *
     * @param args optional; args[0] is a target date — when present, data is
     *             written to the date-suffixed index, otherwise to the base
     *             index. (The original default from DateUtil.getCurrentDate()
     *             was dead code: it was only used after being overwritten.)
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSessionUtil.getSparkSession("CountryListModel07Action");
        if (args != null && args.length > 0) {
            syncDataByDate(spark, args[0]);
        } else {
            syncData(spark);
        }
        spark.stop();
    }

    /** Writes game and app data to the date-suffixed indices. */
    private static void syncDataByDate(SparkSession spark, String date) {
        sync(spark, DataTypeEnum.GAME, date);
        sync(spark, DataTypeEnum.APP, date);
    }

    /** Writes game and app data to the base (undated) indices. */
    private static void syncData(SparkSession spark) {
        sync(spark, DataTypeEnum.GAME, null);
        sync(spark, DataTypeEnum.APP, null);
    }

    /**
     * Runs the 7-day country list SQL for one data type and overwrites the
     * matching ES index, keyed by the row id. A null date targets the base index.
     */
    private static void sync(SparkSession spark, DataTypeEnum dataType, String date) {
        String sql = MessageFormat.format(SQLConstant.COUNTRY_LIST_MODEL_SQL,
                DayEnum.DAY_07.getName(), dataType.getType());
        String index = date == null
                ? MessageFormat.format(ESIndexConstant.COUNTRY_LIST_INDEX,
                        dataType.getName(), DayEnum.DAY_07.getName())
                : MessageFormat.format(ESIndexConstant.COUNTRY_LIST_DATE_INDEX,
                        dataType.getName(), DayEnum.DAY_07.getName(), date);
        spark.sql(sql).write()
                .format(ES_FORMAT)
                .mode(SaveMode.Overwrite)
                .option("es.mapping.id", "id")
                .save(index);
    }
}
package com.lining.code.spark.country.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.enums.DayEnum;
import com.lining.code.util.DateUtil;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * @ Author :lining.
 * @ Date :Created in 22:30 2022/5/9
 * @ Description: syncs the 30-day country list model (game and app) from
 * Spark SQL into Elasticsearch.
 */
public class CountryListModel30Action {

    /** Spark connector format used for every Elasticsearch write. */
    private static final String ES_FORMAT = "org.elasticsearch.spark.sql";

    /**
     * Entry point.
     *
     * @param args optional; args[0] is a target date — when present, data is
     *             written to the date-suffixed index, otherwise to the base
     *             index. (The original default from DateUtil.getCurrentDate()
     *             was dead code: it was only used after being overwritten.)
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSessionUtil.getSparkSession("CountryListModel30Action");
        if (args != null && args.length > 0) {
            syncDataByDate(spark, args[0]);
        } else {
            syncData(spark);
        }
        spark.stop();
    }

    /** Writes game and app data to the date-suffixed indices. */
    private static void syncDataByDate(SparkSession spark, String date) {
        sync(spark, DataTypeEnum.GAME, date);
        sync(spark, DataTypeEnum.APP, date);
    }

    /** Writes game and app data to the base (undated) indices. */
    private static void syncData(SparkSession spark) {
        sync(spark, DataTypeEnum.GAME, null);
        sync(spark, DataTypeEnum.APP, null);
    }

    /**
     * Runs the 30-day country list SQL for one data type and overwrites the
     * matching ES index, keyed by the row id. A null date targets the base index.
     */
    private static void sync(SparkSession spark, DataTypeEnum dataType, String date) {
        String sql = MessageFormat.format(SQLConstant.COUNTRY_LIST_MODEL_SQL,
                DayEnum.DAY_30.getName(), dataType.getType());
        String index = date == null
                ? MessageFormat.format(ESIndexConstant.COUNTRY_LIST_INDEX,
                        dataType.getName(), DayEnum.DAY_30.getName())
                : MessageFormat.format(ESIndexConstant.COUNTRY_LIST_DATE_INDEX,
                        dataType.getName(), DayEnum.DAY_30.getName(), date);
        spark.sql(sql).write()
                .format(ES_FORMAT)
                .mode(SaveMode.Overwrite)
                .option("es.mapping.id", "id")
                .save(index);
    }
}
package com.lining.code.spark.document.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataFlowEnum;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * Syncs document-model data from Hive into per-product-type Elasticsearch indices.
 * Usage: the first program argument selects the data flow (see DataFlowEnum).
 *
 * @author lining
 * @since 16:31 2022/5/8
 */
public class DocumentModelAction {

    public static void main(String[] args) {
        // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length == 0) {
            throw new IllegalArgumentException("Missing required argument: data flow type");
        }
        DataFlowEnum dataFlow = DataFlowEnum.getDataFlow(args[0]);
        SparkSession sparkSession = SparkSessionUtil.getSparkSession("DocumentModelAction");
        try {
            sync(sparkSession, DataTypeEnum.GAME, dataFlow);
            sync(sparkSession, DataTypeEnum.APP, dataFlow);
        } finally {
            // Release cluster resources even when a query or ES write fails.
            sparkSession.stop();
        }
    }

    /** Runs the document-model SQL for one product type and appends the rows to its ES index. */
    private static void sync(SparkSession spark, DataTypeEnum dataType, DataFlowEnum dataFlow) {
        String sql = MessageFormat.format(SQLConstant.DOCUMENT_MODEL_SQL,
                dataType.getType(), dataFlow.getType());
        String index = MessageFormat.format(ESIndexConstant.DOCUMENT_INDEX, dataType.getName());
        Dataset<Row> dataset = spark.sql(sql);
        dataset.write()
                .format("org.elasticsearch.spark.sql")
                .mode(SaveMode.Append)
                .option("es.mapping.id", "id") // use the "id" column as the ES document id
                .save(index);
    }
}
package com.lining.code.spark.material.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataFlowEnum;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * Syncs material-model data from Hive into per-product-type Elasticsearch indices.
 * Usage: the first program argument selects the data flow (see DataFlowEnum).
 *
 * @author lining
 * @since 22:30 2022/5/9
 */
public class MaterialModelAction {

    public static void main(String[] args) {
        // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length == 0) {
            throw new IllegalArgumentException("Missing required argument: data flow type");
        }
        DataFlowEnum dataFlow = DataFlowEnum.getDataFlow(args[0]);
        SparkSession sparkSession = SparkSessionUtil.getSparkSession("MaterialModelAction");
        try {
            sync(sparkSession, DataTypeEnum.GAME, dataFlow);
            sync(sparkSession, DataTypeEnum.APP, dataFlow);
        } finally {
            // Release cluster resources even when a query or ES write fails.
            sparkSession.stop();
        }
    }

    /** Runs the material-model SQL for one product type and appends the rows to its ES index. */
    private static void sync(SparkSession spark, DataTypeEnum dataType, DataFlowEnum dataFlow) {
        String sql = MessageFormat.format(SQLConstant.MATERIAL_MODEL_SQL,
                dataType.getType(), dataFlow.getType());
        String index = MessageFormat.format(ESIndexConstant.MATERIAL_INDEX, dataType.getName());
        Dataset<Row> dataset = spark.sql(sql);
        dataset.write()
                .format("org.elasticsearch.spark.sql")
                .mode(SaveMode.Append)
                .option("es.mapping.id", "id") // use the "id" column as the ES document id
                .save(index);
    }
}
package com.lining.code.spark.material.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataFlowEnum;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * Syncs playable-ad (preplay) material-model data from Hive into Elasticsearch.
 * Usage: the first program argument selects the data flow (see DataFlowEnum).
 *
 * @author lining
 * @since 18:07 2022/5/11
 */
public class PreplayMaterialModelAction {

    public static void main(String[] args) {
        // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length == 0) {
            throw new IllegalArgumentException("Missing required argument: data flow type");
        }
        DataFlowEnum dataFlow = DataFlowEnum.getDataFlow(args[0]);
        SparkSession sparkSession = SparkSessionUtil.getSparkSession("PreplayMaterialModelAction");
        try {
            // NOTE(review): only GAME is synced; the APP branch was commented out in the
            // original source. Re-add sync(sparkSession, DataTypeEnum.APP, dataFlow) if needed.
            sync(sparkSession, DataTypeEnum.GAME, dataFlow);
        } finally {
            // Release cluster resources even when a query or ES write fails.
            sparkSession.stop();
        }
    }

    /** Runs the preplay-material SQL for one product type and appends the rows to its ES index. */
    private static void sync(SparkSession spark, DataTypeEnum dataType, DataFlowEnum dataFlow) {
        String sql = MessageFormat.format(SQLConstant.PREPLAY_MATERIAL_MODEL_SQL,
                dataType.getType(), dataFlow.getType());
        String index = MessageFormat.format(ESIndexConstant.PREPLAY_MATERIAL_INDEX, dataType.getName());
        Dataset<Row> dataset = spark.sql(sql);
        dataset.write()
                .format("org.elasticsearch.spark.sql")
                .mode(SaveMode.Append)
                .option("es.mapping.id", "id") // use the "id" column as the ES document id
                .save(index);
    }
}
package com.lining.code.spark.product.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.enums.DayEnum;
import com.lining.code.util.DateUtil;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * Syncs the 3-day product-list model into Elasticsearch.
 * With a date argument it writes to a dated index; otherwise to the non-dated index.
 *
 * @author lining
 * @since 22:30 2022/5/9
 */
public class ProductListModel03Action {

    public static void main(String[] args) {
        SparkSession spark = SparkSessionUtil.getSparkSession("ProductListModel03Action");
        try {
            if (args != null && args.length > 0) {
                syncDataByDate(spark, args[0]);
            } else {
                // No explicit date: write to the non-dated index.
                // (The original also computed DateUtil.getCurrentDate() here, but the
                // value was never used on this path — removed as dead code.)
                syncData(spark);
            }
        } finally {
            // Release cluster resources even when a query or ES write fails.
            spark.stop();
        }
    }

    /** Syncs both product types to the non-dated 3-day indices. */
    public static void syncData(SparkSession spark) {
        sync(spark, DataTypeEnum.GAME, null);
        sync(spark, DataTypeEnum.APP, null);
    }

    /** Syncs both product types to the dated 3-day indices for the given date. */
    public static void syncDataByDate(SparkSession spark, String date) {
        sync(spark, DataTypeEnum.GAME, date);
        sync(spark, DataTypeEnum.APP, date);
    }

    /**
     * Runs the 3-day product-list SQL for one product type and overwrites its ES index.
     * NOTE(review): the date only selects the target index name; the SQL itself is not
     * parameterized by date — confirm this is intended.
     */
    private static void sync(SparkSession spark, DataTypeEnum dataType, String date) {
        String sql = MessageFormat.format(SQLConstant.PRODUCT_LIST_MODEL_SQL,
                DayEnum.DAY_03.getName(), dataType.getType());
        String index = (date == null)
                ? MessageFormat.format(ESIndexConstant.PRODUCT_LIST_INDEX,
                        dataType.getName(), DayEnum.DAY_03.getName())
                : MessageFormat.format(ESIndexConstant.PRODUCT_LIST_DATE_INDEX,
                        dataType.getName(), DayEnum.DAY_03.getName(), date);
        spark.sql(sql).write()
                .format("org.elasticsearch.spark.sql")
                .mode(SaveMode.Overwrite)
                .option("es.mapping.id", "id") // use the "id" column as the ES document id
                .save(index);
    }
}
package com.lining.code.spark.product.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.enums.DayEnum;
import com.lining.code.util.DateUtil;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * Syncs the 7-day product-list model into Elasticsearch.
 * With a date argument it writes to a dated index; otherwise to the non-dated index.
 *
 * @author lining
 * @since 22:30 2022/5/9
 */
public class ProductListModel07Action {

    public static void main(String[] args) {
        SparkSession spark = SparkSessionUtil.getSparkSession("ProductListModel07Action");
        try {
            if (args != null && args.length > 0) {
                syncDataByDate(spark, args[0]);
            } else {
                syncData(spark);
            }
        } finally {
            // Release cluster resources even when a query or ES write fails.
            spark.stop();
        }
    }

    /** Syncs both product types to the non-dated 7-day indices. */
    private static void syncData(SparkSession spark) {
        sync(spark, DataTypeEnum.GAME, null);
        sync(spark, DataTypeEnum.APP, null);
    }

    /** Syncs both product types to the dated 7-day indices for the given date. */
    private static void syncDataByDate(SparkSession spark, String date) {
        sync(spark, DataTypeEnum.GAME, date);
        sync(spark, DataTypeEnum.APP, date);
    }

    /**
     * Runs the 7-day product-list SQL for one product type and overwrites its ES index.
     * NOTE(review): the date only selects the target index name; the SQL itself is not
     * parameterized by date — confirm this is intended.
     */
    private static void sync(SparkSession spark, DataTypeEnum dataType, String date) {
        String sql = MessageFormat.format(SQLConstant.PRODUCT_LIST_MODEL_SQL,
                DayEnum.DAY_07.getName(), dataType.getType());
        String index = (date == null)
                ? MessageFormat.format(ESIndexConstant.PRODUCT_LIST_INDEX,
                        dataType.getName(), DayEnum.DAY_07.getName())
                : MessageFormat.format(ESIndexConstant.PRODUCT_LIST_DATE_INDEX,
                        dataType.getName(), DayEnum.DAY_07.getName(), date);
        spark.sql(sql).write()
                .format("org.elasticsearch.spark.sql")
                .mode(SaveMode.Overwrite)
                .option("es.mapping.id", "id") // use the "id" column as the ES document id
                .save(index);
    }
}
package com.lining.code.spark.product.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.enums.DayEnum;
import com.lining.code.util.DateUtil;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * Syncs the 30-day product-list model into Elasticsearch.
 * With a date argument it writes to a dated index; otherwise to the non-dated index.
 *
 * @author lining
 * @since 22:30 2022/5/9
 */
public class ProductListModel30Action {

    public static void main(String[] args) {
        SparkSession spark = SparkSessionUtil.getSparkSession("ProductListModel30Action");
        try {
            if (args != null && args.length > 0) {
                syncDataByDate(spark, args[0]);
            } else {
                syncData(spark);
            }
        } finally {
            // Release cluster resources even when a query or ES write fails.
            spark.stop();
        }
    }

    /** Syncs both product types to the non-dated 30-day indices. */
    private static void syncData(SparkSession spark) {
        sync(spark, DataTypeEnum.GAME, null);
        sync(spark, DataTypeEnum.APP, null);
    }

    /** Syncs both product types to the dated 30-day indices for the given date. */
    private static void syncDataByDate(SparkSession spark, String date) {
        sync(spark, DataTypeEnum.GAME, date);
        sync(spark, DataTypeEnum.APP, date);
    }

    /**
     * Runs the 30-day product-list SQL for one product type and overwrites its ES index.
     * NOTE(review): the date only selects the target index name; the SQL itself is not
     * parameterized by date — confirm this is intended.
     */
    private static void sync(SparkSession spark, DataTypeEnum dataType, String date) {
        String sql = MessageFormat.format(SQLConstant.PRODUCT_LIST_MODEL_SQL,
                DayEnum.DAY_30.getName(), dataType.getType());
        String index = (date == null)
                ? MessageFormat.format(ESIndexConstant.PRODUCT_LIST_INDEX,
                        dataType.getName(), DayEnum.DAY_30.getName())
                : MessageFormat.format(ESIndexConstant.PRODUCT_LIST_DATE_INDEX,
                        dataType.getName(), DayEnum.DAY_30.getName(), date);
        spark.sql(sql).write()
                .format("org.elasticsearch.spark.sql")
                .mode(SaveMode.Overwrite)
                .option("es.mapping.id", "id") // use the "id" column as the ES document id
                .save(index);
    }
}
package com.lining.code.spark.product.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataFlowEnum;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * Syncs product-model data from Hive into per-product-type Elasticsearch indices.
 * Usage: the first program argument selects the data flow (see DataFlowEnum).
 *
 * @author lining
 * @since 16:31 2022/5/8
 */
public class ProductModelAction {

    public static void main(String[] args) {
        // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length == 0) {
            throw new IllegalArgumentException("Missing required argument: data flow type");
        }
        DataFlowEnum dataFlow = DataFlowEnum.getDataFlow(args[0]);
        SparkSession spark = SparkSessionUtil.getSparkSession("ProductModelAction");
        try {
            sync(spark, DataTypeEnum.GAME, dataFlow);
            sync(spark, DataTypeEnum.APP, dataFlow);
        } finally {
            // Release cluster resources even when a query or ES write fails.
            spark.stop();
        }
    }

    /** Runs the product-model SQL for one product type and appends the rows to its ES index. */
    private static void sync(SparkSession spark, DataTypeEnum dataType, DataFlowEnum dataFlow) {
        String sql = MessageFormat.format(SQLConstant.PRODUCT_MODEL_SQL,
                dataType.getType(), dataFlow.getType());
        String index = MessageFormat.format(ESIndexConstant.PRODUCT_INDEX, dataType.getName());
        Dataset<Row> dataset = spark.sql(sql);
        dataset.write()
                .format("org.elasticsearch.spark.sql")
                .mode(SaveMode.Append)
                .option("es.mapping.id", "id") // use the "id" column as the ES document id
                .save(index);
    }
}
package com.lining.code.spark.trend.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataFlowEnum;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * Syncs creative-group trend data from Hive into per-product-type Elasticsearch indices.
 * Takes no program arguments.
 *
 * @author lining
 * @since 16:31 2022/5/8
 */
public class CreativeTrendModelAction {

    public static void main(String[] args) {
        SparkSession sparkSession = SparkSessionUtil.getSparkSession("CreativeTrendModelAction");
        try {
            sync(sparkSession, DataTypeEnum.GAME);
            sync(sparkSession, DataTypeEnum.APP);
        } finally {
            // Release cluster resources even when a query or ES write fails.
            sparkSession.stop();
        }
    }

    /** Runs the creative-trend SQL for one product type and appends the rows to its ES index. */
    private static void sync(SparkSession spark, DataTypeEnum dataType) {
        String sql = MessageFormat.format(SQLConstant.CREATIVE_TREND_MODEL_SQL, dataType.getType());
        String index = MessageFormat.format(ESIndexConstant.CREATIVE_TREND_INDEX, dataType.getName());
        Dataset<Row> dataset = spark.sql(sql);
        dataset.write()
                .format("org.elasticsearch.spark.sql")
                .mode(SaveMode.Append)
                .option("es.mapping.id", "id") // use the "id" column as the ES document id
                .save(index);
    }
}
package com.lining.code.spark.trend.model;
import com.lining.code.constant.ESIndexConstant;
import com.lining.code.constant.SQLConstant;
import com.lining.code.enums.DataFlowEnum;
import com.lining.code.enums.DataTypeEnum;
import com.lining.code.util.PropertiesUtil;
import com.lining.code.util.SparkSessionUtil;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.text.MessageFormat;
/**
 * Syncs material trend data from Hive into per-product-type Elasticsearch indices.
 * Takes no program arguments.
 *
 * @author lining
 * @since 16:31 2022/5/8
 */
public class MaterialTrendModelAction {

    public static void main(String[] args) {
        SparkSession sparkSession = SparkSessionUtil.getSparkSession("MaterialTrendModelAction");
        try {
            sync(sparkSession, DataTypeEnum.GAME);
            sync(sparkSession, DataTypeEnum.APP);
        } finally {
            // Release cluster resources even when a query or ES write fails.
            sparkSession.stop();
        }
    }

    /** Runs the material-trend SQL for one product type and appends the rows to its ES index. */
    private static void sync(SparkSession spark, DataTypeEnum dataType) {
        String sql = MessageFormat.format(SQLConstant.MATERIAL_TREND_MODEL_SQL, dataType.getType());
        String index = MessageFormat.format(ESIndexConstant.MATERIAL_TREND_INDEX, dataType.getName());
        Dataset<Row> dataset = spark.sql(sql);
        dataset.write()
                .format("org.elasticsearch.spark.sql")
                .mode(SaveMode.Append)
                .option("es.mapping.id", "id") // use the "id" column as the ES document id
                .save(index);
    }
}
package com.lining.code.spark.udf;
import com.lining.code.util.DateUtil;
import org.apache.spark.sql.api.java.UDF1;
import java.sql.Timestamp;
import java.util.Date;
/**
 * Spark SQL UDF that formats a SQL timestamp as "yyyy-MM-dd HH:mm:ss".
 *
 * @author lining
 * @since 14:51 2022/5/16
 */
public class DateUdf implements UDF1<Timestamp, String> {

    @Override
    public String call(Timestamp timestamp) throws Exception {
        // Spark passes null for SQL NULL input; propagate null instead of throwing NPE.
        if (timestamp == null) {
            return null;
        }
        Date date = new Date(timestamp.getTime());
        return DateUtil.format(date, DateUtil.YYYY_MM_DD_HH_mm_ss);
    }
}
package com.lining.code.util;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
/**
 * Date formatting helpers shared by the Spark sync jobs.
 *
 * @author lining
 * @since 10:48 2022/3/15
 */
public class DateUtil {

    /** Compact day pattern, e.g. "20220508". */
    public static final String YYYYMMDD = "yyyyMMdd";
    /** Full timestamp pattern, e.g. "2022-05-08 16:31:00". */
    public static final String YYYY_MM_DD_HH_mm_ss = "yyyy-MM-dd HH:mm:ss";

    /**
     * Returns the current date formatted as yyyyMMdd.
     * (The original javadoc claimed yyyy-MM-dd, which did not match the pattern used.)
     *
     * @return today's date in the system default time zone, e.g. "20220508"
     */
    public static String getCurrentDate() {
        return LocalDate.now().format(DateTimeFormatter.ofPattern(YYYYMMDD));
    }

    /**
     * Formats the given date with the given SimpleDateFormat pattern.
     * A new SimpleDateFormat is created per call on purpose: the class is not
     * thread-safe and must not be cached in a static field.
     *
     * @param date   the date to format (must not be null)
     * @param format a SimpleDateFormat pattern, e.g. {@link #YYYY_MM_DD_HH_mm_ss}
     * @return the formatted date string
     */
    public static String format(Date date, String format) {
        // Construct with the pattern directly instead of no-arg ctor + applyPattern.
        return new SimpleDateFormat(format).format(date);
    }

    /**
     * Returns the current date-time formatted with a custom pattern.
     *
     * @param format a DateTimeFormatter pattern
     * @return the formatted current date-time
     */
    public static String currentTimeFormat(String format) {
        return LocalDateTime.now().format(DateTimeFormatter.ofPattern(format));
    }

    public static void main(String[] args) {
        System.out.println(format(new Date(), YYYY_MM_DD_HH_mm_ss));
    }
}
package com.lining.code.util;
import org.apache.commons.lang.StringUtils;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
/**
 * Reads configuration from "config.properties" on the classpath.
 * A missing or unreadable file leaves the properties empty, so callers
 * relying on {@link #getProperty(String, String)} fall back to their defaults.
 *
 * @author lining
 * @since 16:32 2022/5/8
 */
public class PropertiesUtil {

    private static final Properties prop = new Properties();

    static {
        // try-with-resources closes the stream (the original leaked it), and the
        // null check avoids an NPE + stack trace when the file is absent.
        try (InputStream inputStream = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("config.properties")) {
            if (inputStream != null) {
                prop.load(inputStream);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the configured value for the property, or null when absent.
     */
    public static String getProperty(String property) {
        return prop.getProperty(property);
    }

    /**
     * Returns the configured value for the property, or the default when the
     * value is absent or empty.
     */
    public static String getProperty(String property, String defaultValue) {
        String value = prop.getProperty(property);
        if (value == null || value.isEmpty()) {
            value = defaultValue;
        }
        return value;
    }

    /**
     * Returns the string form of the map entry for the key, falling back to a
     * case-insensitive key match. Returns "" for a null/empty map, a blank key,
     * a missing key, or a null value.
     */
    public static String getStrValue(Map map, String key) {
        // Stdlib blank check replaces commons-lang StringUtils.isBlank.
        if (map == null || map.isEmpty() || key == null || key.trim().isEmpty()) {
            return "";
        }
        Object value = map.get(key);
        if (value != null) {
            // toString covers the Integer special case of the original (t + "").
            return value.toString();
        }
        for (Object candidate : map.keySet()) {
            // instanceof guard: the original cast to String and could throw
            // ClassCastException on non-String keys.
            if (candidate instanceof String && ((String) candidate).equalsIgnoreCase(key)) {
                Object found = map.get(candidate);
                return found == null ? "" : found.toString();
            }
        }
        return "";
    }
}
package com.lining.code.util;
import org.apache.spark.sql.SparkSession;
/**
 * Factory for SparkSession instances shared by all sync jobs. Enables Hive
 * support and applies the Elasticsearch output settings read from
 * config.properties via PropertiesUtil.
 *
 * @author lining
 * @since 20:57 2022/5/10
 */
public class SparkSessionUtil {
/**
 * Builds (or reuses, via getOrCreate) a SparkSession configured for writing to ES.
 *
 * @param appName the Spark application name shown in the cluster UI
 * @return a configured SparkSession with Hive support enabled
 */
public static SparkSession getSparkSession(String appName){
SparkSession sparkSession = org.apache.spark.sql.SparkSession.builder().appName(appName) // .master("local[*]") — re-enable for local debugging
.config("spark.files.ignoreMissingFiles",true) // tolerate corrupted/missing partition files instead of failing the job
.config("spark.hadoop.yarn.timeline-service.enabled",false)// works around java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
.config("spark.sql.broadcastTimeout", PropertiesUtil.getProperty("spark.sql.broadcastTimeout","3600"))
.config("spark.network.timeout",PropertiesUtil.getProperty("spark.network.timeout"))
.config("spark.sql.parquet.compression.codec",PropertiesUtil.getProperty("spark.sql.parquet.compression.codec","gzip"))
.config("es.index.auto.create", "true")// NOTE(review): "true" lets ES auto-create indices, but the original comment said indices should be created in advance — confirm intent
.config("es.nodes",PropertiesUtil.getProperty("es.nodes"))
.config("es.port",PropertiesUtil.getProperty("es.port"))
.config("es.net.http.auth.user",PropertiesUtil.getProperty("es.net.http.auth.user"))
.config("es.net.http.auth.pass",PropertiesUtil.getProperty("es.net.http.auth.pass"))
.config("es.batch.size.bytes",PropertiesUtil.getProperty("es.batch.size.bytes"))
.config("es.batch.size.entries",PropertiesUtil.getProperty("es.batch.size.entries","100")) // smaller batches mitigate "Could not write all entries" failures
.config("es.batch.write.refresh",PropertiesUtil.getProperty("es.batch.write.refresh","false"))
.config("es.batch.write.retry.count",PropertiesUtil.getProperty("es.batch.write.retry.count","30")) // raised from 3 to mitigate "Could not write all entries"
.config("es.batch.write.retry.wait",PropertiesUtil.getProperty("es.batch.write.retry.wait","100")) // raised from 10s to mitigate "Could not write all entries"
.config("es.http.timeout",PropertiesUtil.getProperty("es.http.timeout"))
.config("es.http.retries",PropertiesUtil.getProperty("es.http.retries"))
.config("es.action.heart.beat.lead",PropertiesUtil.getProperty("es.action.heart.beat.lead"))
.config("es.nodes.wan.only",PropertiesUtil.getProperty("es.nodes.wan.only","true"))
.config("es.nodes.data.only",PropertiesUtil.getProperty("es.nodes.data.only","true"))
.config("es.nodes.discovery",PropertiesUtil.getProperty("es.nodes.discovery","true"))
.config("es.input.use.sliced.partitions",PropertiesUtil.getProperty("es.input.use.sliced.partitions","50000"))
.config("es.input.max.docs.per.partition",PropertiesUtil.getProperty("es.input.max.docs.per.partition","100000"))
.config("es.net.http.header.Accept-Languag",PropertiesUtil.getProperty("es.net.http.header.Accept-Languag","gzip")) // NOTE(review): header name looks truncated ("Accept-Languag") and "gzip" is an encoding, not a language — likely meant Accept-Encoding; confirm
.enableHiveSupport().getOrCreate();
return sparkSession;
}
}
# Spark configuration
spark.sql.broadcastTimeout=3600
spark.network.timeout=1400s
spark.sql.parquet.compression.codec=snappy
################################################################
# Elasticsearch configuration
es.nodes=10.1.0.59
es.port=9200
es.net.http.auth.user=elastic
es.net.http.auth.pass=Reyun@123
es.batch.size.bytes=20000000
es.batch.size.entries=5000
es.batch.write.refresh=true
es.batch.write.retry.count=50
es.batch.write.retry.wait=500
es.http.timeout=5m
es.http.retries=50
es.action.heart.beat.lead=50
es.nodes.wan.only=true
es.nodes.data.only=false
es.nodes.discovery=false
es.input.use.sliced.partitions=50000
es.input.max.docs.per.partition=100000
es.net.http.header.Accept-Languag=gzip
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment