package mobvista.dmp.datasource.dsp.mapreduce;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.format.TextMultipleOutputFormat;
import mobvista.dmp.util.MD5Util;
import mobvista.dmp.util.MRUtils;
import mobvista.prd.datasource.util.GsonUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.orc.mapred.OrcStruct;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import static mobvista.dmp.common.CommonMapReduce.setMetrics;

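/**
 * author: zhi.liu
 * date  : 18-07-18
 * desc  : MapReduce job ("dsp orc daily etl job") that reads DSP log rows in ORC format, keys them by
 *         device id and device type, and writes one aggregated record per device plus one detail row
 *         per input record.
 * output: device_id, device_type, platform, country_code, ip, gender, birthday, maker, model, os_version, package_list, androidIds, datetime, segment, region
 */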
public class DspDailyEtlMR3Demo extends Configured implements Tool {


    /**
     * Column positions of the ORC input row read by the mapper.
     *
     * <p>Documented input struct:
     * struct<idfa:string,
     * gaid:string,
     * platform:string,
     * country:string,
     * ip:string,
     * gender:string,
     * birthday:string,
     * maker:string,
     * model:string,
     * osVersion:string,
     * packageName:string,
     * exitId:string,
     * time:string,
     * geoInfo:string,
     * longitude:string,
     * latitude:string>
     *
     * <p>NOTE(review): this enum additionally maps index 16 ({@code segment}) and index 17
     * ({@code region}), which are not part of the struct listed above, and the names carried by
     * {@link #PKG_NAME} / {@link #EXT_ID} differ from the struct's spelling — confirm against the
     * real table schema.
     */
    public enum Fields {
        IDFA("idfa", 0),
        GAID("gaid", 1),
        PLATFORM("platform", 2),
        COUNTRY("country", 3),
        IP("ip", 4),
        GENDER("gender", 5),
        BIRTHDAY("birthday", 6),
        MAKER("maker", 7),
        MODEL("model", 8),
        OS_VERSION("osVersion", 9),
        PKG_NAME("package_name", 10),
        EXT_ID("ext_id", 11),
        UPDATE_TIME("time", 12),
        GEO_INFO("geoInfo", 13),
        LONGITUDE("longitude", 14),
        LATITUDE("latitude", 15),
        SEGMET("segment", 16),
        REGION("region", 17);

        /** Column label associated with this field. */
        private final String name;
        /** Zero-based position of the column inside the ORC row. */
        private final int idx;

        Fields(String name, int idx) {
            this.name = name;
            this.idx = idx;
        }

        /**
         * @return the column label of this field
         */
        public String getName() {
            return name;
        }

        /**
         * @return the zero-based column index of this field
         */
        public int getIdx() {
            return idx;
        }
    }

    /**
     * Mapper that converts one DSP ORC row into up to five records keyed by a device identifier.
     *
     * <p>For every accepted row the following records may be written, in this order:
     * <ol>
     *   <li>the primary id (idfa on ios, gaid on android) when it and the package name are valid;</li>
     *   <li>the android id taken from the ext-id list;</li>
     *   <li>the plain IMEI (only for android rows whose country is CN);</li>
     *   <li>the MD5 of that IMEI;</li>
     *   <li>the IMEI MD5 supplied in the ext-id list (same android/CN restriction).</li>
     * </ol>
     * The key is {@code id + device_type} and the value holds 16 columns
     * (platform, country, ip, gender, birthday, maker, model, osVersion, packageName, androidId,
     * time, geoInfo, longitude, latitude, segment, region), both joined with {@code MRUtils.JOINER}.
     */
    public static class DspDailyEtlMapper3 extends Mapper<NullWritable, OrcStruct, Text, Text> {
        private static final String ANDROID = "android";
        private static final String IOS = "ios";

        private final Pattern iosPkgPtn = Pattern.compile("^\\d+$");
        private final Pattern adrPkgPtn = Pattern.compile("^[0-9a-zA-Z\\.]+$");
        private final Pattern deviceIdPtn = Pattern.compile("^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$");
        private final Pattern idSplitPtn = Pattern.compile(",");
        private final Pattern iosIdPrefixedPkgPtn = Pattern.compile("^id\\d+$");
        // Compiled once per mapper instead of once per record via String.matches.
        private final Pattern androidIdPtn = Pattern.compile(CommonMapReduce.andriodIdPtn);
        private final Pattern imeiPtn = Pattern.compile(CommonMapReduce.imeiPtn);
        private final Pattern imeiMd5Ptn = Pattern.compile(CommonMapReduce.imeiMd5Ptn);
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        /**
         * Validates a single ORC row and writes one record per usable device identifier.
         *
         * @param key     unused ORC key
         * @param value   input row; rows with fewer than 16 columns are counted and dropped
         * @param context task context used for counters and output
         * @throws IOException          if writing a record fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(NullWritable key, OrcStruct value, Context context) throws IOException, InterruptedException {
            if (value.getNumFields() < 16) {
                setMetrics(context, "DMP", "dsp_log_fields_num_error", 1);
                return;
            }

            // segment (16) and region (17) lie beyond the 16 columns guaranteed above, so they are
            // read through the bounds-checked accessor instead of indexing the row directly.
            String region = fieldAsString(value, Fields.REGION, "");

            String idfa = readDeviceId(value, Fields.IDFA, "idfa_illegal_format", context);
            String gaid = readDeviceId(value, Fields.GAID, "gaid_illegal_format", context);
            String countryCode = fieldAsString(value, Fields.COUNTRY, "");

            String platform = fieldAsString(value, Fields.PLATFORM, null);
            if (platform == null) {
                // rows without a platform are not processed
                context.getCounter("DMP", "platformIsNull").increment(1L);
                return;
            }

            String androidId = "";
            String imei = "";
            String imeimd5 = "";
            // ext id layout:
            // gaid,idfa,idfamd5,idfasha1,imei,imeimd5,imeisha1,androidid,androididmd5,androididsha1,macmd5,macsha1
            String ids = fieldAsString(value, Fields.EXT_ID, null);
            if (ids != null) {
                if (StringUtils.isNotBlank(ids)) {
                    String[] devIds = idSplitPtn.split(ids, -1);
                    if (devIds.length >= 8) {
                        // IMEI based ids are only accepted for android traffic from CN
                        boolean cnAndroid = ANDROID.equals(platform) && "CN".equalsIgnoreCase(countryCode);
                        if (StringUtils.isNotBlank(devIds[7]) && androidIdPtn.matcher(devIds[7]).matches()) {
                            androidId = devIds[7];
                        }
                        if (cnAndroid && StringUtils.isNotBlank(devIds[4]) && imeiPtn.matcher(devIds[4]).matches()) {
                            imei = devIds[4];
                        }
                        if (cnAndroid && StringUtils.isNotBlank(devIds[5]) && imeiMd5Ptn.matcher(devIds[5]).matches()) {
                            imeimd5 = devIds[5];
                        }
                    } else {
                        setMetrics(context, "DMP", "extIdlength<8", 1);
                    }
                } else {
                    setMetrics(context, "DMP", "EXT_IDisNull", 1);
                }
            }

            // drop the row when no identifier at all survived validation
            if (StringUtils.isBlank(gaid) && StringUtils.isBlank(idfa) && StringUtils.isBlank(androidId)
                    && StringUtils.isBlank(imei) && StringUtils.isBlank(imeimd5)) {
                context.getCounter("DMP", "deviceIdError").increment(1L);
                return;
            }

            String packageName = fieldAsString(value, Fields.PKG_NAME, "");

            // For validation only, the leading "id" of iOS package names such as "id123456" is removed.
            // NOTE(review): the value written to the output is the raw package name, exactly as the
            // previous implementation re-read it from the row after this normalisation — confirm
            // whether the stripped form was meant to be emitted instead.
            String pkgForCheck = packageName;
            if (IOS.equals(platform) && iosIdPrefixedPkgPtn.matcher(pkgForCheck).matches()) {
                pkgForCheck = pkgForCheck.substring(2);
            }

            String deviceId = "";
            String deviceType = null;
            if (IOS.equals(platform) && idfa.length() > 4 && checkPkgName("ios", pkgForCheck)
                    && !CommonMapReduce.allZero.equals(idfa)) {
                deviceId = idfa;
                deviceType = "idfa";
            } else if (ANDROID.equals(platform) && gaid.length() > 4 && checkPkgName("adr", pkgForCheck)
                    && !CommonMapReduce.allZero.equals(gaid)) {
                deviceId = gaid;
                deviceType = "gaid";
            }

            String ip = fieldAsString(value, Fields.IP, "");
            String gender = fieldAsString(value, Fields.GENDER, "");
            String birthday = fieldAsString(value, Fields.BIRTHDAY, "");
            String maker = fieldAsString(value, Fields.MAKER, "");
            String model = fieldAsString(value, Fields.MODEL, "");
            String osVersion = fieldAsString(value, Fields.OS_VERSION, "");
            String updateTime = fieldAsString(value, Fields.UPDATE_TIME, "");
            // These four keep null (not "") when absent, as before; how null is rendered is decided
            // by MRUtils.JOINER.
            String geoInfo = fieldAsString(value, Fields.GEO_INFO, null);
            String longitude = fieldAsString(value, Fields.LONGITUDE, null);
            String latitude = fieldAsString(value, Fields.LATITUDE, null);
            String segment = fieldAsString(value, Fields.SEGMET, null); // double click segment

            // The 15 columns following the platform are identical for every record of this row.
            String sharedColumns = MRUtils.JOINER.join(
                    countryCode,
                    ip,
                    gender,
                    birthday,
                    maker,
                    model,
                    osVersion,
                    packageName,
                    androidId,
                    updateTime,
                    geoInfo,
                    longitude,
                    latitude,
                    segment,
                    region
            );

            if (!deviceId.isEmpty()) {
                emit(context, deviceId, deviceType, platform, sharedColumns);
            }

            if (!androidId.isEmpty()) {
                emit(context, androidId, "androidid", ANDROID, sharedColumns);
            }

            if (StringUtils.isNotBlank(imei)) {
                emit(context, imei, "imei", ANDROID, sharedColumns);

                // Requirement added 2020-03-02: a plain IMEI is additionally emitted as its MD5.
                // Only the hashing is guarded; failures of context.write must reach the framework.
                String hashedImei = null;
                try {
                    hashedImei = MD5Util.getMD5Str(imei);
                } catch (Exception e) {
                    e.printStackTrace();
                    setMetrics(context, "DMP", "imei_md5_error", 1);
                }
                if (hashedImei != null) {
                    emit(context, hashedImei, "imeimd5", ANDROID, sharedColumns);
                }
            }

            if (StringUtils.isNotBlank(imeimd5)) {
                emit(context, imeimd5, "imeimd5", ANDROID, sharedColumns);
            }
        }

        /**
         * Reads a column as a string without assuming the row is wide enough to contain it.
         *
         * @param row          input row
         * @param field        column to read
         * @param defaultValue value returned when the column is missing from the row or is null
         * @return the column's string form, or {@code defaultValue}
         */
        private static String fieldAsString(OrcStruct row, Fields field, String defaultValue) {
            int idx = field.getIdx();
            if (idx >= row.getNumFields()) {
                return defaultValue;
            }
            Object raw = row.getFieldValue(idx);
            return raw == null ? defaultValue : raw.toString();
        }

        /**
         * Reads an idfa/gaid column and keeps it only when it has the 8-4-4-4-12 hex layout.
         *
         * @param row           input row
         * @param field         column holding the identifier
         * @param illegalMetric metric reported when the column is present but malformed
         * @param context       task context used to report the metric
         * @return the identifier, or an empty string when it is missing or malformed
         */
        private String readDeviceId(OrcStruct row, Fields field, String illegalMetric, Context context) {
            String candidate = fieldAsString(row, field, null);
            if (candidate == null) {
                return "";
            }
            if (StringUtils.isNotBlank(candidate) && deviceIdPtn.matcher(candidate).matches()) {
                return candidate;
            }
            setMetrics(context, "DMP", illegalMetric, 1);
            return "";
        }

        /**
         * Writes one record: key {@code id + idType}, value {@code platformValue + sharedColumns}.
         */
        private void emit(Context context, String id, String idType, String platformValue, String sharedColumns)
                throws IOException, InterruptedException {
            outKey.set(MRUtils.JOINER.join(id, idType));
            outValue.set(MRUtils.JOINER.join(platformValue, sharedColumns)); // 16 columns in total
            context.write(outKey, outValue);
        }

        /**
         * Checks a package name against the pattern(s) accepted for the given platform tag.
         *
         * @param platform {@code "ios"} or {@code "adr"}; any other value is rejected
         * @param pkg      package name to validate
         * @return true when the package name is acceptable for the platform
         */
        private boolean checkPkgName(String platform, String pkg) {
            switch (platform) {
                case "ios":
                    return iosPkgPtn.matcher(pkg).matches() || adrPkgPtn.matcher(pkg).matches();
                case "adr":
                    return adrPkgPtn.matcher(pkg).matches();
                default:
                    return false;
            }
        }
    }

    /**
     * Reducer that, for each {@code device_id + device_type} key, writes one detail row per valid
     * input value and one aggregated row per key.
     *
     * <p>Input values are expected to split (via {@code MRUtils.SPLITTER}) into exactly 16 columns:
     * 0 platform, 1 country, 2 ip, 3 gender, 4 birthday, 5 maker, 6 model, 7 osVersion,
     * 8 packageName ('#' separated), 9 androidId, 10 time, 11 geoInfo, 12 longitude, 13 latitude,
     * 14 segment (JSON array), 15 region ('#' separated). Values with any other column count are skipped.
     *
     * <p>The output key carries the target path in front of a comma; the paths come from the
     * {@code detailOutPath} and {@code outPath} configuration entries.
     */
    public static class DspDailyEtlReducer3 extends Reducer<Text, Text, Text, Text> {
        private final ObjectMapper objectMapper = new ObjectMapper();
        private final Text outKey = new Text();
        private final Text outValue = new Text();
        private final StringBuilder builder = new StringBuilder();
        private String outPath;
        private String detailOutPath;

        /**
         * Reads the two output locations from the job configuration.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            detailOutPath = conf.get("detailOutPath");
            outPath = conf.get("outPath");
        }

        /**
         * Emits the detail rows and the aggregated row for one device key.
         *
         * @param key     device_id and device_type as produced by the mapper
         * @param values  16-column records for that device
         * @param context task context used for counters and output
         * @throws IOException          if serialisation or writing fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Set<String> pkgSet = Sets.newHashSet();
            Set<String> regionSet = Sets.newHashSet();
            Set<String> androidId = Sets.newHashSet();
            Map<String, SegmentVO> segmentMap = Maps.newHashMap();
            String[] outFields = null;

            for (Text value : values) {
                String[] array = MRUtils.SPLITTER.split(value.toString(), -1);
                if (array.length != 16) {
                    continue;
                }

                if (outFields == null) {
                    // The first valid record supplies the scalar columns of the aggregated row:
                    // outFields[1..11] = array[0..10]; slots 9, 10, 12 and 13 are filled in below.
                    outFields = new String[14];
                    outFields[0] = key.toString(); // device_id \t device_type
                    System.arraycopy(array, 0, outFields, 1, 11);
                }

                addHashSeparated(array[8], pkgSet);
                if (!array[9].isEmpty()) {
                    androidId.add(array[9]);
                }
                addHashSeparated(array[15], regionSet);

                builder.setLength(0);
                builder.append(detailOutPath)
                        .append(",")               // left of the comma: output path, right of it: the real key
                        .append(key.toString())    // device_id + "\t" + device_type
                        .append("\t")
                        .append(array[0]);         // platform
                outKey.set(builder.toString());
                outValue.set(MRUtils.JOINER.join(
                        array[10], // time
                        array[2],  // ip
                        array[11], // geo
                        array[12], // longitude
                        array[13]  // latitude
                ));

                collectSegments(array[14], segmentMap, context);
                context.write(outKey, outValue);
            }

            if (outFields != null) {
                outFields[9] = objectMapper.writeValueAsString(pkgSet);
                outFields[10] = MRUtils.join(androidId, ",");
                outFields[12] = segmentMap.isEmpty() ? "" : objectMapper.writeValueAsString(segmentMap.values());
                outFields[13] = regionSet.isEmpty() ? "" : objectMapper.writeValueAsString(regionSet);
                outKey.set(outPath + ", ");
                outValue.set(MRUtils.join(outFields, "\t"));
                context.write(outKey, outValue);
            }
        }

        /**
         * Splits a '#' separated column and adds every token to {@code target}; blank input adds nothing.
         */
        private static void addHashSeparated(String column, Set<String> target) {
            if (StringUtils.isNotBlank(column)) {
                for (String token : column.split("#", -1)) {
                    target.add(token);
                }
            }
        }

        /**
         * Parses a segment column and stores every decoded segment by its id.
         *
         * <p>Only strings that start with '[' and contain '{' are parsed. A string that cannot be
         * parsed as a JSON array is counted and skipped rather than failing the whole reduce task,
         * matching how undecodable single elements are already handled.
         */
        private void collectSegments(String segment, Map<String, SegmentVO> segmentMap, Context context) {
            if (!segment.startsWith("[") || !segment.contains("{")) {
                return;
            }

            JsonArray segArray;
            try {
                segArray = GsonUtil.String2JsonArray(segment);
            } catch (RuntimeException e) {
                e.printStackTrace();
                context.getCounter("DMP", "segment json error").increment(1L);
                return;
            }
            if (segArray == null) {
                context.getCounter("DMP", "segment json error").increment(1L);
                return;
            }

            for (JsonElement segElement : segArray) {
                try {
                    SegmentVO segmentVO = GsonUtil.fromJson(segElement, SegmentVO.class);
                    segmentMap.put(segmentVO.getId(), segmentVO);
                } catch (Exception e) {
                    e.printStackTrace();
                    context.getCounter("DMP", "segment id error").increment(1L);
                }
            }
        }
    }


    /**
     * Configures and runs the ETL job, blocking until it finishes.
     *
     * @param args command line arguments handed to {@link #setInputPath} and {@link #setOutputPath}
     * @return 0 when the job completes successfully, 1 otherwise
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration jobConf = getConf();
        jobConf.setBoolean("mapreduce.map.speculative", true);
        jobConf.setBoolean("mapreduce.reduce.speculative", true);
        jobConf.setInt("mapreduce.task.io.sort.mb", 500);
        jobConf.setInt("mapreduce.reduce.shuffle.parallelcopies", 50);

        Job etlJob = Job.getInstance(jobConf, "dsp orc daily etl job");
        etlJob.setJarByClass(DspDailyEtlMR3Demo.class);

        setInputPath(etlJob, args);
        setOutputPath(etlJob, args);

        // Remove output left behind by an earlier run before the job starts.
        Path target = FileOutputFormat.getOutputPath(etlJob);
        FileSystem targetFs = target.getFileSystem(etlJob.getConfiguration());
        if (targetFs.exists(target)) {
            targetFs.delete(target, true);
        }

        etlJob.setMapperClass(DspDailyEtlMapper3.class);
        etlJob.setReducerClass(DspDailyEtlReducer3.class);
        etlJob.setMapOutputKeyClass(Text.class);
        etlJob.setMapOutputValueClass(Text.class);
        etlJob.setOutputKeyClass(Text.class);
        etlJob.setOutputValueClass(Text.class);
        etlJob.setOutputFormatClass(TextMultipleOutputFormat.class);

        boolean succeeded = etlJob.waitForCompletion(true);
        return succeeded ? 0 : 1;
    }


    /**
     * Points the job at its output directory ({@code args[1]}) and enables gzip-compressed output.
     *
     * @param job  job being configured
     * @param args command line arguments; the second element is the output path
     * @throws IOException declared for overriding implementations
     */
    protected void setOutputPath(Job job, String[] args) throws IOException {
        Path target = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, target);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    }

    /**
     * Registers {@code args[0]} as the job's input path.
     *
     * @param job  job being configured
     * @param args command line arguments; the first element is the input path
     * @throws IOException if the path cannot be added to the job
     */
    protected void setInputPath(Job job, String[] args) throws IOException {
        Path source = new Path(args[0]);
        FileInputFormat.addInputPath(job, source);
    }


    /**
     * Command line entry point: runs the job through {@link ToolRunner} and exits with its status.
     *
     * @param args command line arguments forwarded to {@link #run(String[])}
     * @throws Exception if the job fails to start or run
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new DspDailyEtlMR3Demo(), args);
        System.exit(exitCode);
    }
}