# -*- coding: utf-8 -*- """ @Project : data_upload_download @File : app.py @IDE : PyCharm @Author : liu jian jiang @Date : 2025/6/17 11:23 """ import os import uuid import math import json import logging from datetime import datetime, timedelta from public_func import MySQLPool from logging.handlers import RotatingFileHandler from flask import Flask, request, jsonify, send_file, render_template, redirect, url_for from flask_cors import CORS db = MySQLPool() app = Flask(__name__) CORS(app) # 配置日志 handler = RotatingFileHandler( './logs/flask_app.log', maxBytes=1024 * 1024 * 1024, backupCount=5 ) handler.setFormatter(logging.Formatter( '%(asctime)s %(levelname)s: %(message)s' )) app.logger.addHandler(handler) app.logger.setLevel(logging.INFO) # H5 项目接口 @app.route('/h5', methods=['GET']) def get_h5_account_info(): sql_str = "select account_id,keep_rate,task_total,account_name,is_active, countries, link_url from account_h5_info" try: result = db.execute_query(sql_str) return render_template('account_h5_info.html', users=[ {'account_id': str(x[0]), 'keep_rate': x[1], "task_total": x[2], "account_name": x[3], "countries": x[5], "link_url": x[6], "is_active": x[4]} for x in result]) except Exception as e: app.logger.info("获取浏览器信息失败,失败原因:{}".format(str(e))) return jsonify({}) @app.route('/add_h5_user', methods=['GET', 'POST']) def add_h5_user(): if request.method == 'POST': try: keep_rate = request.form['keep_rate'] task_total = request.form['task_total'] account_name = request.form['account_name'] countries = request.form['countries'] link_url = request.form['link_url'] max_retries = 3 # 最大重试次数 retry_count = 0 success = False while not success and retry_count < max_retries: try: account_id = uuid.uuid4().hex sql_str = """INSERT INTO account_h5_info (account_id, keep_rate, task_total, account_name, countries, link_url) VALUES (%s, %s, %s, %s, %s, %s)""" db.execute_single(sql_str, (account_id, keep_rate, task_total, account_name, countries, link_url)) success = True return redirect(url_for('get_h5_account_info')) except Exception as e: if "Duplicate entry" in str(e) and "account_id" in str(e): retry_count += 1 app.logger.warning(f"UUID冲突,正在重试({retry_count}/{max_retries})") continue raise # 如果不是UUID冲突错误,直接抛出异常 if not success: raise Exception("UUID生成冲突,已达到最大重试次数") except Exception as e: app.logger.error(f"添加H5用户失败: {str(e)}") # 这里可以添加错误处理逻辑,比如返回错误页面或提示信息 return render_template('add_h5_user.html') @app.route('/edit_user', methods=['GET', 'POST']) def edit_user(): account_id = request.args.get('account_id') if request.method == 'POST': account_id = request.form['account_id'] keep_rate = request.form['keep_rate'] task_total = int(request.form['task_total']) account_name = request.form['account_name'] countries = request.form['countries'] link_url = request.form['link_url'] is_active = request.form['is_active'] update_time = datetime.now() sql_str = "UPDATE account_h5_info set keep_rate=%s, task_total=%s, account_name=%s ,is_active=%s ,countries=%s, link_url=%s,update_time=%s where account_id=%s" db.execute_single(sql_str, (keep_rate, task_total, account_name, is_active, countries, link_url, update_time, account_id)) return redirect(url_for('get_h5_account_info')) sql_str = "select account_id,keep_rate,task_total,account_name,is_active,countries, link_url from account_h5_info where account_id=%s" result = db.execute_query(sql_str, (account_id,)) return render_template('edit_h5_user.html', users={'account_id': result[0][0], 'keep_rate': result[0][1], 'task_total': result[0][2], 'account_name': result[0][3], "is_active": result[0][4], "countries": result[0][5], "link_url": result[0][6]}) @app.route('/download_file', methods=['GET']) def download_file(): file_name = request.args.get('fileName') app.logger.info("download_file 接口被调用,开始下载文件:{}".format(file_name)) path = os.path.join(os.getcwd(), "file/h5") file_path = os.path.join(path, f"{file_name}") if not os.path.exists(file_path): app.logger.info("该文件:{},不存在".format(file_name)) return jsonify({}) else: return send_file(file_path, as_attachment=True) @app.route('/upload_browser_info', methods=['POST']) def upload_browser_info(): try: params = json.loads(request.data) visitor_id = params["fingerprint"]['visitorId'] app.logger.info("接口:upload_browser_info 被调用,传入参数为:{}".format(str(visitor_id))) # 对数据进行保存 sql_str = "insert into task_h5_browser_info(visitor_id,browser_info) values(%s,%s)" db.execute_single(sql_str, (visitor_id, json.dumps(params))) return jsonify({'status': 'ok'}) except Exception as e: if "PRIMARY" in str(e): return jsonify({'status': 'ok'}) app.logger.info("浏览器信息保存失败,失败原因:{}".format(str(e))) return jsonify({'status': 'error', 'msg': str(e)}) @app.route('/get_browser_info', methods=['GET']) def get_browser_info(): sql_str = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1" try: result = db.execute_query(sql_str, ) app.logger.info("get_browser_info:获取数据长度为:{}".format(len(result))) return jsonify(json.loads(result[0][0])) except Exception as e: app.logger.info("获取浏览器信息失败,失败原因:{}".format(str(e))) return jsonify({'status': "error", "message": str(e)}) @app.route('/get_account', methods=['GET']) def get_account_id(): sql_str = "select account_id from account_h5_info where is_active=1 ORDER BY RAND() LIMIT 1" result = db.execute_query(sql_str, ) if result: return jsonify({'accountId': result[0][0]}) return jsonify({'status': "error", "message": ""}) @app.route('/h5_issue_task', methods=['GET']) def issue_task(): account_id = request.args.get('accountId') sql_str = "select account_id,keep_rate,task_total,account_name,is_active from account_h5_info where account_id=%s" result = db.execute_query(sql_str, (account_id,)) if result: task_total = int(result[0][2]) app.logger.info("账户:{}需要执行新增任务总数为:{}".format(account_id, task_total)) now_date = datetime.now().strftime("%Y-%m-%d") sql_str = "select execute_task_number,id from task_execute_info where account_id=%s and save_date=%s" result = db.execute_query(sql_str, (account_id, now_date)) execute_task_number = 0 if result: execute_task_number = result[0][0] must_task_number = math.ceil((task_total / 24) * int(datetime.today().hour)) app.logger.info("账户:{}已执行任务个数为:{}".format(account_id, execute_task_number)) app.logger.info("账户:{} 按照计划应该执行行任务个数为:{}".format(account_id, must_task_number)) if execute_task_number >= must_task_number * 1.2: app.logger.info( "账户:{},已经执行新增任务{}个,执行速度过快,需要暂停执行".format(str(account_id), execute_task_number)) return jsonify({"message": "账户:{} 执行任务过快,暂停执行".format(account_izd)}), 403 execute_task_number += 1 if result: sql_str = "UPDATE task_execute_info SET execute_task_number = %s,update_time=%s WHERE account_id = %s and save_date=%s" db.execute_update(sql_str, (execute_task_number, datetime.now(), account_id, now_date)) else: sql_str = "insert into task_execute_info(account_id,execute_task_number,save_date) values(%s, %s, %s)" db.execute_update(sql_str, (account_id, execute_task_number, now_date)) sql_str = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1" result = db.execute_query(sql_str) if result: return jsonify(json.loads(result[0][0])) else: app.logger.info("未获取到账户:{} 相关配置信息".format(account_id)) return jsonify({'status': 'error', 'msg': '传入账号错误'}) @app.route('/upload_file', methods=['POST']) def upload_file(): file = request.files['file'] file_name = file.filename account_id = request.args.get('accountId') app.logger.info("接口upload_file 传入accountId 为:{}".format(account_id)) path = os.path.join(os.getcwd(), f"file/h5/{account_id}") if not os.path.exists(path): os.makedirs(path) file_path = os.path.join(path, file_name) # # 需要对数据进行存储 if os.path.exists(file_path): app.logger.info("文件:{},当天上传过,需要先删除再保存".format(file_name)) os.remove(file_path) file.save(file_path) app.logger.info("文件:{}已保持".format(file_name)) return jsonify({'message': 'File uploaded successfully', 'path': file_path}) @app.route('/', methods=['get']) def test_post(): return jsonify({"key": "value"}) if __name__ == '__main__': app.run(host='0.0.0.0', port=5000)