TestJob1.java 5.99 KB
package mobvista.prd.datasource.test;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import mobvista.dmp.format.MultipleOrcOutputFormat;
import mobvista.prd.datasource.util.GsonUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
 * Map-only smoke-test job that emits hard-coded ORC rows through {@link MultipleOrcOutputFormat}.
 *
 * <p>Usage: {@code TestJob1 [generic options] <input path> <output path>}. Input lines only drive
 * the mapper (two rows are emitted per input record); their contents are not read.
 */
public class TestJob1 extends Configured implements Tool {

    /**
     * Emits two fixed ORC rows per input record, each keyed by an S3 directory string.
     * NOTE(review): presumably MultipleOrcOutputFormat uses the key to pick the output directory
     * for each row — confirm against that class.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, OrcStruct> {

        /** ORC schema of the emitted rows: eight string columns. */
        private static final String SCHEMA = "struct<device_id:string,device_type:string,platform:string,package_name:string,tag_type:string,first_tag:string,second_tag:string,update_date:string>";

        /** Parsed once per JVM; TypeDescription is only read here. */
        private static final TypeDescription TYPE = TypeDescription.fromString(SCHEMA);

        // Reused for every row this mapper writes; each write below fully re-assigns columns 0-6.
        private final OrcStruct struct = (OrcStruct) OrcStruct.createValue(TYPE);

        // Output keys are never mutated, so they can be shared across map() calls.
        private final Text dir1 = new Text("s3://mob-emr-test/feng.liang/studyout/dir1");
        private final Text dir2 = new Text("s3://mob-emr-test/feng.liang/studyout/dir2");

        /** Logs the path of the file split this mapper is processing. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            System.out.println(split.getPath().toString());
        }

        /**
         * Writes two fixed rows, one per output directory. The input key and value are ignored.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // NOTE(review): SCHEMA has 8 columns but only 7 values are supplied, so update_date
            // (index 7) is never assigned here, and each value from "first" onward sits one column
            // earlier than its label suggests (e.g. "updateDate" lands in second_tag). Looks like
            // tag_type was added to the schema without updating this data — confirm intended layout.
            fillRow("did1", "dtype1", "ios", "1", "first", "secondTag", "updateDate");
            context.write(dir1, struct);
            fillRow("did2", "dtype2", "ios2", "2", "first2", "secondTag2", "updateDat2e");
            context.write(dir2, struct);
        }

        /** Sets columns 0..values.length-1 of {@link #struct} to the given strings, in order. */
        private void fillRow(String... values) {
            for (int i = 0; i < values.length; i++) {
                struct.setFieldValue(i, new Text(values[i]));
            }
        }
    }

    /**
     * Reducer that merges the distinct tags of all values for a key into a
     * {@code key<TAB>tag1;tag2;...} line.
     *
     * <p>Not wired into the job ({@link #run} sets zero reduce tasks and the setReducerClass call
     * is commented out), and its output write is commented out, so it currently emits nothing.
     */
    public static class MyReducer extends Reducer<Text, Text, NullWritable, Text> {
        // Reused across reduce() calls; cleared at the start of each call.
        private final StringBuilder buffer = new StringBuilder();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Set<String> set = new HashSet<>();
            for (Text val : values) {
                set.addAll(parseTags(val.toString()));
            }

            buffer.setLength(0);
            buffer.append(key.toString())
                    .append("\t");
            for (String s : set) {
                buffer.append(s).append(";");
            }

//            context.write(NullWritable.get(), BytesRefUtil.createRcOutValue(buffer.toString()));
        }

        /**
         * Extracts tags from a JSON array of objects, each of which carries a {@code "tag"} array
         * of objects with keys {@code "1"} (first-level tag) and optionally {@code "2"}
         * (second-level tag).
         *
         * @param str JSON text, parsed via GsonUtil.String2JsonArray
         * @return distinct tags, {@code first} alone when the second level is empty, otherwise
         *     {@code first-->second}; malformed entries (non-objects, missing or non-array
         *     {@code "tag"}) are skipped instead of aborting the task
         */
        public Set<String> parseTags(String str) {
            Set<String> tags = new HashSet<>();
            JsonArray array = GsonUtil.String2JsonArray(str);
            if (array == null) {
                // NOTE(review): unclear whether GsonUtil can return null; guard defensively.
                return tags;
            }
            for (JsonElement element : array) {
                if (!element.isJsonObject()) {
                    continue;
                }
                JsonElement tagField = element.getAsJsonObject().get("tag");
                if (tagField == null || !tagField.isJsonArray()) {
                    continue;
                }
                for (JsonElement ele : tagField.getAsJsonArray()) {
                    if (!ele.isJsonObject()) {
                        continue;
                    }
                    String first = getJsonValue(ele, "1");
                    String second = getJsonValue(ele, "2");
                    tags.add(StringUtils.isEmpty(second) ? first : first + "-->" + second);
                }
            }
            return tags;
        }
    }

    /**
     * Configures and runs the map-only job.
     *
     * @param args {@code <input path> <output path>}, optionally preceded by Hadoop generic options
     * @return 0 on success, 1 if the job fails, 2 on a usage error
     */
    @Override
    public int run(String[] args) throws Exception {

//        RCFileOutputFormat.setColumnNumber(this.getConf(), 2);
        String[] otherParams = new GenericOptionsParser(this.getConf(), args).getRemainingArgs();
        if (otherParams.length < 2) {
            System.err.println("Usage: TestJob1 [generic options] <input path> <output path>");
            ToolRunner.printGenericCommandUsage(System.err);
            return 2;
        }

        Job job = Job.getInstance(this.getConf(), "Testjob1");
        job.setNumReduceTasks(0);
        job.setJarByClass(TestJob1.class);
        job.setMapperClass(MyMapper.class);
//        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(OrcStruct.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(OrcStruct.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(MultipleOrcOutputFormat.class);

//        job.addFileToClassPath(new Path("s3://mob-emr-test/feng.liang/lib/hive-exec-0.13.1-amzn-3.jar"));
        job.addFileToClassPath(new Path("s3://mob-emr-test/dataplatform/env/hive/lib/hive-exec-2.3.3.jar"));

        FileInputFormat.addInputPath(job, new Path(otherParams[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherParams[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Entry point; exits with the job's status so callers (shell, schedulers) can detect failure. */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new TestJob1(), args));
    }

    /**
     * Reads a property of a JSON object as a string.
     *
     * @param element a JSON object (anything else makes getAsJsonObject throw)
     * @param prop property name
     * @return the property's string value, or {@code ""} if it is absent or an explicit JSON null
     */
    public static String getJsonValue(JsonElement element, String prop) {
        JsonElement ele = element.getAsJsonObject().get(prop);
        if (ele == null || ele.isJsonNull()) {
            // JsonNull.getAsString() throws UnsupportedOperationException, so treat it as absent.
            return "";
        }
        return ele.getAsString();
    }
}