Commit 23dfd046 by AnHui

Initial commit

parents
# Default ignored files
/shelf/
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
# Editor-based HTTP Client requests
/httpRequests/
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.6 (py3.6)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/ocean_engine.iml" filepath="$PROJECT_DIR$/.idea/ocean_engine.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.6 (py3.6)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
# -*- ecoding: utf-8 -*-
# @ModuleName: __init__.py
# @Function:
# @Author: 安辉
# @Time: 2022/1/17 5:55 下午
# -*- ecoding: utf-8 -*-
# @ModuleName: ocean_engine_creative
# @Function:
# @Author: 安辉
# @Time: 2022/1/17 5:55 下午
import sys
sys.path.append('../')
import json
import traceback
from datetime import datetime
from utils.common import PERIOD_TYPE, LIST_TYPE, AGGR_CATEGORY_LIST, LANDING_TYPE, AGGR_APP_CODE, VIDEO_TYPE, \
VIDEO_DURATION_TYPE, IMAGE_MODE, AGGR_BUSINESS_CODE, ORDER_BY
from utils.proxy_utils import fetch_proxy_by_service
from utils.redis_utils import RedisUtils
from utils.mongo_utils import MongoUtils
from utils.logs import OceanCreativeLogger
import requests
import smtplib
from email.mime.text import MIMEText
from email.header import Header
from email.mime.multipart import MIMEMultipart
import pandas as pd
import redis
from itertools import combinations, product
from tqdm import tqdm
from multiprocessing import Pool
class OceanCreative(object):
def __init__(self):
self.log = OceanCreativeLogger
self.db_name = 'ocean_creative'
self.db = 9
self.redis_conn = RedisUtils(db=self.db)
self.mg_collection = f'{self.db_name}_{datetime.now().strftime("%Y-%m-%d")}'
self.mongo_conn = MongoUtils(db=self.db_name, collection=self.mg_collection)
self.video_ids = []
self.date = datetime.now().strftime("%Y{y}%m{m}%d{d}").format(y="年", m="月", d="日")
self.url = 'https://cc.oceanengine.com/creative_radar_api/v1/material/list'
self.headers = {
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36',
'referer': 'https://cc.oceanengine.com/inspiration/creative-radar/video',
'accept-language': 'zh-CN,zh;q=0.9',
}
def send_email(self):
# 发送邮箱服务器
smtp_server = "smtp.qq.com"
# 发送邮箱用户名密码
user = "984007548@qq.com"
password = "trpmnetjnvrhbedj"
# 发送和接收邮箱
sender = "984007548@qq.com"
receive = "anhui@reyun.com"
# 发送邮件主题和内容
subject = "巨量创意素材标签"
content = "<html><h1 style='color:red'>巨量创意素材标签</h1></html>"
# HTML邮件正文
# msg = MIMEText(content, 'html', 'utf-8')
# msg['Subject'] = Header(subject, 'utf-8')
# msg['From'] = "984007548@qq.com"
# msg['To'] = "anhui@reyun.com"
# SSL协议端口号要使用465
smtp = smtplib.SMTP_SSL(smtp_server, 465)
# HELO向服务器标志用户身份
smtp.helo(smtp_server)
# 服务器返回结果确认
smtp.ehlo(smtp_server)
# 登录邮箱服务器用户名密码
smtp.login(user, password)
send_file = open(r"./test.txt", "rb").read()
att = MIMEText(send_file, "base64", 'utf-8')
att['Content-Type'] = 'application/octet-stream'
att['Content-Disposition'] = f'attachment;filename="{self.date}巨量创意素材标签.csv"'
msgRoot = MIMEMultipart()
msgRoot.attach(MIMEText(content, 'html', 'utf-8'))
msgRoot['Subject'] = subject
msgRoot['From'] = sender
msgRoot['To'] = ','.join(receive)
msgRoot.attach(att)
self.log.info("Send email start...")
smtp.sendmail(sender, receive, msgRoot.as_string())
smtp.quit()
self.log.info("email send end!")
def test_params(self, list_type, aggr_category_list):
params = (
('list_type', list_type),
('material_type', '3'),
('order_by', 'total_play'),
('period_type', '3'),
('aggr_app_code', '4'),
('aggr_category_list', aggr_category_list),
('video_type', '[]'),
('landing_type', '[]'),
('limit', '24'),
('page', '1'),
('video_duration_type', '5'),
)
return params
def get_params(self, parameter, page):
meta = {}
params = (
('list_type', parameter[0]),
('material_type', '3'),
('order_by', 'total_play'),
('period_type', '3'),
('aggr_app_code', parameter[1]),
('aggr_category_list', parameter[2]),
('video_type', [parameter[3]]),
('landing_type', '[]'),
# ('image_mode', parameter.get('image_mode')),
('video_duration_type', '5'),
('aggr_business_code', parameter[4]),
('limit', '24'),
('page', page),
)
meta['ad_type_id'] = parameter[0]
meta['media_id'] = parameter[1]
meta['video_type_id'] = parameter[3]
return params, meta
def parse(self, data):
self.log.info('正在解析数据...')
ad_data = data.get('data', {})
materials = ad_data.get('materials', {})
if not materials:
self.log.info(f'data:{data}')
self.log.info('当前参数组合暂无数据...')
return []
meta = data.get('meta', {})
results = []
for item in materials:
result = {}
# 广告类型
ad_type_id = meta.get('ad_type_id', '')
ad_type = LIST_TYPE.get(ad_type_id, '')
# 素材ID
material_id = item.get('material_id', '')
# 行业
first_industry = item.get('first_game_ad_industry_name', '')
second_industry = item.get('second_game_ad_industry_name', '')
industry = f'{first_industry}-{second_industry}'
# 文案
creative = item.get('bestTitle', '')
# 素材类型
video_type_id = meta.get('video_type_id')
material_type = VIDEO_TYPE.get(video_type_id, [])
# 推广目标
landing_type = item.get('landing_type', '')
promotion_target = LANDING_TYPE.get(landing_type, '')
media_id = meta.get('media_id', '')
media = AGGR_APP_CODE.get(media_id, '')
video_id = item.get('vid', '')
material_url = f'https://api.amemv.com/aweme/v1/play/?video_id={video_id}'
result['material_id'] = material_id
result['creative'] = creative
result['material_type'] = material_type
result['ad_type'] = ad_type
result['industry'] = industry
result['promotion_target'] = promotion_target
result['media'] = media
result['material_url'] = material_url
# if video_id in self.video_ids:
# self.log.info('数据重复,已过滤...')
# continue
# if video_id not in self.video_ids:
# self.video_ids.append(video_id)
# self.mongo_conn.insert(result)
# self.log.info(f'新增{len(self.video_ids)}条数据')
results.append(result)
self.mongo_conn.insert(results)
self.log.info('数据解析完毕...')
def get_response(self, params, meta=None):
try:
_, _, _, proxies = fetch_proxy_by_service()
proxies = proxies if sys.platform not in ['win32', 'darwin'] else None
response = requests.get(url=self.url, headers=self.headers, params=params, proxies=proxies, verify=False,
timeout=5)
data = response.json()
data['meta'] = meta
if not meta:
return data
except:
traceback.print_exc()
else:
self.parse(data)
def run(self, parameter):
page = 1
self.log.info(f'当前参数组合:{parameter}')
params, meta = self.get_params(parameter, page)
self.get_response(params, meta)
if __name__ == '__main__':
industry = AGGR_CATEGORY_LIST.values()
industry_ids = [i.get('child_ids') for i in industry]
parameters = product(LIST_TYPE.keys(), [4, 8], industry_ids, VIDEO_TYPE.keys(), AGGR_BUSINESS_CODE.keys())
parameters = list(parameters)
parameter = parameters.pop()
oc = OceanCreative()
while True:
lens = len(parameters)
if lens <= 0:
break
batch = 10 if lens >= 10 else lens
pool = Pool(batch)
pool.apply_async(oc.run, (parameter,))
pool.close()
pool.join()
# -*- ecoding: utf-8 -*-
# @ModuleName: test
# @Function:
# @Author: 安辉
# @Time: 2022/1/19 5:49 下午
from itertools import combinations, product
from multiprocessing import Pool
import os, time
from tqdm import tqdm
import time
tmp = product((1, 2, 3), [4, 5], (6, 7))
print(list(tmp))
# for i in tqdm(range(100)):
# # print('测试')
# time.sleep(0.1)
# print('')
# def work(n):
# print('%s run' % os.getpid())
# time.sleep(3)
# return n ** 2
#
#
# if __name__ == '__main__':
# p = Pool(3) # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
# res_l = []
# for i in range(10):
# res = p.apply_async(work, args=(i,)) # 同步运行,阻塞、直到本次任务执行完毕拿到res
# res_l.append(res)
#
# # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
# p.close()
# p.join()
# for res in res_l:
# print(res.get()) # 使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
hosts=(
39.104.17.39
)
# 本地目录
local_dir="/Users/anhui/codes/ocean_engine/"
#远程目录
target_dir="/data/ocean_engine"
#远程服务器登陆名
username="root"
# 远程服务器ip
#ip="39.104.170.192"
run_rsync(){
# ssh $username@$1 "sudo supervisorctl stop ry_load:"
# ssh $username@$1 "sudo supervisorctl stop wechat:"
rsync -avh --rsync-path="sudo rsync" \
--exclude="__pycache__" \
--exclude=".git" \
--exclude=".idea" \
--exclude=".gitignore" \
--exclude="update.sh" \
$local_dir \
$username@$1:$target_dir
# ssh $username@$1 "sudo supervisorctl restart ry_load:"
# ssh $username@$1 "sudo supervisorctl start wechat:"
}
for host in $hosts;do
run_rsync $host
echo $host "ok"
done
\ No newline at end of file
# -*- ecoding: utf-8 -*-
# @ModuleName: __init__.py
# @Function:
# @Author: 安辉
# @Time: 2022/1/17 5:51 下午
This diff is collapsed. Click to expand it.
# -*- ecoding: utf-8 -*-
# @ModuleName: logs
# @Function:
# @Author: 安辉
# @Time: 2022/1/19 3:01 下午
import re
import logging
import sys
from logging.handlers import TimedRotatingFileHandler
if sys.platform in ["win32", "darwin"]:
PRJ_DEBUG = True
else:
PRJ_DEBUG = False
class Log(object):
"""日志类"""
def __init__(self, name, filename, level=logging.INFO):
self.logger = logging.getLogger(name)
# 控制日志文件中记录级别
self.formatter = logging.Formatter('%(levelname)s|%(asctime)s|%(module)s|%(funcName)s|%(message)s')
# 生产环境
if not PRJ_DEBUG:
# 日志保留3天,一天保存一个文件
self.fh = TimedRotatingFileHandler(filename, when='midnight', interval=1, backupCount=3)
# 删除设置
self.fh.suffix = '%Y-%m-%d.log'
self.fh.extMatch = re.compile(r'^\d{4}-\d{2}-\d{2}.log$')
# 定义日志文件中格式
self.fh.setFormatter(self.formatter)
self.logger.addHandler(self.fh)
self.logger.setLevel(level)
else:
# 控制输出到控制台日志格式、级别
self.ch = logging.StreamHandler()
self.ch.setFormatter(self.formatter)
self.logger.addHandler(self.ch)
self.logger.setLevel(logging.DEBUG)
def debug(self, msg):
self.logger.debug(msg)
def warning(self, msg):
self.logger.warning(msg)
def info(self, msg):
self.logger.info(msg)
def error(self, msg):
self.logger.error(msg)
def critical(self, msg):
self.logger.critical(msg)
def exception(self, msg):
self.logger.exception(msg)
def close(self):
self.logger.removeHandler(self.fh)
OceanCreativeLogger = Log('ocean_creative', '/data/logs/ocean_engine/ocean_creative.log')
# -*- ecoding: utf-8 -*-
# @ModuleName: mongo_utils
# @Function:
# @Author: 安辉
# @Time: 2022/1/18 5:30 下午
import sys
import pymongo
if sys.platform in ['win32', 'darwin']:
MONGO_CONF = {
'host': '39.99.32.199',
'port': 27016,
}
mongo_client = pymongo.MongoClient(host=MONGO_CONF['host'], port=MONGO_CONF['port'])
else:
MONGO_CONF = {
'host': '172.24.24.229',
'port': 27016,
'user': 'root',
'passwd': 'liujiatian@reyun'
}
mongo_client = pymongo.MongoClient(host=MONGO_CONF['host'], port=MONGO_CONF['port'], username=MONGO_CONF['user'],
password=MONGO_CONF['passwd'])
class MongoUtils(object):
def __init__(self, db=None, collection=None):
self.mongo_db = mongo_client[db]
self.collection = self.mongo_db[collection]
def insert(self, data):
if isinstance(data, list):
self.collection.insert_many(data)
elif isinstance(data, dict):
self.collection.insert(data)
def close_db(self):
mongo_client.close()
# -*- ecoding: utf-8 -*-
# @ModuleName: proxy_utils.py
# @Function:
# @Author: 安辉
# @Time: 2021/11/17 15:00
import sys
sys.path.append('../')
import time
import re
import random
from base64 import urlsafe_b64encode
from utils.redis_utils import redis_db3_conn, redis_db0_conn
CITY_IP_CONFIG = {
'池州': ['2001', '2002', '2003', '2004', '2005', '2006', '2007', '2008', '2009', '2010'],
'黄冈': ['2011', '2012', '2013', '2014', '2015', '2016', '2017', '2018', '2019', '2020'],
'扬州': ['2021', '2022', '2023', '2024', '2025', '2026', '2027', '2028', '2029', '2030'],
'吉安': ['2031', '2032', '2033', '2034', '2035', '2036', '2037', '2038', '2039', '2040'],
'九江': ['2041', '2042', '2043', '2044', '2045', '2046', '2047', '2048', '2049', '2050'],
'芜湖': ['2051', '2052', '2053', '2054', '2055', '2056', '2057', '2058', '2059', '2060'],
'鹰潭': ['2061', '2062', '2063', '2064', '2065', '2066', '2067', '2068', '2069', '2070'],
'丽水': ['2071', '2072', '2073', '2074', '2075', '2076', '2077', '2078', '2079', '2080'],
'台州': ['2081', '2082', '2083', '2084', '2085', '2086', '2087', '2088', '2089', '2090'],
'绍兴': ['2091', '2092', '2093', '2094', '2095', '2096', '2097', '2098', '2099', '2100'],
}
def fetch_proxy_by_service(source='all'):
'''
通过第三方服务拿到代理
:return:
'''
# 阿布云
abuyun_proxy_list = [
'http://H60X43R32773Q47P:6740785F761EB814@http-pro.abuyun.com:9010',
'http://H8EYV13W48Y3BUXP:F5E9A0031C2682C0@http-pro.abuyun.com:9010',
'http://H52HS5328ICQ017P:6EBF9D1757F23D2E@http-pro.abuyun.com:9010',
'http://H670P7BTG7436H4P:EE57DA5267877D78@http-pro.abuyun.com:9010',
]
# 多贝云
duobei_proxy_list = [
'http://RYNETHTT3:gL0I092UHjf@http-proxy-t1.dobel.cn:9180',
'http://RYNETHTT4:gL0I092UHjf@http-proxy-t1.dobel.cn:9180',
'http://RYNETHTT5:gL0I092UHjf@http-proxy-t1.dobel.cn:9180',
'http://RYNETHTT6:gL0I092UHjf@http-proxy-t1.dobel.cn:9180',
# 'http://RYNETHTT7:gL0I092UHjf@http-proxy-t1.dobel.cn:9180', 逆向占用
]
if source == 'duobei':
proxy_list = duobei_proxy_list
elif source == 'abuyun':
proxy_list = abuyun_proxy_list
else:
proxy_list = abuyun_proxy_list + duobei_proxy_list
proxy = random.choice(proxy_list)
r = redis_db0_conn
user, password, host = re.search('http://(\w+):(\w+)@(.*)', proxy).groups()
proxy = f'http://{host}'
proxyAuth = 'Basic ' + urlsafe_b64encode(bytes((user + ':' + password), 'ascii')).decode('utf8')
full_proxy = f'http://{user}:{password}@{host}'
real_ip = r.get_real_ip(full_proxy)
proxy_request_format = {
'http': full_proxy,
'https': full_proxy
}
return proxy, real_ip, proxyAuth, proxy_request_format
last_fetch_time = 0
ip_list = []
def fetch_proxy_by_vps(city=''):
'''
通过vps获取代理IP
:return:
'''
global last_fetch_time, ip_list
current_ts = time.time()
fetch_interval = 10
redis_key = 'proxies_clean'
user = 'xzszlz'
password = 'Lsjkcbdjz666'
port = 18119
if current_ts - last_fetch_time > fetch_interval:
r = redis_db3_conn
last_fetch_time = current_ts
ip_list = r.redis_hvals(redis_key)
# if city in CITY_IP_CONFIG:
# ip_id = random.choice(CITY_IP_CONFIG[city])
# real_ip = r.redis_hget(redis_key, ip_id)
# else:
real_ip = random.choice(ip_list)
if isinstance(real_ip, bytes):
real_ip = real_ip.decode()
proxy = f'http://{real_ip}:{port}'
proxyAuth = 'Basic ' + urlsafe_b64encode(bytes((user + ':' + password), 'ascii')).decode('utf8')
full_proxy = f'http://{user}:{password}@{password}'
proxy_request_format = {
'http://': full_proxy,
'https://': full_proxy
}
return proxy, real_ip, proxyAuth, proxy_request_format
# -*- ecoding: utf-8 -*-
# @ModuleName: redis_utils
# @Function:
# @Author: 安辉
# @Time: 2022/1/18 4:31 下午
import re
import sys
import redis
if sys.platform in ['win32', 'darwin']:
# redis配置
REDIS_HOST = '39.99.32.199'
REDIS_PORT = 6379
REDIS_PARAMS = {
'password': 'reyun_adi',
}
else:
# redis配置
REDIS_HOST = 'r-hp3d05tt4mpw03sw4u.redis.huhehaote.rds.aliyuncs.com'
REDIS_PORT = 6379
REDIS_PARAMS = {
'password': 'Reyun_adi_redis',
}
REDIS_PASS = REDIS_PARAMS['password']
REDIS_DB = 0
class RedisUtils(object):
def __init__(self, db=REDIS_DB):
self.pool = redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT, db=db, password=REDIS_PASS,
decode_responses=True)
self.conn = redis.Redis(connection_pool=self.pool)
def redis_sadd(self, key, value):
self.conn.sadd(key, value)
def get_real_ip(self, proxy: str):
if re.search(r'((?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))',
proxy):
return re.search(
r'((?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))',
proxy).group(1)
r = self.conn
try:
real_ip = r.hget('proxy:real_ip', proxy).decode('utf-8')
except:
real_ip = ''
return real_ip
redis_db3_conn = RedisUtils(3)
redis_db0_conn = RedisUtils(0)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment