package mobvista.prd.datasource.udf;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import mobvista.prd.datasource.util.GsonUtil;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

public class ExpansionPkg  extends GenericUDTF {

    private static final String dataSplit = "\t";
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length !=1) {
            throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("ExplodeMap takes string as a parameter");
        }

        List<String> fieldNames = new ArrayList<>();
        List<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldNames.add("date");
        fieldNames.add("pkg_name");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }
    @Override
    public void process(Object[] args) throws HiveException {
        String record = String.valueOf(args[0]);
        List<String> list = new ArrayList<>();
        if (record != null && record.startsWith("[")) {
            JsonArray array = GsonUtil.String2JsonArray(record);
            for (JsonElement element : array) {
                JsonObject obj = element.getAsJsonObject();
                String date = obj.get("date").getAsString();
                String pkg = obj.get("package_name").getAsString();
                list.clear();
                list.add(date);
                list.add(pkg);
                forward(list.toArray());
            }
        } else {
            list.clear();
            list.add("");
            list.add("");
            forward(list.toArray());
        }
    }

    @Override
    public void close() throws HiveException {

    }

}