package mobvista.dmp.datasource.ga.mapreduce.map;

import mobvista.dmp.common.CommonMapReduce;
import mobvista.dmp.datasource.ga.mapreduce.vo.TextPair;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;


import java.io.IOException;
import java.util.regex.Pattern;

/**
 *
 * Created by fl on 2017/4/26.
 */
public class GaActiveTotalMapper extends Mapper<LongWritable, Text, TextPair, Text> {

    private String dataType;
    private String dataSplit = "\t";
    private TextPair outKey = new TextPair();
    private Text outValue = new Text();
    private Pattern pattern = Pattern.compile(dataSplit);
    private Pattern dayPattern = Pattern.compile("\\|");
    private StringBuilder keyBuilder = new StringBuilder();
    private StringBuilder valBuilder = new StringBuilder();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        if (split.getPath().toString().contains("dau-device-data-export")) {
            dataType = "0";   // daily data
        } else {
            dataType = "1";   // total data
        }
        System.out.println(split.getPath().toString());
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String valStr = value.toString();
        try {
            String[] valSplits = null;
            if ("0".equals(dataType)) {
                valSplits = dayPattern.split(value.toString(), -1);
            } else {
                valSplits = pattern.split(value.toString(), -1);
            }

            if (valSplits.length >= 2) {
                String deviceId = valSplits[0];
                String platform = valSplits[1];

                if (StringUtils.isNotEmpty(platform) && deviceId.matches(CommonMapReduce.didPtn) &&
                        !deviceId.equals(CommonMapReduce.allZero)) {
                    keyBuilder.setLength(0);
                    keyBuilder.append(deviceId).append(dataSplit).append(platform.toLowerCase());

                    int count = 0;
                    valBuilder.setLength(0);
                    for (String split : valSplits) {
                        if (valBuilder.length() == 0) {
                            valBuilder.append(split);
                        } else {
                            if (count == 1) {
                                // convert platform to lowerCase
                                valBuilder.append(dataSplit).append(split.toLowerCase());
                            } else {
                                valBuilder.append(dataSplit).append(split);
                            }
                        }
                        count++;
                    }

                    outKey.set(keyBuilder.toString(), dataType);
                    outValue.set(valBuilder.toString());
                    context.write(outKey, outValue);
                }
            } else {
                context.getCounter("DMP", "dataLengthError").increment(1l);
            }
        } catch (Exception e) {
            System.out.println("type = " + dataType);
            System.out.println("val = " + valStr);
            throw new RuntimeException(e);
        }
    }
}