package mobvista.dmp.datasource.adn.mapreduce;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.common.CommonMapper;
import mobvista.dmp.common.CommonReducer;
import mobvista.dmp.util.MD5Util;
import mobvista.dmp.util.MRUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Map;
import java.util.Set;

/**
 * author: walt
 * date  : 16-11-29
 * desc  : 从ods_adn_trackingnew_request中抽取appid字段,匹配出package_name输出到etl_adn_sdk_request_daily
 */
public class AdnSdkRequestPkgDailyMR extends CommonMapReduce {
    private static final Logger logger = LoggerFactory.getLogger(AdnSdkRequestPkgDailyMR.class);

    public AdnSdkRequestPkgDailyMR(String name, Class<? extends Mapper> mapperClass, Class<? extends Reducer> reducerClass) {
        super(name, mapperClass, reducerClass);
    }

    public static void main(String[] args) throws Exception {
        start(new AdnSdkRequestPkgDailyMR("adn sdk request pkg daily job", AdnSdkMapper.class, AdnSdkReducer.class), args);
    }

    public static class AdnSdkMapper extends CommonMapper {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] array = MRUtils.SPLITTER.split(value.toString(), -1);
             /*  if (array.length >= 54) {
              String gaid = array[42];
                String idfa = array[43];
                String imei = array[35];
                String model = array[16];
                String osVersion = array[14];
                String androidId = array[37];
                String platform = array[13];
                String appId = array[4];
                String extSysid = "";
                if (array.length >= 118) {
                    extSysid = array[117];
                } */
            if (array.length >= 22) {
                String gaid = array[15];
                String idfa = array[16];
                String imei = array[12];
                String model = array[7];
                String osVersion = array[5];
                String androidId = array[14];
                String platform = array[4];
                String appId = array[3];
                String extSysid = array[18];
                String oaid = array[21];
                String idfv = "";
                if (array.length >= 23) {
                    idfv = array[22];
                }
                String ruid = "";
                if (array.length >= 24) {
                    ruid = array[23];
                }

                if ("android".equals(platform) && !gaid.isEmpty()) {
                    CommonMapReduce.setMetrics(context, "DMP", "gaid", 1);
                    if ("0".equals(gaid)) {
                        CommonMapReduce.setMetrics(context, "DMP", "gaid_zero", 1);
                    }
                }

                if ("android".equals(platform) && !imei.isEmpty()) {
                    CommonMapReduce.setMetrics(context, "DMP", "imei", 1);
                    if ("0".equals(imei)) {
                        CommonMapReduce.setMetrics(context, "DMP", "imei_zero", 1);
                    }
                }

                if ("ios".equals(platform)) {
                    CommonMapReduce.setMetrics(context, "DMP", "ios", 1);
                    if ("0".equals(idfa)) {
                        CommonMapReduce.setMetrics(context, "DMP", "idfa_zero", 1);
                    }
                }

                outValue.set(MRUtils.JOINER.join(appId, model, osVersion));


                //新增自有Id
                String sysIdType = GetDevIdUtil.getExtSysId(extSysid);
                if (StringUtils.isNotBlank(sysIdType)) {
                    outKey.set(MRUtils.JOINER.join(sysIdType, platform.toLowerCase()));
                    context.write(outKey, outValue);
                }


                String devIdType = GetDevIdUtil.getIdByIdAndPlatform(gaid, idfa, extSysid, platform);
                if (StringUtils.isNotBlank(devIdType)) {
                    outKey.set(MRUtils.JOINER.join(devIdType, platform.toLowerCase()));
                    context.write(outKey, outValue);
                }
                if (!imei.isEmpty() && imei.matches(CommonMapReduce.imeiPtn) && "android".equals(platform)) {
                    outKey.set(MRUtils.JOINER.join(imei, "imei", platform.toLowerCase()));
                    context.write(outKey, outValue);
                    // 卓萌需求,imei转换成imeimd5类型 2020.03.16
                    try {
                        String imeiMd5 = MD5Util.getMD5Str(imei);
                        outKey.set(MRUtils.JOINER.join(imeiMd5, "imeimd5", platform.toLowerCase()));
                        context.write(outKey, outValue);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

                /*
                if (gaid.matches(CommonMapReduce.didPtn)
                        && !gaid.equals(CommonMapReduce.allZero)) {
                    outKey.set(MRUtils.JOINER.join(gaid, "gaid", platform.toLowerCase()));
                    context.write(outKey, outValue);
                }
                if (idfa.matches(CommonMapReduce.didPtn)
                        && !idfa.equals(CommonMapReduce.allZero)) {
                    outKey.set(MRUtils.JOINER.join(idfa, "idfa", platform.toLowerCase()));
                    context.write(outKey, outValue);
                }
                 */

                if (!androidId.isEmpty() && androidId.matches(CommonMapReduce.andriodIdPtn) && "android".equals(platform)) {
                    outKey.set(MRUtils.JOINER.join(androidId, "androidid", "android"));
                    context.write(outKey, outValue);
                }
                //  卓盟需求,新增oaid字段 2020.07.27
                if (!oaid.isEmpty() && !CommonMapReduce.allZero.equals(oaid) && "android".equals(platform) && !oaid.matches("^0+$")) {
                    outKey.set(MRUtils.JOINER.join(oaid, "oaid", "android"));
                    context.write(outKey, outValue);
                }

                //  算法需求,新增 idfv 字段 2020.08.03
                if (StringUtils.isNotBlank(idfv) && !CommonMapReduce.allZero.equals(idfv) && "ios".equals(platform) && idfv.matches(didPtn)) {
                    outKey.set(MRUtils.JOINER.join(idfv, "idfv", "ios"));
                    context.write(outKey, outValue);
                }
                //  算法需求,新增 ruid 字段 2021.02.25
                if (StringUtils.isNotBlank(ruid) && ruid.length() > 16) {
                    outKey.set(MRUtils.JOINER.join(ruid, "ruid", "ios"));
                    context.write(outKey, outValue);
                }
            }
        }
    }


    public static class AdnSdkReducer extends CommonReducer {
        private Map<String, String> appIdMap;
        private String date;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            appIdMap = Maps.newHashMap();
            for (URI uri : context.getCacheFiles()) {
                logger.info("load cache {}", uri.getPath());
                FileSystem fileSystem = FileSystem.get(uri, context.getConfiguration());
                BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(uri.getPath()))));
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] array = line.split(",");
                    if (array[1].isEmpty() || array[1].equals("null")) {
                        continue;
                    }
                    appIdMap.put(array[0], array[1]);
                }
                reader.close();
            }
            logger.info("app id map: {}", appIdMap);
            date = Preconditions.checkNotNull(context.getConfiguration().get("task.date"));
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String model = "";
            String osVersion = "";
            String device = key.toString();
            Set<String> set = Sets.newHashSet();
            for (Text text : values) {     // 如此去重当数据倾斜时间会出现OOM问题
                String[] valSplits = MRUtils.SPLITTER.split(text.toString(), -1);
                set.add(valSplits[0]);
                model = valSplits[1];
                osVersion = valSplits[2];
            }
            for (String appId : set) {

                String pkgName = Strings.nullToEmpty(appIdMap.get(appId));
                if (pkgName.isEmpty()) { //no pkgname for this appid
                    CommonMapReduce.setMetrics(context, "DMP", "appid_no_pkg", 1);
                }
                out.set(MRUtils.JOINER.join(
                        device, appId, pkgName, date, model, osVersion
                ));
                //device_id device_type platform app_id pkg_name date
                context.write(out, NullWritable.get());
            }
        }
    }

    @Override
    protected void otherSetting(Job job, String[] args) throws Exception {
        Path mapping = new Path(args[1]);
        for (FileStatus status : mapping.getFileSystem(job.getConfiguration()).listStatus(mapping)) {
            logger.info("add file {} into cache", status.getPath().toString());
            job.addCacheFile(new URI(status.getPath().toString()));
        }
        // 添加人工appid与package对应关系文件
        job.addCacheFile(URI.create(args[3]));
    }

    @Override
    protected void setOutputPath(Job job, String[] args) throws IOException {
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    }

    @Override
    protected void setInputPath(Job job, String[] args) throws IOException {
        FileInputFormat.addInputPath(job, new Path(args[0]));

    }
}