更新 app.py

This commit is contained in:
ljj 2025-07-11 15:14:19 +08:00
parent b4bd2ad13f
commit 22de3cf7a5
1 changed files with 219 additions and 351 deletions

570
app.py
View File

@ -1,351 +1,219 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
@Project : data_upload_download @Project : data_upload_download
@File : app.py @File : app.py
@IDE : PyCharm @IDE : PyCharm
@Author : liu jian jiang @Author : liu jian jiang
@Date : 2025/6/17 11:23 @Date : 2025/6/17 11:23
""" """
import os import os
import uuid import uuid
import math import math
import json import json
import logging import logging
from datetime import datetime, timedelta from datetime import datetime, timedelta
from public_func import MySQLPool from public_func import MySQLPool
from logging.handlers import RotatingFileHandler from logging.handlers import RotatingFileHandler
from flask import Flask, request, jsonify, send_file, render_template, redirect, url_for from flask import Flask, request, jsonify, send_file, render_template, redirect, url_for
from flask_cors import CORS from flask_cors import CORS
db = MySQLPool() db = MySQLPool()
app = Flask(__name__) app = Flask(__name__)
CORS(app) CORS(app)
# 配置日志 # 配置日志
handler = RotatingFileHandler( handler = RotatingFileHandler(
'./logs/flask_app.log', './logs/flask_app.log',
maxBytes=1024 * 1024 * 1024, maxBytes=1024 * 1024 * 1024,
backupCount=5 backupCount=5
) )
handler.setFormatter(logging.Formatter( handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s: %(message)s' '%(asctime)s %(levelname)s: %(message)s'
)) ))
app.logger.addHandler(handler) app.logger.addHandler(handler)
app.logger.setLevel(logging.INFO) app.logger.setLevel(logging.INFO)
# H5 项目接口 # H5 项目接口
@app.route('/h5', methods=['GET']) @app.route('/h5', methods=['GET'])
def get_h5_account_info(): def get_h5_account_info():
sql_str = "select account_id,keep_rate,task_total,account_name,is_active, countries, link_url from account_h5_info" sql_str = "select account_id,keep_rate,task_total,account_name,is_active, countries, link_url from account_h5_info"
try: try:
result = db.execute_query(sql_str) result = db.execute_query(sql_str)
return render_template('account_h5_info.html', return render_template('account_h5_info.html',
users=[ users=[
{'account_id': str(x[0]), 'keep_rate': x[1], "task_total": x[2], "account_name": x[3], {'account_id': str(x[0]), 'keep_rate': x[1], "task_total": x[2], "account_name": x[3],
"countries": x[5], "link_url": x[6], "is_active": x[4]} "countries": x[5], "link_url": x[6], "is_active": x[4]}
for x in for x in
result]) result])
except Exception as e: except Exception as e:
app.logger.info("获取浏览器信息失败,失败原因:{}".format(str(e))) app.logger.info("获取浏览器信息失败,失败原因:{}".format(str(e)))
return jsonify({}) return jsonify({})
@app.route('/add_h5_user', methods=['GET', 'POST']) @app.route('/add_h5_user', methods=['GET', 'POST'])
def add_h5_user(): def add_h5_user():
if request.method == 'POST': if request.method == 'POST':
try: try:
keep_rate = request.form['keep_rate'] keep_rate = request.form['keep_rate']
task_total = request.form['task_total'] task_total = request.form['task_total']
account_name = request.form['account_name'] account_name = request.form['account_name']
countries = request.form['countries'] countries = request.form['countries']
link_url = request.form['link_url'] link_url = request.form['link_url']
sql_str = "insert into account_h5_info(account_id,keep_rate,task_total,account_name,countries,link_url) values(%s,%s,%s,%s,%s,%s)" sql_str = "insert into account_h5_info(account_id,keep_rate,task_total,account_name,countries,link_url) values(%s,%s,%s,%s,%s,%s)"
db.execute_single(sql_str, (uuid.uuid1(), keep_rate, task_total, account_name, countries, link_url)) db.execute_single(sql_str, (uuid.uuid1(), keep_rate, task_total, account_name, countries, link_url))
return redirect(url_for('get_h5_account_info')) return redirect(url_for('get_h5_account_info'))
except Exception as e: except Exception as e:
app.logger.info("<UNK>{}".format(str(e))) app.logger.info("<UNK>{}".format(str(e)))
return render_template('add_h5_user.html') return render_template('add_h5_user.html')
@app.route('/edit_user', methods=['GET', 'POST']) @app.route('/edit_user', methods=['GET', 'POST'])
def edit_user(): def edit_user():
account_id = request.args.get('account_id') account_id = request.args.get('account_id')
if request.method == 'POST': if request.method == 'POST':
account_id = request.form['account_id'] account_id = request.form['account_id']
keep_rate = request.form['keep_rate'] keep_rate = request.form['keep_rate']
task_total = int(request.form['task_total']) task_total = int(request.form['task_total'])
account_name = request.form['account_name'] account_name = request.form['account_name']
countries = request.form['countries'] countries = request.form['countries']
link_url = request.form['link_url'] link_url = request.form['link_url']
is_active = request.form['is_active'] is_active = request.form['is_active']
update_time = datetime.now() update_time = datetime.now()
sql_str = "UPDATE account_h5_info set keep_rate=%s, task_total=%s, account_name=%s ,is_active=%s ,countries=%s, link_url=%s,update_time=%s where account_id=%s" sql_str = "UPDATE account_h5_info set keep_rate=%s, task_total=%s, account_name=%s ,is_active=%s ,countries=%s, link_url=%s,update_time=%s where account_id=%s"
db.execute_single(sql_str, (keep_rate, task_total, account_name, is_active, countries, link_url, update_time, account_id)) db.execute_single(sql_str, (keep_rate, task_total, account_name, is_active, countries, link_url, update_time, account_id))
return redirect(url_for('get_h5_account_info')) return redirect(url_for('get_h5_account_info'))
sql_str = "select account_id,keep_rate,task_total,account_name,is_active,countries, link_url from account_h5_info where account_id=%s" sql_str = "select account_id,keep_rate,task_total,account_name,is_active,countries, link_url from account_h5_info where account_id=%s"
result = db.execute_query(sql_str, (account_id,)) result = db.execute_query(sql_str, (account_id,))
return render_template('edit_h5_user.html', return render_template('edit_h5_user.html',
users={'account_id': result[0][0], 'keep_rate': result[0][1], 'task_total': result[0][2], users={'account_id': result[0][0], 'keep_rate': result[0][1], 'task_total': result[0][2],
'account_name': result[0][3], "is_active": result[0][4], "countries": result[0][5], 'account_name': result[0][3], "is_active": result[0][4], "countries": result[0][5],
"link_url": result[0][6]}) "link_url": result[0][6]})
@app.route('/download_file', methods=['GET']) @app.route('/download_file', methods=['GET'])
def download_file(): def download_file():
file_name = request.args.get('fileName') file_name = request.args.get('fileName')
app.logger.info("download_file 接口被调用,开始下载文件:{}".format(file_name)) app.logger.info("download_file 接口被调用,开始下载文件:{}".format(file_name))
path = os.path.join(os.getcwd(), "file/h5") path = os.path.join(os.getcwd(), "file/h5")
file_path = os.path.join(path, f"{file_name}") file_path = os.path.join(path, f"{file_name}")
if not os.path.exists(file_path): if not os.path.exists(file_path):
app.logger.info("该文件:{},不存在".format(file_name)) app.logger.info("该文件:{},不存在".format(file_name))
return jsonify({}) return jsonify({})
else: else:
return send_file(file_path, as_attachment=True) return send_file(file_path, as_attachment=True)
@app.route('/upload_browser_info', methods=['POST']) @app.route('/upload_browser_info', methods=['POST'])
def upload_browser_info(): def upload_browser_info():
try: try:
params = json.loads(request.data) params = json.loads(request.data)
visitor_id = params["fingerprint"]['visitorId'] visitor_id = params["fingerprint"]['visitorId']
app.logger.info("接口upload_browser_info 被调用,传入参数为:{}".format(str(visitor_id))) app.logger.info("接口upload_browser_info 被调用,传入参数为:{}".format(str(visitor_id)))
# 对数据进行保存 # 对数据进行保存
sql_str = "insert into task_h5_browser_info(visitor_id,browser_info) values(%s,%s)" sql_str = "insert into task_h5_browser_info(visitor_id,browser_info) values(%s,%s)"
db.execute_single(sql_str, (visitor_id, json.dumps(params))) db.execute_single(sql_str, (visitor_id, json.dumps(params)))
return jsonify({'status': 'ok'}) return jsonify({'status': 'ok'})
except Exception as e: except Exception as e:
if "PRIMARY" in str(e): if "PRIMARY" in str(e):
return jsonify({'status': 'ok'}) return jsonify({'status': 'ok'})
app.logger.info("浏览器信息保存失败,失败原因:{}".format(str(e))) app.logger.info("浏览器信息保存失败,失败原因:{}".format(str(e)))
return jsonify({'status': 'error', 'msg': str(e)}) return jsonify({'status': 'error', 'msg': str(e)})
@app.route('/get_browser_info', methods=['GET']) @app.route('/get_browser_info', methods=['GET'])
def get_browser_info(): def get_browser_info():
sql_str = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1" sql_str = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1"
try: try:
result = db.execute_query(sql_str, ) result = db.execute_query(sql_str, )
app.logger.info("get_browser_info:获取数据长度为:{}".format(len(result))) app.logger.info("get_browser_info:获取数据长度为:{}".format(len(result)))
return jsonify(json.loads(result[0][0])) return jsonify(json.loads(result[0][0]))
except Exception as e: except Exception as e:
app.logger.info("获取浏览器信息失败,失败原因:{}".format(str(e))) app.logger.info("获取浏览器信息失败,失败原因:{}".format(str(e)))
return jsonify({'status': "error", "message": str(e)}) return jsonify({'status': "error", "message": str(e)})
@app.route('/get_account', methods=['GET']) @app.route('/get_account', methods=['GET'])
def get_account_id(): def get_account_id():
sql_str = "select account_id from account_h5_info where is_active=1 ORDER BY RAND() LIMIT 1" sql_str = "select account_id from account_h5_info where is_active=1 ORDER BY RAND() LIMIT 1"
result = db.execute_query(sql_str, ) result = db.execute_query(sql_str, )
if result: if result:
return jsonify({'accountId': result[0][0]}) return jsonify({'accountId': result[0][0]})
return jsonify({'status': "error", "message": "<UNK>"}) return jsonify({'status': "error", "message": "<UNK>"})
@app.route('/h5_issue_task', methods=['GET']) @app.route('/h5_issue_task', methods=['GET'])
def issue_task(): def issue_task():
account_id = request.args.get('accountId') account_id = request.args.get('accountId')
sql_str = "select account_id,keep_rate,task_total,account_name,is_active from account_h5_info where account_id=%s" sql_str = "select account_id,keep_rate,task_total,account_name,is_active from account_h5_info where account_id=%s"
result = db.execute_query(sql_str, (account_id,)) result = db.execute_query(sql_str, (account_id,))
if result: if result:
task_total = int(result[0][2]) task_total = int(result[0][2])
app.logger.info("账户:{}需要执行新增任务总数为:{}".format(account_id, task_total)) app.logger.info("账户:{}需要执行新增任务总数为:{}".format(account_id, task_total))
now_date = datetime.now().strftime("%Y-%m-%d") now_date = datetime.now().strftime("%Y-%m-%d")
sql_str = "select execute_task_number,id from task_execute_info where account_id=%s and save_date=%s" sql_str = "select execute_task_number,id from task_execute_info where account_id=%s and save_date=%s"
result = db.execute_query(sql_str, (account_id, now_date)) result = db.execute_query(sql_str, (account_id, now_date))
execute_task_number = 0 execute_task_number = 0
if result: if result:
execute_task_number = result[0][0] execute_task_number = result[0][0]
must_task_number = math.ceil((task_total / 24) * int(datetime.today().hour)) must_task_number = math.ceil((task_total / 24) * int(datetime.today().hour))
app.logger.info("账户:{}已执行任务个数为:{}".format(account_id, execute_task_number)) app.logger.info("账户:{}已执行任务个数为:{}".format(account_id, execute_task_number))
app.logger.info("账户:{} 按照计划应该执行行任务个数为:{}".format(account_id, must_task_number)) app.logger.info("账户:{} 按照计划应该执行行任务个数为:{}".format(account_id, must_task_number))
if execute_task_number >= must_task_number * 1.2: if execute_task_number >= must_task_number * 1.2:
app.logger.info( app.logger.info(
"账户:{},已经执行新增任务{}个,执行速度过快,需要暂停执行".format(str(account_id), execute_task_number)) "账户:{},已经执行新增任务{}个,执行速度过快,需要暂停执行".format(str(account_id), execute_task_number))
return jsonify({"message": "账户:{} 执行任务过快,暂停执行".format(account_izd)}), 403 return jsonify({"message": "账户:{} 执行任务过快,暂停执行".format(account_izd)}), 403
execute_task_number += 1 execute_task_number += 1
if result: if result:
sql_str = "UPDATE task_execute_info SET execute_task_number = %s,update_time=%s WHERE account_id = %s and save_date=%s" sql_str = "UPDATE task_execute_info SET execute_task_number = %s,update_time=%s WHERE account_id = %s and save_date=%s"
db.execute_update(sql_str, (execute_task_number, datetime.now(), account_id, now_date)) db.execute_update(sql_str, (execute_task_number, datetime.now(), account_id, now_date))
else: else:
sql_str = "insert into task_execute_info(account_id,execute_task_number,save_date) values(%s, %s, %s)" sql_str = "insert into task_execute_info(account_id,execute_task_number,save_date) values(%s, %s, %s)"
db.execute_update(sql_str, (account_id, execute_task_number, now_date)) db.execute_update(sql_str, (account_id, execute_task_number, now_date))
sql_str = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1" sql_str = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1"
result = db.execute_query(sql_str) result = db.execute_query(sql_str)
if result: if result:
return jsonify(json.loads(result[0][0])) return jsonify(json.loads(result[0][0]))
else: else:
app.logger.info("未获取到账户:{} 相关配置信息".format(account_id)) app.logger.info("未获取到账户:{} 相关配置信息".format(account_id))
return jsonify({'status': 'error', 'msg': '传入账号错误'}) return jsonify({'status': 'error', 'msg': '传入账号错误'})
@app.route('/upload_file', methods=['POST']) @app.route('/upload_file', methods=['POST'])
def upload_file(): def upload_file():
file = request.files['file'] file = request.files['file']
file_name = file.filename file_name = file.filename
account_id = request.args.get('accountId') account_id = request.args.get('accountId')
app.logger.info("接口upload_file 传入accountId 为:{}".format(account_id)) app.logger.info("接口upload_file 传入accountId 为:{}".format(account_id))
path = os.path.join(os.getcwd(), f"file/h5/{account_id}") path = os.path.join(os.getcwd(), f"file/h5/{account_id}")
if not os.path.exists(path): if not os.path.exists(path):
os.makedirs(path) os.makedirs(path)
file_path = os.path.join(path, file_name) file_path = os.path.join(path, file_name)
# # 需要对数据进行存储 # # 需要对数据进行存储
if os.path.exists(file_path): if os.path.exists(file_path):
app.logger.info("文件:{},当天上传过,需要先删除再保存".format(file_name)) app.logger.info("文件:{},当天上传过,需要先删除再保存".format(file_name))
os.remove(file_path) os.remove(file_path)
file.save(file_path) file.save(file_path)
app.logger.info("文件:{}已保持".format(file_name)) app.logger.info("文件:{}已保持".format(file_name))
return jsonify({'message': 'File uploaded successfully', 'path': file_path}) return jsonify({'message': 'File uploaded successfully', 'path': file_path})
# bigo 项目接口
def get_device_info(day_date, percent, android_id, i):
# 1. 计算总记录数 @app.route('/', methods=['get'])
sql_str = "select count(*) from device_info where save_date=%s and android_id=%s and is_use=%s ;" def test_post():
result = db.execute_query(sql_str, (day_date, android_id, i)) return jsonify({"key": "value"})
total_count = result[0][0]
app.logger.info(("日期:{},总共有 {} 条数据".format(day_date, total_count)))
need_deal_count = math.ceil(total_count * percent)
app.logger.info("日期:{} ,需要处理 {} 条数据".format(day_date, need_deal_count))
if need_deal_count == 0:
return []
# 计算已经处理的数量
sql_str = "select count(*) from device_info where save_date=%s and is_use=%s and android_id=%s ;"
result = db.execute_query(sql_str, (day_date, i + 1, android_id)) if __name__ == '__main__':
have_deal_count = result[0][0] app.run(host='0.0.0.0', port=5000)
app.logger.info("日期:{} ,已经处理的数据有:{}".format(day_date, have_deal_count))
if have_deal_count >= need_deal_count:
return []
# 准备需要处理的数据
limit_count = need_deal_count - have_deal_count
app.logger.info("日期:{} 还剩{}多少条数据需要处理".format(day_date, limit_count))
sql_str = "select id,android_id,device_info,package_name from device_info where save_date=%s and is_use=%s and android_id=%s limit %s;"
result = db.execute_query(sql_str, (day_date, i, android_id, limit_count))
return result[0]
@app.route('/', methods=['get'])
def test_post():
return jsonify({"key": "value"})
@app.route('/device_info_upload', methods=['POST'])
def device_info_upload():
try:
params = request.get_json()
app.logger.info("接口device_info_upload 被调用,传入参数为:{}".format(str(params)))
if not params:
return jsonify({})
android_id = request.args.get('id')
device_ip = request.args.get('deviceIp')
# 留存需要修改
if android_id == "FyZqWrStUvOpKlMn":
task_id = request.args.get('taskId')
package_name = request.args.get('packageName')
if package_name:
app.logger.info("传入参数android_id为{}package_name 为:{}".format(android_id, package_name))
app.logger.info("传入参数android_id为{}task_id为{}".format(android_id, task_id))
try:
sql_str = "INSERT INTO device_info(android_id, device_info,task_id,package_name,device_ip) VALUES (%s, %s,%s,%s,%s)"
db.execute_single(sql_str, (android_id, json.dumps(params), task_id, package_name, device_ip))
app.logger.info("android_id:{} 数据写入成功".format(str(android_id)))
return {"message": "android_id为{} 的设备信息保存成功".format(android_id)}
except Exception as e:
app.logger.info({"message": "android_id为{} 的设备信息保存失败,失败原因为:{}".format(str(android_id), str(e))})
else:
return jsonify({"status": "error", "msg": "android_id:{} 传入安装包名为{} ".format(android_id, package_name)})
return jsonify({})
except Exception as e:
app.logger.info({"message": "{}".format(str(e))})
return jsonify({})
@app.route('/device_info_download', methods=['GET'])
def device_info_download():
android_id = request.args.get('androidId')
app.logger.info("接口device_info_download 被调用,传入参数为:{}".format(str(android_id)))
try:
percent_list = [0.5, 0.3, 0.18, 0.11, 0.09, 0.06, 0.04]
for i in range(len(percent_list)):
day = (datetime.now() - timedelta(days=i + 1)).strftime('%Y-%m-%d')
result = get_device_info(day, percent_list[i], android_id=android_id, i=i)
if result:
sql_str = "UPDATE device_info SET is_use = %s WHERE id = %s"
db.execute_update(sql_str, (i + 1, result[0]))
param = json.loads(result[-2])
param["package_name"] = result[-1]
param["file_name"] = f"{result[1]}_{result[-1]}.zip"
return jsonify(json.dumps(param))
return jsonify({})
except Exception as e:
app.logger.info("获取设备:{} 的信息失败,失败原因为:{}".format(str(android_id), str(e)))
return jsonify({})
@app.route('/count_click', methods=['GET'])
def count_click_times():
click_id = request.args.get('ClickAd')
app.logger.info("接口:{}被调用传入参数click_id为{}".format(click_id, str(datetime.now())))
click_date = datetime.today().strftime('%Y%m%d%H')
try:
sql_str = "insert into count_click_times(click_id,click_date) values (%s, %s)"
db.execute_single(sql_str, (click_id, click_date))
return jsonify({"message": "click_id:{}已记录".format(click_id)})
except Exception as e:
app.logger.info({"message": "{}".format(str(e))})
return jsonify({"message": "click_id:{},数据记录失败,失败原因{}".format(click_id, str(e))}), 403
@app.route('/download_code_file', methods=['GET'])
def download_code_file():
file_name = request.args.get('file_name')
app.logger.info("download_code_file 接口被调用,开始下载文件:{}".format(file_name))
path = os.path.join(os.getcwd(), "file")
file_path = os.path.join(path, f"{file_name}")
if not os.path.exists(file_path):
app.logger.info("该文件:{},不存在".format(file_name))
return jsonify({})
else:
return send_file(file_path, as_attachment=True)
@app.route('/upload_package', methods=['POST'])
def upload_package():
app.logger.info("upload_package 接口已被调用")
file = request.files['file']
file_name = file.filename
app.logger.info("upload_package 接口被调用,开始查询该文件是否存在:{}".format(file_name))
path = os.path.join(os.getcwd(), f"file")
if not os.path.exists(path):
os.makedirs(path)
file_path = os.path.join(path, f"{file_name}")
# 需要对数据进行存储
if os.path.exists(file_path):
app.logger.info("该 package{},当天上传过,需要先删除再保存".format(file_name))
os.remove(file_path)
file.save(file_path)
app.logger.info("package:{}已保持".format(file_name))
return jsonify({'message': 'File uploaded successfully', 'path': file_path})
@app.route('/download_package', methods=['GET'])
def download_package():
app.logger.info("接口被调用")
file_name = request.args.get('file_name')
path = os.path.join(os.getcwd(), "file")
file_path = os.path.join(path, f"{file_name}")
app.logger.info(file_path)
if not os.path.exists(file_path):
app.logger.info("压缩包:{},不存在".format(file_name))
return jsonify({}), 404
else:
return send_file(file_path, as_attachment=True)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)