# -*- coding: utf-8 -*-
"""
@Project : data_upload_download
@File : app.py
@IDE : PyCharm
@Author : liu jian jiang
@Date : 2025/6/17 11:23
"""
|
||
import os
|
||
import uuid
|
||
import math
|
||
import json
|
||
import logging
|
||
from datetime import datetime, timedelta
|
||
from public_func import MySQLPool
|
||
from logging.handlers import RotatingFileHandler
|
||
from flask import Flask, request, jsonify, send_file, render_template, redirect, url_for
|
||
from flask_cors import CORS
|
||
|
||
# Shared database helper used by every route in this module.
db = MySQLPool()

app = Flask(__name__)
CORS(app)

# Configure logging: a size-rotated log file attached to the Flask app logger.
_LOG_FILE = './logs/flask_app.log'
# The handler opens its file when constructed, so the directory must exist
# first; otherwise importing this module fails on a fresh checkout.
os.makedirs(os.path.dirname(_LOG_FILE), exist_ok=True)
handler = RotatingFileHandler(
    _LOG_FILE,
    maxBytes=1024 * 1024 * 1024,
    backupCount=5
)
handler.setFormatter(logging.Formatter(
    '%(asctime)s %(levelname)s: %(message)s'
))
app.logger.addHandler(handler)
app.logger.setLevel(logging.INFO)
|
||
|
||
|
||
# H5 项目接口
|
||
# H5 project endpoints
@app.route('/h5', methods=['GET'])
def get_h5_account_info():
    """Render the H5 account overview page.

    Reads every row of ``account_h5_info`` and passes the rows to the
    ``account_h5_info.html`` template as a list of dicts under ``users``.

    Returns:
        The rendered template, or an empty JSON object when the query or the
        rendering raises.
    """
    sql_str = "select offer_id,keep_rate,task_total,account_name,is_active, countries, link_url from account_h5_info"
    try:
        rows = db.execute_query(sql_str)
        users = [
            {
                # offer_id is handed to the template as a string.
                'offer_id': str(offer_id),
                'keep_rate': keep_rate,
                "task_total": task_total,
                "account_name": account_name,
                "countries": countries,
                "link_url": link_url,
                "is_active": is_active,
            }
            for (offer_id, keep_rate, task_total, account_name,
                 is_active, countries, link_url) in rows
        ]
        return render_template('account_h5_info.html', users=users)
    except Exception as e:
        # The previous message was copied from the browser-info endpoint and
        # was logged at INFO; report the real operation at ERROR level.
        app.logger.error("获取H5账户信息失败,失败原因:{}".format(str(e)))
        return jsonify({})
|
||
|
||
|
||
@app.route('/add_h5_user', methods=['GET', 'POST'])
def add_h5_user():
    """Show the "add H5 account" form and create the account on POST.

    On POST the form fields are inserted into ``account_h5_info`` under a
    freshly generated integer ``offer_id`` and a matching row is written to
    ``h5_new_task_record_info``. When the write fails with a duplicate-entry
    error that mentions the generated id, a new id is tried, up to three
    attempts in total. Success redirects to the account list; any failure is
    logged and the form is rendered again.
    """
    if request.method != 'POST':
        return render_template('add_h5_user.html')

    try:
        keep_rate, task_total, account_name, countries, link_url = (
            request.form[field]
            for field in ('keep_rate', 'task_total', 'account_name',
                          'countries', 'link_url')
        )
        max_retries = 3  # maximum number of attempts
        for retry_count in range(1, max_retries + 1):
            offer_id = uuid.uuid4().int
            try:
                sql_str = """INSERT INTO account_h5_info
(offer_id, keep_rate, task_total,
account_name, countries, link_url)
VALUES (%s, %s, %s, %s, %s, %s)"""
                account_values = (offer_id, keep_rate, task_total,
                                  account_name, countries, link_url)
                db.execute_single(sql_str, account_values)
                sql_str = "insert into h5_new_task_record_info(offer_id, task_total) values (%s, %s) ON DUPLICATE KEY UPDATE task_total = task_total;"
                db.execute_single(sql_str, (offer_id, task_total))
                return redirect(url_for('get_h5_account_info'))
            except Exception as e:
                reason = str(e)
                is_id_clash = "Duplicate entry" in reason and str(offer_id) in reason
                if not is_id_clash:
                    raise  # not an id collision: hand over to the outer handler
                app.logger.warning(f"UUID冲突,正在重试({retry_count}/{max_retries})")

        # Every attempt collided with an existing id.
        raise Exception("UUID生成冲突,已达到最大重试次数")

    except Exception as e:
        app.logger.error(f"添加H5用户失败: {str(e)}")
        # Error presentation (an error page or a flash message) could be added here.

    return render_template('add_h5_user.html')
|
||
|
||
|
||
@app.route('/edit_user', methods=['GET', 'POST'])
def edit_user():
    """Show the edit form for one H5 account and apply the edit on POST.

    GET reads ``offer_id`` from the query string, loads that row from
    ``account_h5_info`` and renders ``edit_h5_user.html`` with it.
    POST updates the row from the submitted form, writes a row to
    ``h5_new_task_record_info`` and redirects to the account list.

    Returns:
        The rendered form (GET), a redirect (POST), a 400 JSON error when
        ``task_total`` is not an integer, or a 404 JSON error when no account
        matches ``offer_id``.
    """
    if request.method == 'POST':
        form = request.form
        offer_id = form['offer_id']
        keep_rate = form['keep_rate']
        try:
            task_total = int(form['task_total'])
        except (TypeError, ValueError):
            # A non-numeric value previously escaped as an unhandled 500.
            return jsonify({'status': 'error', 'msg': 'task_total 必须为整数'}), 400
        account_name = form['account_name']
        countries = form['countries']
        link_url = form['link_url']
        is_active = form['is_active']
        update_time = datetime.now()
        sql_str = "UPDATE account_h5_info set keep_rate=%s, task_total=%s, account_name=%s ,is_active=%s ,countries=%s, link_url=%s,update_time=%s where offer_id=%s"
        db.execute_single(sql_str, (keep_rate, task_total, account_name, is_active,
                                    countries, link_url, update_time, offer_id))
        # NOTE(review): "task_total = task_total" assigns the column to itself,
        # so an existing record row keeps its old task_total. Confirm whether
        # VALUES(task_total) was intended.
        sql_str = "insert into h5_new_task_record_info(offer_id, task_total) values (%s, %s) ON DUPLICATE KEY UPDATE task_total = task_total"
        db.execute_single(sql_str, (offer_id, task_total))
        return redirect(url_for('get_h5_account_info'))

    offer_id = request.args.get('offer_id')
    sql_str = "select offer_id,keep_rate,task_total,account_name,is_active,countries, link_url from account_h5_info where offer_id=%s"
    result = db.execute_query(sql_str, (offer_id,))
    if not result:
        # A missing or unknown offer_id used to raise IndexError on result[0].
        return jsonify({'status': 'error', 'msg': '传入账号错误'}), 404

    columns = ('offer_id', 'keep_rate', 'task_total', 'account_name',
               'is_active', 'countries', 'link_url')
    return render_template('edit_h5_user.html', users=dict(zip(columns, result[0])))
|
||
|
||
|
||
@app.route('/download_file', methods=['GET'])
def download_file():
    """Send a file stored under ``<cwd>/file/h5`` as an attachment.

    The relative path comes from the ``fileName`` query parameter and may
    contain sub-directories. Because it is caller-supplied, the resolved path
    must stay inside the ``file/h5`` directory.

    Returns:
        The file as an attachment; an empty JSON object when the name is
        missing or the file does not exist; an empty JSON object with status
        403 when the name points outside the download directory.
    """
    file_name = request.args.get('fileName')
    app.logger.info("download_file 接口被调用,开始下载文件:{}".format(file_name))

    base_dir = os.path.abspath(os.path.join(os.getcwd(), "file/h5"))
    # An absolute or "../" fileName would otherwise escape base_dir.
    file_path = os.path.abspath(os.path.join(base_dir, file_name or ""))
    try:
        inside_base = os.path.commonpath([base_dir, file_path]) == base_dir
    except ValueError:
        # commonpath raises when the paths cannot be compared (e.g. different drives).
        inside_base = False
    if not inside_base:
        app.logger.warning("文件名:{} 非法,拒绝下载".format(file_name))
        return jsonify({}), 403

    # isfile (not exists) so that a directory is never handed to send_file.
    if not file_name or not os.path.isfile(file_path):
        app.logger.info("该文件:{},不存在".format(file_name))
        return jsonify({})

    return send_file(file_path, as_attachment=True)
|
||
|
||
|
||
@app.route('/upload_browser_info', methods=['POST'])
def upload_browser_info():
    """Store a browser fingerprint payload keyed by its visitor id.

    The request body must be JSON containing ``fingerprint.visitorId``. The
    whole payload is inserted into ``task_h5_browser_info``; when the insert
    fails with an error mentioning ``PRIMARY``, the existing row is updated
    instead.

    Returns:
        ``{'status': 'ok'}`` on success; ``{'status': 'error', 'msg': ...}``
        with status 400 for a malformed body, or with the default status when
        the database write fails.
    """
    try:
        params = json.loads(request.data)
        visitor_id = params["fingerprint"]['visitorId']
    except (ValueError, KeyError, TypeError) as e:
        # Invalid JSON or a missing key previously surfaced as an unhandled 500.
        app.logger.info("浏览器信息保存失败,失败原因:{}".format(str(e)))
        return jsonify({'status': 'error', 'msg': str(e)}), 400

    payload = json.dumps(params)
    try:
        app.logger.info("接口:upload_browser_info 被调用,传入参数为:{}".format(str(visitor_id)))
        # Persist the payload.
        sql_str = "insert into task_h5_browser_info(visitor_id,browser_info) values(%s,%s)"
        db.execute_single(sql_str, (visitor_id, payload))
        return jsonify({'status': 'ok'})
    except Exception as e:
        error = e
        if "PRIMARY" in str(e):
            # The visitor id is already stored: refresh the existing row.
            try:
                sql_str = "UPDATE task_h5_browser_info SET browser_info = %s,update_time=%s WHERE visitor_id = %s "
                db.execute_update(sql_str, (payload, datetime.now(), visitor_id))
                return jsonify({'status': 'ok'})
            except Exception as update_error:
                # A failing fallback must be reported, not escape the handler.
                error = update_error
        app.logger.info("浏览器信息保存失败,失败原因:{}".format(str(error)))
        return jsonify({'status': 'error', 'msg': str(error)})
|
||
|
||
|
||
@app.route('/get_browser_info', methods=['GET'])
def get_browser_info():
    """Return one randomly selected stored browser fingerprint as JSON.

    Any failure (no stored rows, undecodable payload, database error) is
    logged and reported as ``{'status': 'error', 'message': ...}``.
    """
    query = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1"
    try:
        rows = db.execute_query(query)
        app.logger.info("get_browser_info:获取数据长度为:{}".format(len(rows)))
        first_row = rows[0]
        browser_info = json.loads(first_row[0])
        return jsonify(browser_info)
    except Exception as err:
        reason = str(err)
        app.logger.info("获取浏览器信息失败,失败原因:{}".format(reason))
        return jsonify({'status': "error", "message": reason})
|
||
|
||
|
||
@app.route('/upload_task_status', methods=['GET'])
def upload_task_status():
    """Record the status reported for one issued task.

    Reads ``offerId``, ``taskId`` and ``taskStatus`` from the query string and
    updates the matching row in ``task_execute_status``. When the update
    raises an error whose text contains the offer id, a new row is inserted
    instead.

    Returns:
        ``{'status': 'ok'}`` on success, otherwise
        ``{'status': 'error', 'message': ...}``.
    """
    offer_id = request.args.get('offerId')
    task_id = request.args.get('taskId')
    task_status = request.args.get("taskStatus")
    app.logger.info("upload_task_status <UNK>{},{},{}".format(task_status, task_id, offer_id))
    try:
        sql_str = "UPDATE task_execute_status SET task_status=%s WHERE offer_id=%s and task_id=%s"
        db.execute_update(sql_str, (task_status, offer_id, task_id))
        return jsonify({'status': 'ok'})
    except Exception as e:
        error = e
        # offer_id is None when the parameter is absent; "None in str" would
        # raise TypeError inside this handler and turn the failure into a 500.
        if offer_id is not None and offer_id in str(e):
            try:
                sql_str = "insert into task_execute_status(offer_id,task_id,task_status) values(%s,%s,%s)"
                db.execute_single(sql_str, (offer_id, task_id, task_status))
                return jsonify({'status': 'ok'})
            except Exception as insert_error:
                # Report a failing fallback instead of letting it escape.
                error = insert_error
        app.logger.info("<UNK>{}".format(str(error)))
        return jsonify({'status': "error", "message": str(error)})
|
||
|
||
|
||
@app.route('/h5_issue_task', methods=['GET'])
def issue_task():
    """Issue one task for a randomly chosen active H5 account.

    Steps:
      1. Pick a random row from ``account_h5_info`` with ``is_active=1``.
      2. Read today's executed count from ``task_execute_info`` and compare it
         with the planned count for the current hour
         (``ceil(task_total / 24 * hour)``); at 120% of the plan or more the
         request is refused with status 403.
      3. Otherwise increment and store today's count, pick a random browser
         fingerprint, record a new task id in ``task_execute_status`` and
         return the task description.

    Returns:
        JSON with ``link_url``, ``task_id``, ``countries``, ``file_name`` and
        ``browser_info``; a 403 JSON message when throttled; or an error JSON
        object when no active account or no browser fingerprint is available.
    """
    sql_str = "select offer_id,keep_rate,task_total,account_name,is_active,countries,link_url from account_h5_info where is_active=1 ORDER BY RAND() LIMIT 1"
    accounts = db.execute_query(sql_str)
    if not accounts:
        return jsonify({'status': 'error', 'msg': '传入账号错误'})

    account = accounts[0]
    offer_id = account[0]
    countries = account[-2]
    link_url = account[-1]
    task_total = int(account[2])
    app.logger.info("账户:{}需要执行新增任务总数为:{}".format(offer_id, task_total))

    # Read the clock once so the date and the hour belong to the same instant.
    now = datetime.now()
    now_date = now.strftime("%Y-%m-%d")
    sql_str = "select execute_task_number,id from task_execute_info where offer_id=%s and save_date=%s"
    progress = db.execute_query(sql_str, (offer_id, now_date))
    execute_task_number = progress[0][0] if progress else 0

    # NOTE(review): during hour 0 the planned count is 0, so the check below
    # throttles every request in that hour. Confirm whether this is intended.
    must_task_number = math.ceil((task_total / 24) * int(now.hour))
    app.logger.info("账户:{}已执行任务个数为:{}".format(offer_id, execute_task_number))
    app.logger.info("账户:{} 按照计划应该执行行任务个数为:{}".format(offer_id, must_task_number))
    if execute_task_number >= must_task_number * 1.2:
        app.logger.info(
            "账户:{},已经执行新增任务{}个,执行速度过快,需要暂停执行".format(str(offer_id), execute_task_number))
        # The original formatted an undefined name here (account_izd), which
        # raised NameError and produced a 500 instead of the intended 403.
        return jsonify({"message": "账户:{} 执行任务过快,暂停执行".format(offer_id)}), 403

    execute_task_number += 1
    if progress:
        sql_str = "UPDATE task_execute_info SET execute_task_number = %s,update_time=%s WHERE offer_id = %s and save_date=%s"
        db.execute_update(sql_str, (execute_task_number, datetime.now(), offer_id, now_date))
    else:
        sql_str = "insert into task_execute_info(offer_id,execute_task_number,save_date) values(%s, %s, %s)"
        db.execute_update(sql_str, (offer_id, execute_task_number, now_date))

    sql_str = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1"
    browser_rows = db.execute_query(sql_str)
    if browser_rows:
        task_id = uuid.uuid4().hex
        # Record the issued task in its status table.
        sql_str = "insert into task_execute_status(offer_id,task_id) values(%s,%s)"
        db.execute_single(sql_str, (offer_id, task_id))
        return jsonify({"link_url": link_url, "task_id": task_id,
                        "countries": countries, "file_name": f"{offer_id}/main.js",
                        "browser_info": json.loads(browser_rows[0][0])})

    return jsonify({'status': 'error', 'msg': '传入账号错误'})
|
||
|
||
|
||
@app.route('/upload_file', methods=['POST'])
def upload_file():
    """Save an uploaded file under ``<cwd>/file/h5/<offerId>/``.

    The file comes from the multipart field ``file`` and the target
    sub-directory from the ``offerId`` query parameter. Both are
    caller-supplied, so the destination must stay inside the offer directory,
    which in turn must stay inside ``file/h5``. An existing file with the
    same name is replaced.

    Returns:
        JSON with a success message and the stored path, or a 400 JSON
        message when the file name is empty or the destination would leave
        the upload directory.
    """
    file = request.files['file']
    file_name = file.filename
    offer_id = request.args.get('offerId')
    app.logger.info("接口upload_file 传入 offerId 为:{}".format(offer_id))

    base_dir = os.path.abspath(os.path.join(os.getcwd(), "file/h5"))
    path = os.path.abspath(os.path.join(base_dir, f"{offer_id}"))
    file_path = os.path.abspath(os.path.join(path, file_name or ""))
    try:
        contained = (os.path.commonpath([base_dir, path]) == base_dir
                     and os.path.commonpath([path, file_path]) == path)
    except ValueError:
        # commonpath raises when the paths cannot be compared (e.g. different drives).
        contained = False
    # file_path == path means the name was empty or resolved to the directory itself.
    if not file_name or not contained or file_path == path:
        app.logger.warning("接口upload_file 拒绝非法路径:{},{}".format(offer_id, file_name))
        return jsonify({'message': 'Invalid offerId or file name'}), 400

    # exist_ok avoids the check-then-create race between concurrent uploads.
    os.makedirs(path, exist_ok=True)

    # Replace a previously uploaded file with the same name.
    if os.path.exists(file_path):
        app.logger.info("文件:{},当天上传过,需要先删除再保存".format(file_name))
        os.remove(file_path)

    file.save(file_path)
    app.logger.info("文件:{}已保持".format(file_name))
    return jsonify({'message': 'File uploaded successfully', 'path': file_path})
|
||
|
||
@app.route('/', methods=['get'])
def test_post():
    """Return a fixed JSON payload from the root URL."""
    payload = {"key": "value"}
    return jsonify(payload)
|
||
|
||
|
||
if __name__ == '__main__':
    # Start Flask's built-in server on all interfaces, port 5000, when run as a script.
    run_options = {'host': '0.0.0.0', 'port': 5000}
    app.run(**run_options)
|