package mobvista.prd.datasource.udf; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import mobvista.prd.datasource.util.GsonUtil; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import java.util.ArrayList; import java.util.List; public class ExpansionPkg extends GenericUDTF { private static final String dataSplit = "\t"; @Override public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException { if (args.length !=1) { throw new UDFArgumentLengthException("ExplodeMap takes only one argument"); } if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException("ExplodeMap takes string as a parameter"); } List<String> fieldNames = new ArrayList<>(); List<ObjectInspector> fieldOIs = new ArrayList<>(); fieldNames.add("date"); fieldNames.add("pkg_name"); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } @Override public void process(Object[] args) throws HiveException { String record = String.valueOf(args[0]); List<String> list = new ArrayList<>(); if (record != null && record.startsWith("[")) { JsonArray array = GsonUtil.String2JsonArray(record); for (JsonElement element : array) { JsonObject obj = element.getAsJsonObject(); String date = obj.get("date").getAsString(); String pkg = obj.get("package_name").getAsString(); list.clear(); list.add(date); list.add(pkg); forward(list.toArray()); } } else { list.clear(); list.add(""); list.add(""); forward(list.toArray()); } } @Override public void close() throws HiveException { } }