package mobvista.dmp.datasource.dsp.mapreduce;

import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.common.CommonMapper;
import mobvista.dmp.common.InstallTotalReducer;
import mobvista.dmp.util.MRUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.JavaType;

import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;

/**
 * author: houying
 * date  : 16-11-9
 * desc  : DSP request full (cumulative) install list.
 *
 * <p>The job registers two input paths ({@code args[0]} and {@code args[1]}) and writes
 * gzip-compressed output to {@code args[2]}. It is wired with {@link DspReqPkgMapper} as the
 * mapper and {@link InstallTotalReducer} as the reducer.
 */
public class DspReqPkgTotalMR extends CommonMapReduce {

    /**
     * Mapper that accepts records from two kinds of input, told apart by the input file path:
     * <ul>
     *   <li>paths containing {@code /etl_dsp_request_daily/}: column 10 is parsed as a JSON array
     *       of package names and one output record is written per package;</li>
     *   <li>paths containing {@code /dm_install_list/}: column 3 is forwarded unchanged.</li>
     * </ul>
     * In both cases the output key is the join of columns 0, 1 and 2. Malformed records are
     * skipped and reported through counters in the {@code dsp} group instead of failing the task.
     */
    public static class DspReqPkgMapper extends CommonMapper {
        private static final String COUNTER_GROUP = "dsp";
        /** A daily record is only processed when it has at least this many columns. */
        private static final int DAILY_MIN_FIELDS = 14;
        /** An install-list record is only processed when it has at least this many columns. */
        private static final int INSTALL_LIST_MIN_FIELDS = 4;
        /** Number of leading characters kept from column 12 of a daily record. */
        private static final int DATE_PREFIX_LENGTH = 10;
        /** Ids longer than this are accepted without any pattern check. */
        private static final int MAX_UNCHECKED_ID_LENGTH = 16;

        private ObjectMapper objectMapper;
        private JavaType javaType;

        // Compiled once per task; String.matches would recompile the regex for every record.
        private Pattern didPattern;
        private Pattern androidIdPattern;
        private Pattern imeiPattern;
        private Pattern imeiMd5Pattern;

        /**
         * Prepares the JSON mapper used to decode package lists and compiles the id patterns.
         *
         * <p>NOTE(review): {@code super.setup(context)} is not invoked, to keep the existing
         * behavior; confirm that {@code CommonMapper} does not depend on it.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            objectMapper = new ObjectMapper();
            javaType = objectMapper.getTypeFactory().constructCollectionType(List.class, String.class);
            didPattern = Pattern.compile(CommonMapReduce.didPtn);
            androidIdPattern = Pattern.compile(CommonMapReduce.andriodIdPtn);
            imeiPattern = Pattern.compile(CommonMapReduce.imeiPtn);
            imeiMd5Pattern = Pattern.compile(imeiMd5Ptn);
        }

        /**
         * Handles one daily request record: emits {@code (col0|col1|col2) -> (pkg|datePrefix)} for
         * every package in the JSON array held in column 10, where {@code datePrefix} is the first
         * {@value #DATE_PREFIX_LENGTH} characters of column 12.
         *
         * <p>Records that are too short, whose package list cannot be parsed, or whose column 12
         * is shorter than the prefix are counted and skipped.
         *
         * @param array   the split input line
         * @param context the task context used for output and counters
         */
        private void handleDaily(String[] array, Context context) throws IOException, InterruptedException {
            if (array.length < DAILY_MIN_FIELDS) {
                context.getCounter(COUNTER_GROUP, "dataLengthError").increment(1L);
                return;
            }

            List<String> pkgList;
            try {
                pkgList = objectMapper.readValue(array[10], javaType);
            } catch (IOException e) {
                // The source is an in-memory string, so an IOException here means the content
                // is not a valid JSON string array; one bad record must not fail the whole task.
                context.getCounter(COUNTER_GROUP, "pkgListParseError").increment(1L);
                return;
            }
            if (pkgList == null || pkgList.isEmpty()) {
                return;
            }

            String time = array[12];
            if (time.length() < DATE_PREFIX_LENGTH) {
                context.getCounter(COUNTER_GROUP, "dateFormatError").increment(1L);
                return;
            }
            // Loop-invariant: computed once instead of once per package.
            String datePrefix = time.substring(0, DATE_PREFIX_LENGTH);

            outKey.set(MRUtils.JOINER.join(array[0], array[1], array[2]));
            for (String pkg : pkgList) {
                outValue.set(MRUtils.JOINER.join(pkg, datePrefix));
                context.write(outKey, outValue);
            }
        }

        /**
         * Handles one record of the existing install list: emits
         * {@code (col0|col1|col2) -> col3}. Records with fewer than
         * {@value #INSTALL_LIST_MIN_FIELDS} columns are counted and skipped.
         *
         * @param array   the split input line
         * @param context the task context used for output and counters
         */
        private void handleInstallList(String[] array, Context context) throws IOException, InterruptedException {
            if (array.length < INSTALL_LIST_MIN_FIELDS) {
                context.getCounter(COUNTER_GROUP, "installListLengthError").increment(1L);
                return;
            }
            outKey.set(MRUtils.JOINER.join(array[0], array[1], array[2]));
            outValue.set(array[3]);
            context.write(outKey, outValue);
        }

        /**
         * Decides whether the record's first column is an id this job should process.
         *
         * <p>The clauses are the same as a single boolean OR; only the evaluation order is chosen
         * so that the cheapest test runs first.
         *
         * <p>NOTE(review): because any id longer than {@value #MAX_UNCHECKED_ID_LENGTH} characters
         * is accepted outright, the all-zero exclusion only takes effect for ids that are not
         * longer than that — confirm this is intended.
         *
         * @param array the split input line; always has at least one element
         * @return {@code true} when the record should be processed
         */
        private boolean isAcceptedDeviceId(String[] array) {
            String deviceId = array[0];
            if (deviceId.length() > MAX_UNCHECKED_ID_LENGTH) {
                return true;
            }
            if (didPattern.matcher(deviceId).matches() && !deviceId.equals(CommonMapReduce.allZero)) {
                return true;
            }
            if (androidIdPattern.matcher(deviceId).matches() || imeiPattern.matcher(deviceId).matches()) {
                return true;
            }
            // support oaidmd5  2020.12.16
            // The type column may be missing on a truncated line, so check the length first.
            return array.length > 1
                    && imeiMd5Pattern.matcher(deviceId).matches()
                    && (array[1].equalsIgnoreCase(imeiMd5) || array[1].equalsIgnoreCase(oaidMd5));
        }

        /**
         * Splits the input line, filters it by id, and dispatches it to the handler that matches
         * the input file path. Records from any other path are reported through
         * {@code CommonMapReduce.setMetrics}.
         *
         * @throws IllegalStateException if {@code map.input.file} is not set in the configuration
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] array = MRUtils.SPLITTER.split(value.toString(), -1);
            if (!isAcceptedDeviceId(array)) {
                return;
            }

            String inputFile = context.getConfiguration().get("map.input.file");
            if (inputFile == null) {
                // Fail fast with a clear message instead of an anonymous NullPointerException.
                throw new IllegalStateException("map.input.file is not set; cannot determine record source");
            }

            if (inputFile.contains("/etl_dsp_request_daily/")) { // daily data
                handleDaily(array, context);
            } else if (inputFile.contains("/dm_install_list/")) {
                handleInstallList(array, context);
            } else {
                CommonMapReduce.setMetrics(context, "DMP", "input path error", 1);
            }
        }
    }

    /**
     * Creates the job definition.
     *
     * @param name         the job name
     * @param mapperClass  the mapper implementation
     * @param reducerClass the reducer implementation
     */
    public DspReqPkgTotalMR(String name, Class<? extends Mapper> mapperClass, Class<? extends Reducer> reducerClass) {
        super(name, mapperClass, reducerClass);
    }

    /**
     * Entry point: starts the job with {@link DspReqPkgMapper} and {@link InstallTotalReducer}.
     *
     * @param args command-line arguments; indices 0 and 1 are used as input paths and index 2
     *             as the output path by {@link #setInputPath} and {@link #setOutputPath}
     */
    public static void main(String[] args) throws Exception {
        start(new DspReqPkgTotalMR("dsp request package total", DspReqPkgMapper.class, InstallTotalReducer.class), args);
    }

    /**
     * Uses {@code args[2]} as the output directory and enables gzip compression of the output.
     */
    @Override
    protected void setOutputPath(Job job, String[] args) throws IOException {
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    }

    /**
     * Registers {@code args[0]} and {@code args[1]} as input paths. The mapper tells the two
     * sources apart by the file path, not by argument position.
     */
    @Override
    protected void setInputPath(Job job, String[] args) throws IOException {
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.addInputPath(job, new Path(args[1]));
    }
}