From 22de3cf7a540fc94059f16e484cfb597e73bede0 Mon Sep 17 00:00:00 2001 From: ljj Date: Fri, 11 Jul 2025 15:14:19 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20app.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 570 ++++++++++++++++++++++----------------------------------- 1 file changed, 219 insertions(+), 351 deletions(-) diff --git a/app.py b/app.py index f3b2a10..ebf19a7 100644 --- a/app.py +++ b/app.py @@ -1,351 +1,219 @@ -# -*- coding: utf-8 -*- -""" -@Project : data_upload_download -@File : app.py -@IDE : PyCharm -@Author : liu jian jiang -@Date : 2025/6/17 11:23 -""" -import os -import uuid -import math -import json -import logging -from datetime import datetime, timedelta -from public_func import MySQLPool -from logging.handlers import RotatingFileHandler -from flask import Flask, request, jsonify, send_file, render_template, redirect, url_for -from flask_cors import CORS - -db = MySQLPool() - -app = Flask(__name__) -CORS(app) -# 配置日志 -handler = RotatingFileHandler( - './logs/flask_app.log', - maxBytes=1024 * 1024 * 1024, - backupCount=5 -) -handler.setFormatter(logging.Formatter( - '%(asctime)s %(levelname)s: %(message)s' -)) -app.logger.addHandler(handler) -app.logger.setLevel(logging.INFO) - - -# H5 项目接口 -@app.route('/h5', methods=['GET']) -def get_h5_account_info(): - sql_str = "select account_id,keep_rate,task_total,account_name,is_active, countries, link_url from account_h5_info" - try: - result = db.execute_query(sql_str) - return render_template('account_h5_info.html', - users=[ - {'account_id': str(x[0]), 'keep_rate': x[1], "task_total": x[2], "account_name": x[3], - "countries": x[5], "link_url": x[6], "is_active": x[4]} - for x in - result]) - except Exception as e: - app.logger.info("获取浏览器信息失败,失败原因:{}".format(str(e))) - return jsonify({}) - - -@app.route('/add_h5_user', methods=['GET', 'POST']) -def add_h5_user(): - if request.method == 'POST': - try: - keep_rate = request.form['keep_rate'] - task_total = request.form['task_total'] - account_name = request.form['account_name'] - countries = request.form['countries'] - link_url = request.form['link_url'] - - sql_str = "insert into account_h5_info(account_id,keep_rate,task_total,account_name,countries,link_url) values(%s,%s,%s,%s,%s,%s)" - db.execute_single(sql_str, (uuid.uuid1(), keep_rate, task_total, account_name, countries, link_url)) - return redirect(url_for('get_h5_account_info')) - except Exception as e: - app.logger.info("{}".format(str(e))) - return render_template('add_h5_user.html') - - -@app.route('/edit_user', methods=['GET', 'POST']) -def edit_user(): - account_id = request.args.get('account_id') - if request.method == 'POST': - account_id = request.form['account_id'] - keep_rate = request.form['keep_rate'] - task_total = int(request.form['task_total']) - account_name = request.form['account_name'] - countries = request.form['countries'] - link_url = request.form['link_url'] - is_active = request.form['is_active'] - update_time = datetime.now() - sql_str = "UPDATE account_h5_info set keep_rate=%s, task_total=%s, account_name=%s ,is_active=%s ,countries=%s, link_url=%s,update_time=%s where account_id=%s" - db.execute_single(sql_str, (keep_rate, task_total, account_name, is_active, countries, link_url, update_time, account_id)) - return redirect(url_for('get_h5_account_info')) - sql_str = "select account_id,keep_rate,task_total,account_name,is_active,countries, link_url from account_h5_info where account_id=%s" - result = db.execute_query(sql_str, (account_id,)) - return render_template('edit_h5_user.html', - users={'account_id': result[0][0], 'keep_rate': result[0][1], 'task_total': result[0][2], - 'account_name': result[0][3], "is_active": result[0][4], "countries": result[0][5], - "link_url": result[0][6]}) - - -@app.route('/download_file', methods=['GET']) -def download_file(): - file_name = request.args.get('fileName') - app.logger.info("download_file 接口被调用,开始下载文件:{}".format(file_name)) - path = os.path.join(os.getcwd(), "file/h5") - file_path = os.path.join(path, f"{file_name}") - if not os.path.exists(file_path): - app.logger.info("该文件:{},不存在".format(file_name)) - return jsonify({}) - else: - return send_file(file_path, as_attachment=True) - - -@app.route('/upload_browser_info', methods=['POST']) -def upload_browser_info(): - try: - params = json.loads(request.data) - visitor_id = params["fingerprint"]['visitorId'] - app.logger.info("接口:upload_browser_info 被调用,传入参数为:{}".format(str(visitor_id))) - # 对数据进行保存 - sql_str = "insert into task_h5_browser_info(visitor_id,browser_info) values(%s,%s)" - db.execute_single(sql_str, (visitor_id, json.dumps(params))) - return jsonify({'status': 'ok'}) - except Exception as e: - if "PRIMARY" in str(e): - return jsonify({'status': 'ok'}) - app.logger.info("浏览器信息保存失败,失败原因:{}".format(str(e))) - return jsonify({'status': 'error', 'msg': str(e)}) - - -@app.route('/get_browser_info', methods=['GET']) -def get_browser_info(): - sql_str = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1" - try: - result = db.execute_query(sql_str, ) - app.logger.info("get_browser_info:获取数据长度为:{}".format(len(result))) - return jsonify(json.loads(result[0][0])) - except Exception as e: - app.logger.info("获取浏览器信息失败,失败原因:{}".format(str(e))) - return jsonify({'status': "error", "message": str(e)}) - - -@app.route('/get_account', methods=['GET']) -def get_account_id(): - sql_str = "select account_id from account_h5_info where is_active=1 ORDER BY RAND() LIMIT 1" - result = db.execute_query(sql_str, ) - if result: - return jsonify({'accountId': result[0][0]}) - return jsonify({'status': "error", "message": ""}) - - -@app.route('/h5_issue_task', methods=['GET']) -def issue_task(): - account_id = request.args.get('accountId') - sql_str = "select account_id,keep_rate,task_total,account_name,is_active from account_h5_info where account_id=%s" - result = db.execute_query(sql_str, (account_id,)) - if result: - task_total = int(result[0][2]) - app.logger.info("账户:{}需要执行新增任务总数为:{}".format(account_id, task_total)) - now_date = datetime.now().strftime("%Y-%m-%d") - sql_str = "select execute_task_number,id from task_execute_info where account_id=%s and save_date=%s" - result = db.execute_query(sql_str, (account_id, now_date)) - execute_task_number = 0 - if result: - execute_task_number = result[0][0] - must_task_number = math.ceil((task_total / 24) * int(datetime.today().hour)) - app.logger.info("账户:{}已执行任务个数为:{}".format(account_id, execute_task_number)) - app.logger.info("账户:{} 按照计划应该执行行任务个数为:{}".format(account_id, must_task_number)) - if execute_task_number >= must_task_number * 1.2: - app.logger.info( - "账户:{},已经执行新增任务{}个,执行速度过快,需要暂停执行".format(str(account_id), execute_task_number)) - return jsonify({"message": "账户:{} 执行任务过快,暂停执行".format(account_izd)}), 403 - execute_task_number += 1 - if result: - - sql_str = "UPDATE task_execute_info SET execute_task_number = %s,update_time=%s WHERE account_id = %s and save_date=%s" - db.execute_update(sql_str, (execute_task_number, datetime.now(), account_id, now_date)) - else: - sql_str = "insert into task_execute_info(account_id,execute_task_number,save_date) values(%s, %s, %s)" - db.execute_update(sql_str, (account_id, execute_task_number, now_date)) - sql_str = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1" - result = db.execute_query(sql_str) - if result: - return jsonify(json.loads(result[0][0])) - else: - app.logger.info("未获取到账户:{} 相关配置信息".format(account_id)) - return jsonify({'status': 'error', 'msg': '传入账号错误'}) - - -@app.route('/upload_file', methods=['POST']) -def upload_file(): - file = request.files['file'] - file_name = file.filename - account_id = request.args.get('accountId') - app.logger.info("接口upload_file 传入accountId 为:{}".format(account_id)) - path = os.path.join(os.getcwd(), f"file/h5/{account_id}") - if not os.path.exists(path): - os.makedirs(path) - file_path = os.path.join(path, file_name) - # # 需要对数据进行存储 - if os.path.exists(file_path): - app.logger.info("文件:{},当天上传过,需要先删除再保存".format(file_name)) - os.remove(file_path) - - file.save(file_path) - app.logger.info("文件:{}已保持".format(file_name)) - return jsonify({'message': 'File uploaded successfully', 'path': file_path}) - - -# bigo 项目接口 - -def get_device_info(day_date, percent, android_id, i): - # 1. 计算总记录数 - sql_str = "select count(*) from device_info where save_date=%s and android_id=%s and is_use=%s ;" - result = db.execute_query(sql_str, (day_date, android_id, i)) - total_count = result[0][0] - app.logger.info(("日期:{},总共有 {} 条数据".format(day_date, total_count))) - need_deal_count = math.ceil(total_count * percent) - app.logger.info("日期:{} ,需要处理 {} 条数据".format(day_date, need_deal_count)) - if need_deal_count == 0: - return [] - # 计算已经处理的数量 - sql_str = "select count(*) from device_info where save_date=%s and is_use=%s and android_id=%s ;" - result = db.execute_query(sql_str, (day_date, i + 1, android_id)) - have_deal_count = result[0][0] - app.logger.info("日期:{} ,已经处理的数据有:{}条".format(day_date, have_deal_count)) - if have_deal_count >= need_deal_count: - return [] - # 准备需要处理的数据 - limit_count = need_deal_count - have_deal_count - app.logger.info("日期:{} 还剩{}多少条数据需要处理".format(day_date, limit_count)) - sql_str = "select id,android_id,device_info,package_name from device_info where save_date=%s and is_use=%s and android_id=%s limit %s;" - result = db.execute_query(sql_str, (day_date, i, android_id, limit_count)) - return result[0] - - -@app.route('/', methods=['get']) -def test_post(): - return jsonify({"key": "value"}) - - -@app.route('/device_info_upload', methods=['POST']) -def device_info_upload(): - try: - params = request.get_json() - app.logger.info("接口:device_info_upload 被调用,传入参数为:{}".format(str(params))) - if not params: - return jsonify({}) - android_id = request.args.get('id') - device_ip = request.args.get('deviceIp') - # 留存需要修改 - if android_id == "FyZqWrStUvOpKlMn": - task_id = request.args.get('taskId') - package_name = request.args.get('packageName') - if package_name: - app.logger.info("传入参数:android_id为{},package_name 为:{}".format(android_id, package_name)) - app.logger.info("传入参数:android_id为{},task_id为:{}".format(android_id, task_id)) - try: - sql_str = "INSERT INTO device_info(android_id, device_info,task_id,package_name,device_ip) VALUES (%s, %s,%s,%s,%s)" - db.execute_single(sql_str, (android_id, json.dumps(params), task_id, package_name, device_ip)) - app.logger.info("android_id:{} 数据写入成功".format(str(android_id))) - return {"message": "android_id为:{} 的设备信息保存成功".format(android_id)} - except Exception as e: - app.logger.info({"message": "android_id为:{} 的设备信息保存失败,失败原因为:{}".format(str(android_id), str(e))}) - else: - return jsonify({"status": "error", "msg": "android_id:{} 传入安装包名为{} ".format(android_id, package_name)}) - return jsonify({}) - except Exception as e: - app.logger.info({"message": "{}".format(str(e))}) - return jsonify({}) - - -@app.route('/device_info_download', methods=['GET']) -def device_info_download(): - android_id = request.args.get('androidId') - app.logger.info("接口:device_info_download 被调用,传入参数为:{}".format(str(android_id))) - try: - percent_list = [0.5, 0.3, 0.18, 0.11, 0.09, 0.06, 0.04] - for i in range(len(percent_list)): - day = (datetime.now() - timedelta(days=i + 1)).strftime('%Y-%m-%d') - result = get_device_info(day, percent_list[i], android_id=android_id, i=i) - if result: - sql_str = "UPDATE device_info SET is_use = %s WHERE id = %s" - db.execute_update(sql_str, (i + 1, result[0])) - param = json.loads(result[-2]) - param["package_name"] = result[-1] - param["file_name"] = f"{result[1]}_{result[-1]}.zip" - return jsonify(json.dumps(param)) - return jsonify({}) - except Exception as e: - app.logger.info("获取设备:{} 的信息失败,失败原因为:{}".format(str(android_id), str(e))) - return jsonify({}) - - -@app.route('/count_click', methods=['GET']) -def count_click_times(): - click_id = request.args.get('ClickAd') - app.logger.info("接口:{}被调用,传入参数click_id为:{}".format(click_id, str(datetime.now()))) - click_date = datetime.today().strftime('%Y%m%d%H') - try: - sql_str = "insert into count_click_times(click_id,click_date) values (%s, %s)" - db.execute_single(sql_str, (click_id, click_date)) - return jsonify({"message": "click_id:{}已记录".format(click_id)}) - except Exception as e: - app.logger.info({"message": "{}".format(str(e))}) - return jsonify({"message": "click_id:{},数据记录失败,失败原因{}".format(click_id, str(e))}), 403 - - -@app.route('/download_code_file', methods=['GET']) -def download_code_file(): - file_name = request.args.get('file_name') - app.logger.info("download_code_file 接口被调用,开始下载文件:{}".format(file_name)) - path = os.path.join(os.getcwd(), "file") - file_path = os.path.join(path, f"{file_name}") - if not os.path.exists(file_path): - app.logger.info("该文件:{},不存在".format(file_name)) - return jsonify({}) - else: - return send_file(file_path, as_attachment=True) - - -@app.route('/upload_package', methods=['POST']) -def upload_package(): - app.logger.info("upload_package 接口已被调用") - file = request.files['file'] - file_name = file.filename - app.logger.info("upload_package 接口被调用,开始查询该文件是否存在:{}".format(file_name)) - path = os.path.join(os.getcwd(), f"file") - if not os.path.exists(path): - os.makedirs(path) - file_path = os.path.join(path, f"{file_name}") - # 需要对数据进行存储 - if os.path.exists(file_path): - app.logger.info("该 package:{},当天上传过,需要先删除再保存".format(file_name)) - os.remove(file_path) - - file.save(file_path) - app.logger.info("package:{}已保持".format(file_name)) - return jsonify({'message': 'File uploaded successfully', 'path': file_path}) - - -@app.route('/download_package', methods=['GET']) -def download_package(): - app.logger.info("接口被调用") - file_name = request.args.get('file_name') - path = os.path.join(os.getcwd(), "file") - file_path = os.path.join(path, f"{file_name}") - app.logger.info(file_path) - if not os.path.exists(file_path): - app.logger.info("压缩包:{},不存在".format(file_name)) - return jsonify({}), 404 - else: - return send_file(file_path, as_attachment=True) - - -if __name__ == '__main__': - app.run(host='0.0.0.0', port=5000) +# -*- coding: utf-8 -*- +""" +@Project : data_upload_download +@File : app.py +@IDE : PyCharm +@Author : liu jian jiang +@Date : 2025/6/17 11:23 +""" +import os +import uuid +import math +import json +import logging +from datetime import datetime, timedelta +from public_func import MySQLPool +from logging.handlers import RotatingFileHandler +from flask import Flask, request, jsonify, send_file, render_template, redirect, url_for +from flask_cors import CORS + +db = MySQLPool() + +app = Flask(__name__) +CORS(app) +# 配置日志 +handler = RotatingFileHandler( + './logs/flask_app.log', + maxBytes=1024 * 1024 * 1024, + backupCount=5 +) +handler.setFormatter(logging.Formatter( + '%(asctime)s %(levelname)s: %(message)s' +)) +app.logger.addHandler(handler) +app.logger.setLevel(logging.INFO) + + +# H5 项目接口 +@app.route('/h5', methods=['GET']) +def get_h5_account_info(): + sql_str = "select account_id,keep_rate,task_total,account_name,is_active, countries, link_url from account_h5_info" + try: + result = db.execute_query(sql_str) + return render_template('account_h5_info.html', + users=[ + {'account_id': str(x[0]), 'keep_rate': x[1], "task_total": x[2], "account_name": x[3], + "countries": x[5], "link_url": x[6], "is_active": x[4]} + for x in + result]) + except Exception as e: + app.logger.info("获取浏览器信息失败,失败原因:{}".format(str(e))) + return jsonify({}) + + +@app.route('/add_h5_user', methods=['GET', 'POST']) +def add_h5_user(): + if request.method == 'POST': + try: + keep_rate = request.form['keep_rate'] + task_total = request.form['task_total'] + account_name = request.form['account_name'] + countries = request.form['countries'] + link_url = request.form['link_url'] + + sql_str = "insert into account_h5_info(account_id,keep_rate,task_total,account_name,countries,link_url) values(%s,%s,%s,%s,%s,%s)" + db.execute_single(sql_str, (uuid.uuid1(), keep_rate, task_total, account_name, countries, link_url)) + return redirect(url_for('get_h5_account_info')) + except Exception as e: + app.logger.info("{}".format(str(e))) + return render_template('add_h5_user.html') + + +@app.route('/edit_user', methods=['GET', 'POST']) +def edit_user(): + account_id = request.args.get('account_id') + if request.method == 'POST': + account_id = request.form['account_id'] + keep_rate = request.form['keep_rate'] + task_total = int(request.form['task_total']) + account_name = request.form['account_name'] + countries = request.form['countries'] + link_url = request.form['link_url'] + is_active = request.form['is_active'] + update_time = datetime.now() + sql_str = "UPDATE account_h5_info set keep_rate=%s, task_total=%s, account_name=%s ,is_active=%s ,countries=%s, link_url=%s,update_time=%s where account_id=%s" + db.execute_single(sql_str, (keep_rate, task_total, account_name, is_active, countries, link_url, update_time, account_id)) + return redirect(url_for('get_h5_account_info')) + sql_str = "select account_id,keep_rate,task_total,account_name,is_active,countries, link_url from account_h5_info where account_id=%s" + result = db.execute_query(sql_str, (account_id,)) + return render_template('edit_h5_user.html', + users={'account_id': result[0][0], 'keep_rate': result[0][1], 'task_total': result[0][2], + 'account_name': result[0][3], "is_active": result[0][4], "countries": result[0][5], + "link_url": result[0][6]}) + + +@app.route('/download_file', methods=['GET']) +def download_file(): + file_name = request.args.get('fileName') + app.logger.info("download_file 接口被调用,开始下载文件:{}".format(file_name)) + path = os.path.join(os.getcwd(), "file/h5") + file_path = os.path.join(path, f"{file_name}") + if not os.path.exists(file_path): + app.logger.info("该文件:{},不存在".format(file_name)) + return jsonify({}) + else: + return send_file(file_path, as_attachment=True) + + +@app.route('/upload_browser_info', methods=['POST']) +def upload_browser_info(): + try: + params = json.loads(request.data) + visitor_id = params["fingerprint"]['visitorId'] + app.logger.info("接口:upload_browser_info 被调用,传入参数为:{}".format(str(visitor_id))) + # 对数据进行保存 + sql_str = "insert into task_h5_browser_info(visitor_id,browser_info) values(%s,%s)" + db.execute_single(sql_str, (visitor_id, json.dumps(params))) + return jsonify({'status': 'ok'}) + except Exception as e: + if "PRIMARY" in str(e): + return jsonify({'status': 'ok'}) + app.logger.info("浏览器信息保存失败,失败原因:{}".format(str(e))) + return jsonify({'status': 'error', 'msg': str(e)}) + + +@app.route('/get_browser_info', methods=['GET']) +def get_browser_info(): + sql_str = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1" + try: + result = db.execute_query(sql_str, ) + app.logger.info("get_browser_info:获取数据长度为:{}".format(len(result))) + return jsonify(json.loads(result[0][0])) + except Exception as e: + app.logger.info("获取浏览器信息失败,失败原因:{}".format(str(e))) + return jsonify({'status': "error", "message": str(e)}) + + +@app.route('/get_account', methods=['GET']) +def get_account_id(): + sql_str = "select account_id from account_h5_info where is_active=1 ORDER BY RAND() LIMIT 1" + result = db.execute_query(sql_str, ) + if result: + return jsonify({'accountId': result[0][0]}) + return jsonify({'status': "error", "message": ""}) + + +@app.route('/h5_issue_task', methods=['GET']) +def issue_task(): + account_id = request.args.get('accountId') + sql_str = "select account_id,keep_rate,task_total,account_name,is_active from account_h5_info where account_id=%s" + result = db.execute_query(sql_str, (account_id,)) + if result: + task_total = int(result[0][2]) + app.logger.info("账户:{}需要执行新增任务总数为:{}".format(account_id, task_total)) + now_date = datetime.now().strftime("%Y-%m-%d") + sql_str = "select execute_task_number,id from task_execute_info where account_id=%s and save_date=%s" + result = db.execute_query(sql_str, (account_id, now_date)) + execute_task_number = 0 + if result: + execute_task_number = result[0][0] + must_task_number = math.ceil((task_total / 24) * int(datetime.today().hour)) + app.logger.info("账户:{}已执行任务个数为:{}".format(account_id, execute_task_number)) + app.logger.info("账户:{} 按照计划应该执行行任务个数为:{}".format(account_id, must_task_number)) + if execute_task_number >= must_task_number * 1.2: + app.logger.info( + "账户:{},已经执行新增任务{}个,执行速度过快,需要暂停执行".format(str(account_id), execute_task_number)) + return jsonify({"message": "账户:{} 执行任务过快,暂停执行".format(account_izd)}), 403 + execute_task_number += 1 + if result: + + sql_str = "UPDATE task_execute_info SET execute_task_number = %s,update_time=%s WHERE account_id = %s and save_date=%s" + db.execute_update(sql_str, (execute_task_number, datetime.now(), account_id, now_date)) + else: + sql_str = "insert into task_execute_info(account_id,execute_task_number,save_date) values(%s, %s, %s)" + db.execute_update(sql_str, (account_id, execute_task_number, now_date)) + sql_str = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1" + result = db.execute_query(sql_str) + if result: + return jsonify(json.loads(result[0][0])) + else: + app.logger.info("未获取到账户:{} 相关配置信息".format(account_id)) + return jsonify({'status': 'error', 'msg': '传入账号错误'}) + + +@app.route('/upload_file', methods=['POST']) +def upload_file(): + file = request.files['file'] + file_name = file.filename + account_id = request.args.get('accountId') + app.logger.info("接口upload_file 传入accountId 为:{}".format(account_id)) + path = os.path.join(os.getcwd(), f"file/h5/{account_id}") + if not os.path.exists(path): + os.makedirs(path) + file_path = os.path.join(path, file_name) + # # 需要对数据进行存储 + if os.path.exists(file_path): + app.logger.info("文件:{},当天上传过,需要先删除再保存".format(file_name)) + os.remove(file_path) + + file.save(file_path) + app.logger.info("文件:{}已保持".format(file_name)) + return jsonify({'message': 'File uploaded successfully', 'path': file_path}) + + + + + +@app.route('/', methods=['get']) +def test_post(): + return jsonify({"key": "value"}) + + + + + + + + +if __name__ == '__main__': + app.run(host='0.0.0.0', port=5000)