上传文件至 /

This commit is contained in:
ljj 2025-07-11 15:11:34 +08:00
commit a8a7048fa0
1 changed files with 351 additions and 0 deletions

351
app.py Normal file
View File

@ -0,0 +1,351 @@
# -*- coding: utf-8 -*-
"""
@Project : data_upload_download
@File : app.py
@IDE : PyCharm
@Author : liu jian jiang
@Date : 2025/6/17 11:23
"""
import os
import uuid
import math
import json
import logging
from datetime import datetime, timedelta
from public_func import MySQLPool
from logging.handlers import RotatingFileHandler
from flask import Flask, request, jsonify, send_file, render_template, redirect, url_for
from flask_cors import CORS
db = MySQLPool()
app = Flask(__name__)
CORS(app)
# 配置日志
handler = RotatingFileHandler(
'./logs/flask_app.log',
maxBytes=1024 * 1024 * 1024,
backupCount=5
)
handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s: %(message)s'
))
app.logger.addHandler(handler)
app.logger.setLevel(logging.INFO)
# H5 项目接口
@app.route('/h5', methods=['GET'])
def get_h5_account_info():
sql_str = "select account_id,keep_rate,task_total,account_name,is_active, countries, link_url from account_h5_info"
try:
result = db.execute_query(sql_str)
return render_template('account_h5_info.html',
users=[
{'account_id': str(x[0]), 'keep_rate': x[1], "task_total": x[2], "account_name": x[3],
"countries": x[5], "link_url": x[6], "is_active": x[4]}
for x in
result])
except Exception as e:
app.logger.info("获取浏览器信息失败,失败原因:{}".format(str(e)))
return jsonify({})
@app.route('/add_h5_user', methods=['GET', 'POST'])
def add_h5_user():
if request.method == 'POST':
try:
keep_rate = request.form['keep_rate']
task_total = request.form['task_total']
account_name = request.form['account_name']
countries = request.form['countries']
link_url = request.form['link_url']
sql_str = "insert into account_h5_info(account_id,keep_rate,task_total,account_name,countries,link_url) values(%s,%s,%s,%s,%s,%s)"
db.execute_single(sql_str, (uuid.uuid1(), keep_rate, task_total, account_name, countries, link_url))
return redirect(url_for('get_h5_account_info'))
except Exception as e:
app.logger.info("<UNK>{}".format(str(e)))
return render_template('add_h5_user.html')
@app.route('/edit_user', methods=['GET', 'POST'])
def edit_user():
account_id = request.args.get('account_id')
if request.method == 'POST':
account_id = request.form['account_id']
keep_rate = request.form['keep_rate']
task_total = int(request.form['task_total'])
account_name = request.form['account_name']
countries = request.form['countries']
link_url = request.form['link_url']
is_active = request.form['is_active']
update_time = datetime.now()
sql_str = "UPDATE account_h5_info set keep_rate=%s, task_total=%s, account_name=%s ,is_active=%s ,countries=%s, link_url=%s,update_time=%s where account_id=%s"
db.execute_single(sql_str, (keep_rate, task_total, account_name, is_active, countries, link_url, update_time, account_id))
return redirect(url_for('get_h5_account_info'))
sql_str = "select account_id,keep_rate,task_total,account_name,is_active,countries, link_url from account_h5_info where account_id=%s"
result = db.execute_query(sql_str, (account_id,))
return render_template('edit_h5_user.html',
users={'account_id': result[0][0], 'keep_rate': result[0][1], 'task_total': result[0][2],
'account_name': result[0][3], "is_active": result[0][4], "countries": result[0][5],
"link_url": result[0][6]})
@app.route('/download_file', methods=['GET'])
def download_file():
file_name = request.args.get('fileName')
app.logger.info("download_file 接口被调用,开始下载文件:{}".format(file_name))
path = os.path.join(os.getcwd(), "file/h5")
file_path = os.path.join(path, f"{file_name}")
if not os.path.exists(file_path):
app.logger.info("该文件:{},不存在".format(file_name))
return jsonify({})
else:
return send_file(file_path, as_attachment=True)
@app.route('/upload_browser_info', methods=['POST'])
def upload_browser_info():
try:
params = json.loads(request.data)
visitor_id = params["fingerprint"]['visitorId']
app.logger.info("接口upload_browser_info 被调用,传入参数为:{}".format(str(visitor_id)))
# 对数据进行保存
sql_str = "insert into task_h5_browser_info(visitor_id,browser_info) values(%s,%s)"
db.execute_single(sql_str, (visitor_id, json.dumps(params)))
return jsonify({'status': 'ok'})
except Exception as e:
if "PRIMARY" in str(e):
return jsonify({'status': 'ok'})
app.logger.info("浏览器信息保存失败,失败原因:{}".format(str(e)))
return jsonify({'status': 'error', 'msg': str(e)})
@app.route('/get_browser_info', methods=['GET'])
def get_browser_info():
sql_str = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1"
try:
result = db.execute_query(sql_str, )
app.logger.info("get_browser_info:获取数据长度为:{}".format(len(result)))
return jsonify(json.loads(result[0][0]))
except Exception as e:
app.logger.info("获取浏览器信息失败,失败原因:{}".format(str(e)))
return jsonify({'status': "error", "message": str(e)})
@app.route('/get_account', methods=['GET'])
def get_account_id():
sql_str = "select account_id from account_h5_info where is_active=1 ORDER BY RAND() LIMIT 1"
result = db.execute_query(sql_str, )
if result:
return jsonify({'accountId': result[0][0]})
return jsonify({'status': "error", "message": "<UNK>"})
@app.route('/h5_issue_task', methods=['GET'])
def issue_task():
account_id = request.args.get('accountId')
sql_str = "select account_id,keep_rate,task_total,account_name,is_active from account_h5_info where account_id=%s"
result = db.execute_query(sql_str, (account_id,))
if result:
task_total = int(result[0][2])
app.logger.info("账户:{}需要执行新增任务总数为:{}".format(account_id, task_total))
now_date = datetime.now().strftime("%Y-%m-%d")
sql_str = "select execute_task_number,id from task_execute_info where account_id=%s and save_date=%s"
result = db.execute_query(sql_str, (account_id, now_date))
execute_task_number = 0
if result:
execute_task_number = result[0][0]
must_task_number = math.ceil((task_total / 24) * int(datetime.today().hour))
app.logger.info("账户:{}已执行任务个数为:{}".format(account_id, execute_task_number))
app.logger.info("账户:{} 按照计划应该执行行任务个数为:{}".format(account_id, must_task_number))
if execute_task_number >= must_task_number * 1.2:
app.logger.info(
"账户:{},已经执行新增任务{}个,执行速度过快,需要暂停执行".format(str(account_id), execute_task_number))
return jsonify({"message": "账户:{} 执行任务过快,暂停执行".format(account_izd)}), 403
execute_task_number += 1
if result:
sql_str = "UPDATE task_execute_info SET execute_task_number = %s,update_time=%s WHERE account_id = %s and save_date=%s"
db.execute_update(sql_str, (execute_task_number, datetime.now(), account_id, now_date))
else:
sql_str = "insert into task_execute_info(account_id,execute_task_number,save_date) values(%s, %s, %s)"
db.execute_update(sql_str, (account_id, execute_task_number, now_date))
sql_str = "select browser_info from task_h5_browser_info ORDER BY RAND() LIMIT 1"
result = db.execute_query(sql_str)
if result:
return jsonify(json.loads(result[0][0]))
else:
app.logger.info("未获取到账户:{} 相关配置信息".format(account_id))
return jsonify({'status': 'error', 'msg': '传入账号错误'})
@app.route('/upload_file', methods=['POST'])
def upload_file():
file = request.files['file']
file_name = file.filename
account_id = request.args.get('accountId')
app.logger.info("接口upload_file 传入accountId 为:{}".format(account_id))
path = os.path.join(os.getcwd(), f"file/h5/{account_id}")
if not os.path.exists(path):
os.makedirs(path)
file_path = os.path.join(path, file_name)
# # 需要对数据进行存储
if os.path.exists(file_path):
app.logger.info("文件:{},当天上传过,需要先删除再保存".format(file_name))
os.remove(file_path)
file.save(file_path)
app.logger.info("文件:{}已保持".format(file_name))
return jsonify({'message': 'File uploaded successfully', 'path': file_path})
# bigo 项目接口
def get_device_info(day_date, percent, android_id, i):
# 1. 计算总记录数
sql_str = "select count(*) from device_info where save_date=%s and android_id=%s and is_use=%s ;"
result = db.execute_query(sql_str, (day_date, android_id, i))
total_count = result[0][0]
app.logger.info(("日期:{},总共有 {} 条数据".format(day_date, total_count)))
need_deal_count = math.ceil(total_count * percent)
app.logger.info("日期:{} ,需要处理 {} 条数据".format(day_date, need_deal_count))
if need_deal_count == 0:
return []
# 计算已经处理的数量
sql_str = "select count(*) from device_info where save_date=%s and is_use=%s and android_id=%s ;"
result = db.execute_query(sql_str, (day_date, i + 1, android_id))
have_deal_count = result[0][0]
app.logger.info("日期:{} ,已经处理的数据有:{}".format(day_date, have_deal_count))
if have_deal_count >= need_deal_count:
return []
# 准备需要处理的数据
limit_count = need_deal_count - have_deal_count
app.logger.info("日期:{} 还剩{}多少条数据需要处理".format(day_date, limit_count))
sql_str = "select id,android_id,device_info,package_name from device_info where save_date=%s and is_use=%s and android_id=%s limit %s;"
result = db.execute_query(sql_str, (day_date, i, android_id, limit_count))
return result[0]
@app.route('/', methods=['get'])
def test_post():
return jsonify({"key": "value"})
@app.route('/device_info_upload', methods=['POST'])
def device_info_upload():
try:
params = request.get_json()
app.logger.info("接口device_info_upload 被调用,传入参数为:{}".format(str(params)))
if not params:
return jsonify({})
android_id = request.args.get('id')
device_ip = request.args.get('deviceIp')
# 留存需要修改
if android_id == "FyZqWrStUvOpKlMn":
task_id = request.args.get('taskId')
package_name = request.args.get('packageName')
if package_name:
app.logger.info("传入参数android_id为{}package_name 为:{}".format(android_id, package_name))
app.logger.info("传入参数android_id为{}task_id为{}".format(android_id, task_id))
try:
sql_str = "INSERT INTO device_info(android_id, device_info,task_id,package_name,device_ip) VALUES (%s, %s,%s,%s,%s)"
db.execute_single(sql_str, (android_id, json.dumps(params), task_id, package_name, device_ip))
app.logger.info("android_id:{} 数据写入成功".format(str(android_id)))
return {"message": "android_id为{} 的设备信息保存成功".format(android_id)}
except Exception as e:
app.logger.info({"message": "android_id为{} 的设备信息保存失败,失败原因为:{}".format(str(android_id), str(e))})
else:
return jsonify({"status": "error", "msg": "android_id:{} 传入安装包名为{} ".format(android_id, package_name)})
return jsonify({})
except Exception as e:
app.logger.info({"message": "{}".format(str(e))})
return jsonify({})
@app.route('/device_info_download', methods=['GET'])
def device_info_download():
android_id = request.args.get('androidId')
app.logger.info("接口device_info_download 被调用,传入参数为:{}".format(str(android_id)))
try:
percent_list = [0.5, 0.3, 0.18, 0.11, 0.09, 0.06, 0.04]
for i in range(len(percent_list)):
day = (datetime.now() - timedelta(days=i + 1)).strftime('%Y-%m-%d')
result = get_device_info(day, percent_list[i], android_id=android_id, i=i)
if result:
sql_str = "UPDATE device_info SET is_use = %s WHERE id = %s"
db.execute_update(sql_str, (i + 1, result[0]))
param = json.loads(result[-2])
param["package_name"] = result[-1]
param["file_name"] = f"{result[1]}_{result[-1]}.zip"
return jsonify(json.dumps(param))
return jsonify({})
except Exception as e:
app.logger.info("获取设备:{} 的信息失败,失败原因为:{}".format(str(android_id), str(e)))
return jsonify({})
@app.route('/count_click', methods=['GET'])
def count_click_times():
click_id = request.args.get('ClickAd')
app.logger.info("接口:{}被调用传入参数click_id为{}".format(click_id, str(datetime.now())))
click_date = datetime.today().strftime('%Y%m%d%H')
try:
sql_str = "insert into count_click_times(click_id,click_date) values (%s, %s)"
db.execute_single(sql_str, (click_id, click_date))
return jsonify({"message": "click_id:{}已记录".format(click_id)})
except Exception as e:
app.logger.info({"message": "{}".format(str(e))})
return jsonify({"message": "click_id:{},数据记录失败,失败原因{}".format(click_id, str(e))}), 403
@app.route('/download_code_file', methods=['GET'])
def download_code_file():
file_name = request.args.get('file_name')
app.logger.info("download_code_file 接口被调用,开始下载文件:{}".format(file_name))
path = os.path.join(os.getcwd(), "file")
file_path = os.path.join(path, f"{file_name}")
if not os.path.exists(file_path):
app.logger.info("该文件:{},不存在".format(file_name))
return jsonify({})
else:
return send_file(file_path, as_attachment=True)
@app.route('/upload_package', methods=['POST'])
def upload_package():
app.logger.info("upload_package 接口已被调用")
file = request.files['file']
file_name = file.filename
app.logger.info("upload_package 接口被调用,开始查询该文件是否存在:{}".format(file_name))
path = os.path.join(os.getcwd(), f"file")
if not os.path.exists(path):
os.makedirs(path)
file_path = os.path.join(path, f"{file_name}")
# 需要对数据进行存储
if os.path.exists(file_path):
app.logger.info("该 package{},当天上传过,需要先删除再保存".format(file_name))
os.remove(file_path)
file.save(file_path)
app.logger.info("package:{}已保持".format(file_name))
return jsonify({'message': 'File uploaded successfully', 'path': file_path})
@app.route('/download_package', methods=['GET'])
def download_package():
app.logger.info("接口被调用")
file_name = request.args.get('file_name')
path = os.path.join(os.getcwd(), "file")
file_path = os.path.join(path, f"{file_name}")
app.logger.info(file_path)
if not os.path.exists(file_path):
app.logger.info("压缩包:{},不存在".format(file_name))
return jsonify({}), 404
else:
return send_file(file_path, as_attachment=True)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)