issue_task_h5/app.py

254 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
@Project : data_upload_download
@File : app.py
@IDE : PyCharm
@Author : liu jian jiang
@Date : 2025/6/17 11:23
"""
import os
import uuid
import math
import json
import logging
from datetime import datetime, timedelta
from public_func import MySQLPool
from logging.handlers import RotatingFileHandler
from flask import Flask, request, jsonify, send_file, render_template, redirect, url_for
from flask_cors import CORS
# Shared database helper used by every endpoint in this module.
db = MySQLPool()
app = Flask(__name__)
# Enable CORS on the whole app with flask_cors defaults.
CORS(app)

# Configure logging: a rotating file handler (up to 1 GiB per file, 5 backups).
# The handler opens the log file as soon as it is constructed, so the
# directory has to exist first; otherwise importing this module fails.
os.makedirs('./logs', exist_ok=True)
handler = RotatingFileHandler(
    './logs/flask_app.log',
    maxBytes=1024 * 1024 * 1024,
    backupCount=5
)
handler.setFormatter(logging.Formatter(
    '%(asctime)s %(levelname)s: %(message)s'
))
app.logger.addHandler(handler)
app.logger.setLevel(logging.INFO)
# H5 项目接口
@app.route('/h5', methods=['GET'])
def get_h5_account_info():
    """Render the H5 account overview page.

    Reads every row of ``account_h5_info`` and hands the rows to the
    ``account_h5_info.html`` template as ``users``.

    Returns:
        The rendered page, or an empty JSON object when the query or the
        rendering raises.
    """
    sql_str = "select offer_id,keep_rate,task_total,account_name,is_active, countries, link_url from account_h5_info"
    try:
        result = db.execute_query(sql_str)
        users = [
            {
                # offer_id is handed to the template as a string.
                'offer_id': str(row[0]),
                'keep_rate': row[1],
                "task_total": row[2],
                "account_name": row[3],
                "countries": row[5],
                "link_url": row[6],
                "is_active": row[4],
            }
            for row in result
        ]
        return render_template('account_h5_info.html', users=users)
    except Exception as e:
        # Log as an error (with traceback) and name the right resource: this
        # endpoint reads account info, not browser info.
        app.logger.exception("获取H5账户信息失败,失败原因:{}".format(str(e)))
        return jsonify({})
@app.route('/add_h5_user', methods=['GET', 'POST'])
def add_h5_user():
    """Show the "add H5 user" form and, on POST, create the account.

    On POST the form fields are inserted into ``account_h5_info`` under a
    freshly generated ``offer_id`` and a matching row is written to
    ``h5_new_task_record_info``. A duplicate-key error that mentions the
    generated id is retried with a new id, up to three attempts in total.

    Returns:
        A redirect to the account list on success; otherwise the
        ``add_h5_user.html`` form (for GET, and for POST after a logged
        failure).
    """
    if request.method != 'POST':
        return render_template('add_h5_user.html')

    insert_account_sql = """INSERT INTO account_h5_info
                            (offer_id, keep_rate, task_total,
                             account_name, countries, link_url)
                            VALUES (%s, %s, %s, %s, %s, %s)"""
    insert_record_sql = "insert into h5_new_task_record_info(offer_id, task_total) values (%s, %s) ON DUPLICATE KEY UPDATE task_total = task_total;"
    try:
        form = request.form
        keep_rate = form['keep_rate']
        task_total = form['task_total']
        account_name = form['account_name']
        countries = form['countries']
        link_url = form['link_url']

        max_retries = 3  # maximum number of attempts
        for attempt in range(1, max_retries + 1):
            offer_id = uuid.uuid4().int
            try:
                db.execute_single(
                    insert_account_sql,
                    (offer_id, keep_rate, task_total, account_name, countries, link_url),
                )
                db.execute_single(insert_record_sql, (offer_id, task_total))
                return redirect(url_for('get_h5_account_info'))
            except Exception as e:
                err_text = str(e)
                # Anything other than a collision on the generated id is not
                # retried; it propagates to the outer handler.
                if "Duplicate entry" not in err_text or str(offer_id) not in err_text:
                    raise
                app.logger.warning(f"UUID冲突正在重试({attempt}/{max_retries})")
        # Every attempt collided.
        raise Exception("UUID生成冲突已达到最大重试次数")
    except Exception as e:
        app.logger.error(f"添加H5用户失败: {str(e)}")
        # Error handling (an error page or a message) could be added here.
    return render_template('add_h5_user.html')
@app.route('/edit_user', methods=['GET', 'POST'])
def edit_user():
    """Show the edit form for one H5 account and, on POST, save the changes.

    GET reads ``offer_id`` from the query string and renders
    ``edit_h5_user.html`` with that account's current values. POST updates
    the ``account_h5_info`` row from the submitted form, makes sure a row
    exists in ``h5_new_task_record_info``, then redirects to the list.

    Returns:
        The rendered form, a redirect after a successful POST, or a JSON
        error with status 404 when no account matches ``offer_id``.
    """
    offer_id = request.args.get('offer_id')
    if request.method == 'POST':
        offer_id = request.form['offer_id']
        keep_rate = request.form['keep_rate']
        task_total = int(request.form['task_total'])
        account_name = request.form['account_name']
        countries = request.form['countries']
        link_url = request.form['link_url']
        is_active = request.form['is_active']
        update_time = datetime.now()
        sql_str = "UPDATE account_h5_info set keep_rate=%s, task_total=%s, account_name=%s ,is_active=%s ,countries=%s, link_url=%s,update_time=%s where offer_id=%s"
        db.execute_single(sql_str, (keep_rate, task_total, account_name, is_active, countries, link_url, update_time, offer_id))
        # NOTE(review): `task_total = task_total` assigns the column its
        # current value, so an existing record row keeps its old task_total.
        # Confirm whether `VALUES(task_total)` was intended here.
        sql_str = "insert into h5_new_task_record_info(offer_id, task_total) values (%s, %s) ON DUPLICATE KEY UPDATE task_total = task_total"
        db.execute_single(sql_str, (offer_id, task_total))
        return redirect(url_for('get_h5_account_info'))

    sql_str = "select offer_id,keep_rate,task_total,account_name,is_active,countries, link_url from account_h5_info where offer_id=%s"
    result = db.execute_query(sql_str, (offer_id,))
    if not result:
        # A missing or unknown offer_id used to crash on result[0]; answer
        # with an explicit error instead.
        app.logger.warning("edit_user: 未找到 offer_id 为 {} 的账号".format(offer_id))
        return jsonify({'status': 'error', 'msg': '传入账号错误'}), 404
    row = result[0]
    return render_template('edit_h5_user.html',
                           users={'offer_id': row[0], 'keep_rate': row[1], 'task_total': row[2],
                                  'account_name': row[3], "is_active": row[4], "countries": row[5],
                                  "link_url": row[6]})
@app.route('/download_file', methods=['GET'])
def download_file():
    """Send a file from ``<cwd>/file/h5`` as an attachment.

    The ``fileName`` query parameter is a path relative to ``file/h5``
    (sub-directories are allowed). It comes from the client, so the resolved
    path is checked to still lie inside that directory before anything is
    read from disk.

    Returns:
        The file as an attachment; an empty JSON object when the file does
        not exist; an empty JSON object with status 400 when the requested
        path points outside ``file/h5``.
    """
    file_name = request.args.get('fileName')
    app.logger.info("download_file 接口被调用,开始下载文件:{}".format(file_name))
    base_dir = os.path.realpath(os.path.join(os.getcwd(), "file/h5"))
    # realpath collapses ".." segments and symlinks; an absolute fileName
    # makes os.path.join discard base_dir entirely, which the prefix check
    # below also catches.
    file_path = os.path.realpath(os.path.join(base_dir, f"{file_name}"))
    if not file_path.startswith(base_dir + os.sep):
        app.logger.warning("download_file 拒绝访问目录之外的路径:{}".format(file_name))
        return jsonify({}), 400
    # isfile (not exists): a directory cannot be sent as a file.
    if not os.path.isfile(file_path):
        app.logger.info("该文件:{},不存在".format(file_name))
        return jsonify({})
    return send_file(file_path, as_attachment=True)
@app.route('/upload_browser_info', methods=['POST'])
def upload_browser_info():
    """Store a browser fingerprint payload keyed by its visitor id.

    The request body must be JSON containing ``fingerprint.visitorId``. The
    whole payload is inserted into ``task_h5_browser_info``; when the insert
    fails with an error mentioning ``PRIMARY`` the existing row is updated
    instead.

    Returns:
        ``{'status': 'ok'}`` on success; ``{'status': 'error', 'msg': ...}``
        when saving fails, with status 400 when the body is not valid JSON
        or lacks ``fingerprint.visitorId``.
    """
    try:
        params = json.loads(request.data)
        visitor_id = params["fingerprint"]['visitorId']
    except (ValueError, KeyError, TypeError) as e:
        # Malformed or incomplete bodies previously escaped as a 500.
        app.logger.warning("浏览器信息保存失败,失败原因:{}".format(str(e)))
        return jsonify({'status': 'error', 'msg': str(e)}), 400

    browser_info = json.dumps(params)
    try:
        app.logger.info("接口upload_browser_info 被调用,传入参数为:{}".format(str(visitor_id)))
        # Save the payload.
        sql_str = "insert into task_h5_browser_info(visitor_id,browser_info) values(%s,%s)"
        db.execute_single(sql_str, (visitor_id, browser_info))
        return jsonify({'status': 'ok'})
    except Exception as e:
        if "PRIMARY" not in str(e):
            app.logger.info("浏览器信息保存失败,失败原因:{}".format(str(e)))
            return jsonify({'status': 'error', 'msg': str(e)})

    # The insert hit the primary key: refresh the stored payload instead.
    try:
        sql_str = "UPDATE task_h5_browser_info SET browser_info = %s,update_time=%s WHERE visitor_id = %s "
        db.execute_update(sql_str, (browser_info, datetime.now(), visitor_id))
        return jsonify({'status': 'ok'})
    except Exception as e:
        app.logger.info("浏览器信息保存失败,失败原因:{}".format(str(e)))
        return jsonify({'status': 'error', 'msg': str(e)})
@app.route('/get_browser_info', methods=['GET'])
def get_browser_info():
    """Return one randomly chosen stored browser-info payload as JSON.

    Returns:
        The decoded ``browser_info`` of a random ``task_h5_browser_info``
        row, or ``{'status': 'error', 'message': ...}`` when the query, the
        row access or the JSON decoding raises.
    """
    query = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1"
    try:
        rows = db.execute_query(query)
        app.logger.info("get_browser_info:获取数据长度为:{}".format(len(rows)))
        payload = json.loads(rows[0][0])
        return jsonify(payload)
    except Exception as exc:
        reason = str(exc)
        app.logger.info("获取浏览器信息失败,失败原因:{}".format(reason))
        return jsonify({'status': "error", "message": reason})
@app.route('/upload_task_status', methods=['GET'])
def upload_task_status():
    """Record the status reported for a task.

    Reads ``offerId``, ``taskId`` and ``taskStatus`` from the query string
    and updates the matching ``task_execute_status`` row. When the update
    raises an error whose text mentions the offer id, an insert is attempted
    instead.

    Returns:
        ``{'status': 'ok'}`` on success, otherwise
        ``{'status': 'error', 'message': ...}``.
    """
    offer_id = request.args.get('offerId')
    task_id = request.args.get('taskId')
    task_status = request.args.get("taskStatus")
    app.logger.info("upload_task_status <UNK>{},{},{}".format(task_status, task_id, offer_id))
    try:
        sql_str = "UPDATE task_execute_status SET task_status=%s WHERE offer_id=%s and task_id=%s"
        db.execute_update(sql_str, (task_status, offer_id, task_id))
        return jsonify({'status': 'ok'})
    except Exception as e:
        # A missing offerId is None, and `None in str` raises TypeError; an
        # empty one is a substring of every message. Neither may trigger the
        # insert fallback.
        if not offer_id or offer_id not in str(e):
            app.logger.info("<UNK>{}".format(str(e)))
            return jsonify({'status': "error", "message": str(e)})

    try:
        sql_str = "insert into task_execute_status(offer_id,task_id,task_status) values(%s,%s,%s)"
        db.execute_single(sql_str, (offer_id, task_id, task_status))
        return jsonify({'status': 'ok'})
    except Exception as e:
        # The fallback can fail too; report it instead of crashing.
        app.logger.info("<UNK>{}".format(str(e)))
        return jsonify({'status': "error", "message": str(e)})
@app.route('/h5_issue_task', methods=['GET'])
def issue_task():
    """Pick a random active account and hand out one task for it.

    Steps:
      1. Select one random active row from ``account_h5_info``.
      2. Read today's counter from ``task_execute_info`` and compare it with
         the pro-rata quota ``ceil(task_total / 24 * current_hour)``; when
         the counter has reached 1.2 times that quota, answer 403.
      3. Otherwise bump (or create) today's counter, pick a random stored
         browser payload, record a new task id in ``task_execute_status``
         and return the task description.

    Returns:
        The task JSON (``link_url``, ``task_id``, ``countries``,
        ``file_name``, ``browser_info``); a ``message`` JSON with status 403
        when throttled; ``{'status': 'error', ...}`` when there is no active
        account or no stored browser payload.
    """
    sql_str = "select offer_id,keep_rate,task_total,account_name,is_active,countries,link_url from account_h5_info where is_active=1 ORDER BY RAND() LIMIT 1"
    result = db.execute_query(sql_str, )
    if result:
        offer_id = result[0][0]
        countries = result[0][-2]
        link_url = result[0][-1]
        task_total = int(result[0][2])
        app.logger.info("账户:{}需要执行新增任务总数为:{}".format(offer_id, task_total))
        now_date = datetime.now().strftime("%Y-%m-%d")
        sql_str = "select execute_task_number,id from task_execute_info where offer_id=%s and save_date=%s"
        result = db.execute_query(sql_str, (offer_id, now_date))
        execute_task_number = 0
        if result:
            execute_task_number = result[0][0]
        # NOTE(review): during hour 0 this quota is 0, so the check below
        # rejects every request until 01:00 - confirm this is intended.
        must_task_number = math.ceil((task_total / 24) * int(datetime.today().hour))
        app.logger.info("账户:{}已执行任务个数为:{}".format(offer_id, execute_task_number))
        app.logger.info("账户:{} 按照计划应该执行行任务个数为:{}".format(offer_id, must_task_number))
        if execute_task_number >= must_task_number * 1.2:
            app.logger.info(
                "账户:{},已经执行新增任务{}个,执行速度过快,需要暂停执行".format(str(offer_id), execute_task_number))
            # Format the message with offer_id; the name used here before
            # was undefined, so this branch raised NameError instead of
            # returning 403.
            return jsonify({"message": "账户:{} 执行任务过快,暂停执行".format(offer_id)}), 403
        execute_task_number += 1
        if result:
            sql_str = "UPDATE task_execute_info SET execute_task_number = %s,update_time=%s WHERE offer_id = %s and save_date=%s"
            db.execute_update(sql_str, (execute_task_number, datetime.now(), offer_id, now_date))
        else:
            sql_str = "insert into task_execute_info(offer_id,execute_task_number,save_date) values(%s, %s, %s)"
            db.execute_update(sql_str, (offer_id, execute_task_number, now_date))
        sql_str = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1"
        result = db.execute_query(sql_str)
        if result:
            task_id = uuid.uuid4().hex
            # Record the issued task in its status table.
            sql_str = "insert into task_execute_status(offer_id,task_id) values(%s,%s)"
            db.execute_single(sql_str, (offer_id, task_id))
            return jsonify({"link_url": link_url, "task_id": task_id,
                            "countries": countries, "file_name": f"{offer_id}/main.js",
                            "browser_info": json.loads(result[0][0])})
    return jsonify({'status': 'error', 'msg': '传入账号错误'})
@app.route('/upload_file', methods=['POST'])
def upload_file():
    """Save an uploaded file under ``<cwd>/file/h5/<offerId>/``.

    The multipart field ``file`` carries the upload and the ``offerId``
    query parameter names the target sub-directory. Both values come from
    the client, so the file name is reduced to its final path component and
    the target directory is checked to lie inside ``file/h5`` before
    anything is created, deleted or written.

    Returns:
        ``{'message': 'File uploaded successfully', 'path': ...}`` on
        success, or a ``message`` JSON with status 400 when ``offerId`` or
        the file name would escape the upload directory.
    """
    file = request.files['file']
    offer_id = request.args.get('offerId')
    app.logger.info("接口upload_file 传入 offerId 为:{}".format(offer_id))

    # Treat both separator styles as path separators and keep only the last
    # component, so the name cannot address another directory.
    file_name = os.path.basename((file.filename or '').replace('\\', '/'))
    base_dir = os.path.join(os.getcwd(), "file/h5")
    path = os.path.join(os.getcwd(), f"file/h5/{offer_id}")
    base_real = os.path.realpath(base_dir)
    path_real = os.path.realpath(path)
    inside_base = path_real == base_real or path_real.startswith(base_real + os.sep)
    if not inside_base or file_name in ('', '.', '..'):
        app.logger.warning("接口upload_file 拒绝非法路径:offerId={},文件名={}".format(offer_id, file.filename))
        return jsonify({'message': 'Invalid offerId or file name'}), 400

    os.makedirs(path, exist_ok=True)
    file_path = os.path.join(path, file_name)
    # An earlier upload with the same name is removed before saving.
    if os.path.exists(file_path):
        app.logger.info("文件:{},当天上传过,需要先删除再保存".format(file_name))
        os.remove(file_path)
    file.save(file_path)
    app.logger.info("文件:{}已保持".format(file_name))
    return jsonify({'message': 'File uploaded successfully', 'path': file_path})
@app.route('/', methods=['get'])
def test_post():
    """Return the fixed JSON object ``{"key": "value"}`` for the root URL."""
    payload = {"key": "value"}
    return jsonify(payload)
if __name__ == '__main__':
    # When executed as a script, serve the app on every interface, port 5000.
    app.run(port=5000, host='0.0.0.0')