package common.task; import com.google.common.base.Splitter; import common.model.AppCategory; import common.model.AppInfo; import common.repository.AppCategoryRepository; import common.repository.AppInfoRepository; import common.repository.CityRepository; import dmp.model.OtPkgDevStats; import dmp.model.TagCrawlerAppsWandoujia; import dmp.repository.OtPkgDevStatsRepository; import dmp.repository.TagCrawlerAppsWandoujiaRepository; import net.sf.json.JSONArray; import net.sf.json.JSONObject; import org.anarres.lzo.LzoAlgorithm; import org.anarres.lzo.LzoDecompressor; import org.anarres.lzo.LzoInputStream; import org.anarres.lzo.LzoLibrary; import org.apache.commons.collections.map.HashedMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import util.*; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Created by zxy on 2017/12/26. */ public class SyncAppDataTask { private static Logger logger = LoggerFactory.getLogger(SyncAppDataTask.class); @Autowired TagCrawlerAppsWandoujiaRepository tagRepository; @Autowired AppInfoRepository appInfoRepository; @Autowired AppCategoryRepository appCategoryRepository; @Autowired CityRepository cityRepository; @Autowired OtPkgDevStatsRepository otPkgDevStatsRepository; /*public void syncAppData() { System.out.println(DateUtil.getBeforeDays(1)); List<TagCrawlerAppsWandoujia> list = tagRepository.findAllByDs("2017-12-15"); //从网上找了一个一线城市以及地级市的列表 List<String> citys = cityRepository.findCitys(); List<AppInfo> appInfos = new ArrayList<>(); List<AppCategory> appCategories = appCategoryRepository.findAll(); Map<String, List<String>> map = new HashMap<>(); for (AppCategory appCategory : appCategories) { List<String> innerList = map.get(appCategory.getLevel()); if (innerList == null) { innerList = new ArrayList<>(); } innerList.add(appCategory.getName()); map.put(appCategory.getLevel(), innerList); } List<String> ry_pkgNameList = findReyunPkgList(); List<AppCategory> newCategorys = new ArrayList<>(); List<AppInfo> newInfos = new ArrayList<>(); for (TagCrawlerAppsWandoujia tag : list) { AppInfo appInfo = new AppInfo(); appInfo.setName(tag.getName()); appInfo.setCompany(tag.getMaker()); //豌豆荚爬取的都是安卓的 appInfo.setOs("Android"); appInfo.setPkgName(tag.getPkgname()); //判断这批app中是否有热云已有的 if (ry_pkgNameList.contains(tag.getPkgname())) { appInfo.setReyun(1); } appInfo.setLogoUrl(tag.getApplogo_link()); String types = tag.getMix_types(); //类别按照_分割,第一个是一级类别,第二个是二级类别,第三个是三级类别,剩下到组合起来放到其他类别中 //同时将爬取的数据中的类别字典取出来 if (!StringUtil.isEmpty(types)) { String[] typeArray = types.split("_"); String otherCase = ""; for (int i=0; i<typeArray.length; i++) { if (map.containsKey(String.valueOf(i+1)) && !map.get(String.valueOf(i+1)).contains(typeArray[i])) { List<String> mapValue = map.get(String.valueOf(i+1)); AppCategory newCate = new AppCategory(); newCate.setName(typeArray[i]); newCate.setLevel(String.valueOf(i+1)); newCategorys.add(newCate); mapValue.add(typeArray[i]); map.put(String.valueOf(i+1), mapValue); } else if (!map.containsKey(String.valueOf(i+1))) { AppCategory newCate = new AppCategory(); newCate.setName(typeArray[i]); newCate.setLevel(String.valueOf(i+1)); newCategorys.add(newCate); List<String> mapValue = new ArrayList<>(); mapValue.add(typeArray[i]); map.put(String.valueOf(i+1), mapValue); } if (i == 0) { appInfo.setFirstCate(typeArray[i]); } if (i == 1) { appInfo.setSecondCate(typeArray[i]); } if (i == 2) { appInfo.setThirdCate(typeArray[i]); } if (i > 2) { otherCase += "_" + typeArray[i]; } } if (otherCase.length() > 0) { appInfo.setOtherCate(otherCase.substring(1)); } } //看公司名字中是含有城市列表中的某个,记录app的公司所在地 for (String city : citys) { if (tag.getMaker().indexOf(city) > -1) { appInfo.setLocation(city); break; } } newInfos.add(appInfo); System.out.println(appInfo); } //把以前爬取的app不是热云的拿出来,看一下现在是不是热云的,是的话,更新进去。 List<AppInfo> oldNotRyAppList = appInfoRepository.findAppListByReyun(); System.out.println(oldNotRyAppList); for (AppInfo app : oldNotRyAppList) { if (ry_pkgNameList.contains(app.getPkgName())) { app.setReyun(1); System.out.println(app); newInfos.add(app); } } appInfoRepository.save(newInfos); appCategoryRepository.save(newCategorys); }*/ public void syncAppDeviceNum() { String ds = DateUtil.getBeforeDays(2); //ds = "2017-12-22"; System.out.println(ds); String skey = "export/pkg_dev_stats/ds=" + ds + "/"; final AwsS3Util s3Util = AwsS3Util.getInstance(); List itemList = AwsS3Util.getInstance().getS3Keys("reyunbpu", skey); if (itemList == null || itemList.size() == 0) { System.out.println("pkg_dev_stats null"); return; } for (int i = 0; i < itemList.size(); i++) { String s3key = String.valueOf(itemList.get(i)); readS3File(s3Util, s3key, "deviceNum"); } System.out.println("pkg_dev_stats end"); } public void syncAppInfo() { System.out.println("syncAppInfo start"); appInfoRepository.deleteAll(); String skey = "export/crawler_data/"; final AwsS3Util s3Util = AwsS3Util.getInstance(); List itemList = AwsS3Util.getInstance().getS3Keys("reyunbpu", skey); if (itemList == null || itemList.size() == 0) { return; } for (int i = 0; i < itemList.size(); i++) { String s3key = String.valueOf(itemList.get(i)); readS3File(s3Util, s3key, "appInfo"); } System.out.println("delete start"); //删除重复数据 try{ deleteRepeatData(); } catch (Exception e){ logger.error("delete faild " + e.getMessage(), e); } System.out.println("delete end"); } private void readS3File(AwsS3Util s3Util, String s3key, String getContent) { List<AppCategory> newCategorys = new ArrayList<>(); List<String> citys = new ArrayList<>(); List<AppCategory> appCategories = new ArrayList<>(); Map<String, List<String>> map = new HashMap<>(); List<String> ry_pkgNameList = new ArrayList<>(); if (getContent.equals("appInfo")) { citys = cityRepository.findCitys(); appCategories = appCategoryRepository.findAll(); for (AppCategory appCategory : appCategories) { List<String> innerList = map.get(appCategory.getLevel()); if (innerList == null) { innerList = new ArrayList<>(); } innerList.add(appCategory.getName()); map.put(appCategory.getLevel(), innerList); } ry_pkgNameList = findReyunPkgList(); } InputStream inputStream = null; BufferedReader br = null; try { final byte[] fileBytes = s3Util.downloadBytesFromS3("reyunbpu", s3key); inputStream = new ByteArrayInputStream(fileBytes); if (s3key.endsWith(".lzo_deflate")) { LzoAlgorithm algorithm = LzoAlgorithm.LZO1X; LzoDecompressor deCompressor = LzoLibrary.getInstance().newDecompressor(algorithm, null); LzoInputStream stream = new LzoInputStream(inputStream, deCompressor); br = new BufferedReader(new InputStreamReader(stream)); } else { br = new BufferedReader(new InputStreamReader(inputStream)); } String line = null; List<OtPkgDevStats> resultList = new ArrayList<>(); List<AppInfo> appResultList = new ArrayList<>(); while ((line = br.readLine()) != null) { // System.out.println(line); Object[] array = Splitter.on("\t").trimResults().splitToList(line).toArray(); if (array.length == 3) { OtPkgDevStats opds = new OtPkgDevStats(); opds.setPkgname(array[0].toString()); opds.setDevNum(Long.valueOf(array[1].toString())); opds.setDs(array[2].toString()); resultList.add(opds); if (resultList.size() >= 2000) { //System.out.println(resultList); otPkgDevStatsRepository.save(resultList); resultList = new ArrayList<>(); } } else if (array.length == 19) { AppInfo appInfo = generateAppInfo(array, ry_pkgNameList, map, newCategorys, citys); appResultList.add(appInfo); if (appResultList.size() >= 2000) { //System.out.println(appResultList); appInfoRepository.save(appResultList); appResultList = new ArrayList<>(); } } } if (resultList.size() > 0) { otPkgDevStatsRepository.save(resultList); } if (appResultList.size() > 0) { appInfoRepository.save(appResultList); } //System.out.println(newCategorys); if (newCategorys.size() > 0) { appCategoryRepository.save(newCategorys); } } catch (Exception e) { logger.error("读取s3文件错误", e); } finally { try { if (br != null) { br.close(); } if (inputStream != null) { inputStream.close(); } } catch (IOException e) { e.printStackTrace(); } } } public List<String> findReyunPkgList() { Map<String, String> conditions = new HashedMap(); String url = Constant.reportUrl + "/api/manager/bysql"; String sql = "select distinct pkgname from tkio.tkio_app_pkgname where pkgname!='NULL'"; conditions.put("sql", sql); conditions.put("dbtype", "mysql"); conditions.put("datatype", "list"); conditions.put("reportname", "pkgname_list"); String responseJson = HttpClientUtil.doHttpPostRequest(url, "manager", conditions); JSONObject object = JSONObject.fromObject(responseJson); JSONArray array = object.getJSONArray("val"); List<String> pkgName = new ArrayList<>(); for (int i=0; i<array.size(); i++) { JSONObject o = array.getJSONObject(i); pkgName.add(o.getString("pkgname")); } return pkgName; } public AppInfo generateAppInfo(Object[] array, List<String> ry_pkgNameList, Map<String, List<String>> map, List<AppCategory> newCategorys, List<String> citys) { AppInfo appInfo = new AppInfo(); appInfo.setName(array[4].toString()); appInfo.setCompany(array[9].toString()); //豌豆荚爬取的都是安卓的 appInfo.setOs("Android"); appInfo.setPkgName(array[6].toString()); //判断这批app中是否有热云已有的 if (ry_pkgNameList.contains(array[6].toString())) { appInfo.setReyun(1); } appInfo.setLogoUrl(array[7].toString()); String types = array[12].toString(); //类别按照_分割,第一个是一级类别,第二个是二级类别,第三个是三级类别,剩下到组合起来放到其他类别中 //同时将爬取的数据中的类别字典取出来 if (!StringUtil.isEmpty(types)) { String[] typeArray = types.split("_"); String otherCase = ""; for (int i=0; i<typeArray.length; i++) { if (map.containsKey(String.valueOf(i+1)) && !map.get(String.valueOf(i+1)).contains(typeArray[i])) { List<String> mapValue = map.get(String.valueOf(i+1)); AppCategory newCate = new AppCategory(); newCate.setName(typeArray[i]); newCate.setLevel(String.valueOf(i+1)); newCategorys.add(newCate); mapValue.add(typeArray[i]); map.put(String.valueOf(i+1), mapValue); } else if (!map.containsKey(String.valueOf(i+1))) { AppCategory newCate = new AppCategory(); newCate.setName(typeArray[i]); newCate.setLevel(String.valueOf(i+1)); newCategorys.add(newCate); List<String> mapValue = new ArrayList<>(); mapValue.add(typeArray[i]); map.put(String.valueOf(i+1), mapValue); } if (i == 0) { appInfo.setFirstCate(typeArray[i]); } if (i == 1) { appInfo.setSecondCate(typeArray[i]); } if (i == 2) { appInfo.setThirdCate(typeArray[i]); } if (i > 2) { otherCase += "_" + typeArray[i]; } } if (otherCase.length() > 0) { appInfo.setOtherCate(otherCase.substring(1)); } } //看公司名字中是含有城市列表中的某个,记录app的公司所在地 for (String city : citys) { if (array[9].toString().indexOf(city) > -1) { appInfo.setLocation(city); break; } } return appInfo; } //删除app_info的重复数据 void deleteRepeatData(){ String sql = "DELETE FROM app_info WHERE id IN (SELECT id FROM ( SELECT id as id FROM app_info" + " WHERE pkg_name IN ( SELECT pkg_name FROM app_info GROUP BY pkg_name " + "HAVING COUNT(*) > 1) AND id NOT IN ( SELECT min(id) FROM app_info GROUP BY " + "id HAVING count(*) > 1 )) b)"; DBUtil.newInstance().excute(sql); }; /*public static void main(String[] args) { SyncAppDataTask task = new SyncAppDataTask(); task.syncAppData(); }*/ }