package mobvista.dmp.datasource.adn.mapreduce;

import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.common.ListCache;
import mobvista.dmp.util.MRUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.regex.Pattern;

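/**
 * author: houying
 * date  : 16-11-1
 * desc  : Joins the daily ADN install log with the campaign list (keyed by campaign id)
 *         in order to look up the package name for each installed device.
 */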
public class AdnInstallDailyMR extends Configured implements Tool {

    /** 8-4-4-4-12 hexadecimal identifier layout; used below to validate IDFV values. */
    private static final Pattern DEVICE_ID_PATTERN =
            Pattern.compile("^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$");

    /** Package name of the form {@code id<digits>}. */
    private static final Pattern IOS_ID_PACKAGE_PATTERN = Pattern.compile("^id\\d+$");
    /** Package name consisting of digits only. */
    private static final Pattern IOS_NUMERIC_PACKAGE_PATTERN = Pattern.compile("^\\d+$");
    /** Package name made of ASCII letters, digits and dots. */
    private static final Pattern ANDROID_PACKAGE_PATTERN = Pattern.compile("^[0-9a-zA-Z\\.]+$");

    /**
     * Emits records keyed by campaign id from both inputs.
     *
     * <ul>
     *   <li>Lines with at least 59 fields are treated as install (v3) log rows; one value of the form
     *       {@code <joined device fields>#<package name from log>} is written per usable device id.</li>
     *   <li>Lines with 3 to 5 fields are treated as campaign-list rows; the value is
     *       {@code #<package name>} (field 2), so the reducer can tell it apart by the leading '#'.</li>
     *   <li>Everything else is ignored; campaign-like rows that are too short are counted.</li>
     * </ul>
     */
    public static class CampaignIdJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        private static final String ANDROID = "android";
        private static final String IOS = "ios";

        private static final int INSTALL_MIN_FIELDS = 59;
        private static final int CAMPAIGN_MIN_FIELDS = 3;
        private static final int CAMPAIGN_MAX_FIELDS = 5;

        // Compiled once instead of on every record (String.matches recompiles each call).
        private static final Pattern IMEI_PATTERN = Pattern.compile(CommonMapReduce.imeiPtn);
        private static final Pattern ANDROID_ID_PATTERN = Pattern.compile(CommonMapReduce.andriodIdPtn);
        private static final Pattern ONLY_ZEROS_PATTERN = Pattern.compile("^0+$");

        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] array = MRUtils.SPLITTER.split(value.toString(), -1);
            if (array.length >= INSTALL_MIN_FIELDS) {
                emitInstall(array, context);
            } else if (array.length <= CAMPAIGN_MAX_FIELDS) {
                if (array.length < CAMPAIGN_MIN_FIELDS) {
                    // Blank/truncated line: field 2 does not exist, so skip it instead of failing the task.
                    context.getCounter("AdnInstallDailyMR", "MALFORMED_CAMPAIGN_ROW").increment(1L);
                    return;
                }
                outKey.set(array[0]);
                emit(context, "#" + array[2]);
            }
        }

        /** Writes one value per usable device identifier found in an install log row. */
        private void emitInstall(String[] array, Context context) throws IOException, InterruptedException {
            String campaignId = array[7];
            String platform = array[13];
            String requestId = array[33]; // switched from extra5 to requestId (2017-06-16)
            String imei = array[35];
            String androidId = array[37];
            String gaid = array[42];
            String idfa = array[43];
            String idfv = GetDevIdUtil.getIdfv(array[55]);

            // The optional trailing columns only exist in longer versions of the log line.
            String packageName = "";
            if (array.length >= 72 && isKnownPackageFormat(array[71])) {
                packageName = array[71];
            }
            String extSysid = array.length >= 118 ? array[117] : "";
            String oaid = array.length >= 135 ? array[134] : "";

            outKey.set(campaignId);

            // In-house (system) id.
            String sysIdType = GetDevIdUtil.getExtSysId(extSysid);
            if (StringUtils.isNotBlank(sysIdType)) {
                emit(context, MRUtils.JOINER.join(sysIdType, platform, requestId) + "#" + packageName);
            }

            String devIdType = GetDevIdUtil.getIdByIdAndPlatform(gaid, idfa, extSysid, platform);
            if (StringUtils.isNotBlank(devIdType)) {
                emit(context, MRUtils.JOINER.join(devIdType, platform, requestId) + "#" + packageName);
            }

            if (ANDROID.equals(platform)) {
                // imei added 2019-01-14
                if (!imei.isEmpty() && IMEI_PATTERN.matcher(imei).matches()) {
                    emit(context, MRUtils.JOINER.join(imei, "imei", platform, requestId) + "#" + packageName);
                }
                // android id added 2019-01-14
                if (!androidId.isEmpty() && ANDROID_ID_PATTERN.matcher(androidId).matches()) {
                    emit(context, MRUtils.JOINER.join(androidId, "androidid", ANDROID, requestId) + "#" + packageName);
                }
                // oaid added 2020-07-27
                if (!oaid.isEmpty()
                        && !oaid.equals(CommonMapReduce.allZero)
                        && !ONLY_ZEROS_PATTERN.matcher(oaid).matches()) {
                    emit(context, MRUtils.JOINER.join(oaid, "oaid", ANDROID, requestId) + "#" + packageName);
                }
            } else if (IOS.equals(platform)) {
                // idfv added 2020-08-04
                if (!idfv.isEmpty()
                        && !idfv.equals(CommonMapReduce.allZero)
                        && DEVICE_ID_PATTERN.matcher(idfv).matches()) {
                    emit(context, MRUtils.JOINER.join(idfv, "idfv", IOS, requestId) + "#" + packageName);
                }
            }
        }

        /** Returns true when the candidate matches one of the accepted package-name shapes. */
        private static boolean isKnownPackageFormat(String candidate) {
            return IOS_ID_PACKAGE_PATTERN.matcher(candidate).matches()
                    || IOS_NUMERIC_PACKAGE_PATTERN.matcher(candidate).matches()
                    || ANDROID_PACKAGE_PATTERN.matcher(candidate).matches();
        }

        /** Writes {@code payload} under the key currently held in {@code outKey}. */
        private void emit(Context context, String payload) throws IOException, InterruptedException {
            outValue.set(payload);
            context.write(outKey, outValue);
        }
    }

    /**
     * Joins the device records of one campaign with that campaign's package name.
     *
     * <p>Values starting with '#' carry the campaign-list package name; all other values are device
     * records of the form {@code <device fields>#<package name from log>}. For every device one line
     * is written: device fields, campaign id, package name (per the original note: device_id,
     * device_type, platform, request_id, campaign_id, package_name).
     *
     * <p>Package selection per device: if the campaign package looks like {@code id<digits>} it is
     * used (with the {@code id} prefix removed for iOS devices); otherwise the package name carried
     * by the device record itself is used. The decision is made independently for each device so
     * the result does not depend on the order in which devices are iterated.
     */
    public static class CampaignIdJoinReducer extends Reducer<Text, Text, Text, NullWritable> {
        private final Text outKey = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String campaignId = key.toString();
            String campaignPackage = null;
            ListCache<String> devices = ListCache.newStringListCache();
            for (Text value : values) {
                String record = value.toString();
                if (record.startsWith("#")) {
                    campaignPackage = record.substring(1);
                } else {
                    devices.add(record);
                }
            }
            devices.flushAndClose();

            boolean campaignHasIosId = StringUtils.isNotBlank(campaignPackage)
                    && IOS_ID_PACKAGE_PATTERN.matcher(campaignPackage).matches();

            for (String device : devices) {
                // The mapper appends "#<package>" last and the package never contains '#',
                // so the final '#' is always the separator even if a device field contains one.
                int separator = device.lastIndexOf('#');
                String deviceFields = separator >= 0 ? device.substring(0, separator) : device;
                String logPackage = separator >= 0 ? device.substring(separator + 1) : "";

                String packageName;
                if (campaignHasIosId) {
                    String[] fields = MRUtils.SPLITTER.split(device, -1);
                    boolean isIos = fields.length > 2 && "ios".equals(fields[2]);
                    // Drop the leading "id" of an iOS package name such as "id123456".
                    packageName = isIos ? campaignPackage.substring(2) : campaignPackage;
                } else {
                    packageName = logPackage;
                }

                outKey.set(MRUtils.JOINER.join(deviceFields, campaignId, packageName));
                context.write(outKey, NullWritable.get());
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args {@code args[0]} and {@code args[1]} are the two input paths, {@code args[2]} is the
     *             output path (deleted recursively first if it already exists)
     * @return 0 on success, 1 if the job failed, 2 if too few arguments were supplied
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 3) {
            System.err.println("Usage: AdnInstallDailyMR <input path 1> <input path 2> <output path>");
            return 2;
        }

        Configuration conf = getConf();
        conf.set("mapreduce.map.speculative", "true");
        conf.set("mapreduce.reduce.speculative", "true");
        conf.set("mapreduce.task.io.sort.mb", "500");
        conf.set("mapreduce.reduce.java.opts", "-Xmx1536m");
        conf.set("mapreduce.reduce.memory.mb", "2048");
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "50");

        Job job = Job.getInstance(conf, "adn install daily");
        job.setJarByClass(AdnInstallDailyMR.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));

        // Remove any previous output so the job can be re-run for the same day.
        Path outputPath = new Path(args[2]);
        FileSystem fileSystem = outputPath.getFileSystem(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setMapperClass(CampaignIdJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setNumReduceTasks(10);

        job.setReducerClass(CampaignIdJoinReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Command-line entry point; exits with the status returned by {@link #run(String[])}. */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new AdnInstallDailyMR(), args));
    }
}