crawler_task_management/account_management/deal_account.py

144 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
import time
from datetime import datetime
from typing import Optional, Dict, Any, List
from public_function.asyn_mysql import AsyncMySQL
from public_function.deal_all_task import DealAllTask
from public_function.public_func import create_logger
logger = create_logger("shopee_crawler")
class DealAccount:
def __init__(self, config_data: Dict[str, Any]):
self.config_data = config_data
self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"])
self.task_all = DealAllTask(logger=logger)
async def add_account(self, params: List[Dict[str, Any]]):
"""新增账户"""
await self.db_pool.initialize()
result = await self.db_pool.insert_many(table="crawler_account_record_info", data=params)
if result:
return True
return False
async def delete_account(self, params: Dict[str, Any]):
"""删除账户"""
condition = "account_id='{}' and app_name='{}'".format(params["account_id"], params["app_name"])
await self.db_pool.initialize()
result = await self.db_pool.delete(table="crawler_account_record_info", where_conditions=condition)
if result:
return True
return False
async def query_account_info(self, params: Dict[str, Any]):
"""查询具体账户信息"""
sql_str = f"""select account_id,password,app_name,status from crawler_account_record_info
where app_name='{params['app_name']}' and account_id='{params['account_id']}'"""
await self.db_pool.initialize()
result = await self.db_pool.fetch_all(sql_str, )
if result:
return result[0]
return []
async def obtain_account_info(self, app_name, country, number=1):
"""获取指定个数账户信息"""
sql_str = f"""select account_id,password,app_name from crawler_account_record_info
where status=1 and app_name='{app_name}' and region='{country}' limit {number}"""
await self.db_pool.initialize()
result = await self.db_pool.fetch_all(sql_str, )
if result:
return result
return []
async def update_account_info(self, set_param: Dict[str, Any], where_conditions, params):
"""更新账户信息"""
await self.db_pool.initialize()
affected_rows = await self.db_pool.update(table='crawler_account_record_info', set_columns=set_param,
where_conditions=where_conditions, params=params)
if affected_rows:
return True
return False
async def update_device_status(self, set_param: Dict[str, Any], where_conditions, params):
"""更新账户信息"""
affected_rows = await self.db_pool.update(table='shoppe_device_record', set_columns=set_param,
where_conditions=where_conditions, params=params)
if affected_rows:
return True
return False
async def deal_backup_task(self, data: Dict[str, Any]):
# 先查询该账户是否备份过,备份过就修改数据库状态,
await self.db_pool.initialize()
sql_str = f"""select account_id from shoppe_device_record where account_id='{data['account_id']}'"""
result = await self.db_pool.fetch_all(sql_str, )
if result:
logger.info(f"该账号:{data['account_id']} 已经备份过,不需要在备份")
return True
else:
tasks = self.task_all.backup_system([data["pad_code"]], data["pad_code"].lower())
if tasks:
backup_name = tasks[0]["backupId"]
payload = {"account_id": data["account_id"], "backup_name": backup_name, "script_name": data["script_name"]}
# 将数据插入mysql 的表 shoppe_device_record 中
await self.db_pool.insert_many(table="shoppe_device_record", data=[payload])
result = self.task_all.check_phone_status(pad_code=data["pad_code"], file_name="test_abc.js")
if result:
logger.info(f"云机:{data['pad_code']}备份完成")
return True
else:
logger.info(f"云机:{data['pad_code']} 300秒备份失败需要认为干预")
else:
logger.info(f"实例:{data['pad_code']} 不存在")
return False
async def deal_restore_system(self, data: Dict[str, Any]):
await self.db_pool.initialize()
sql_str = f"""SELECT account_id,backup_name,script_name FROM shoppe_device_record
WHERE update_time < NOW() - INTERVAL 24 HOUR ORDER BY RAND() LIMIT 1"""
result = await self.db_pool.fetch_all(sql_str, )
if result:
res_dict = result[0]
logger.info(f"<UNK>{data['account_id']} <UNK>")
set_param = {"status": 2}
params = (res_dict["account_id"],)
where_conditions = "account_id = %s "
result_sql = await self.update_device_status(set_param, where_conditions, params)
logger.info(f"<UNK>{data['account_id']} 开始修改数据状态,修改结果为:{result_sql}")
# if result_sql:
logger.info(f"{data['pad_code']} 开始进行还原")
restore = self.task_all.restore_system(pad_code=[data["pad_code"]], backup_name=res_dict["backup_name"])
logger.info(f"<UNK>{data['pad_code']}还原结果为: {restore}")
if restore:
try:
result_status = self.task_all.check_phone_status(pad_code=[data["pad_code"]], file_name="test_abc.js")
logger.info(f"<UNK>:{data['pad_code']}{result_status}")
if result_status:
logger.info(f"云机:{data['pad_code']} 环境还原成功,开始下载文件:{res_dict['script_name']}")
# 下载执行脚本
self.task_all.upload_file_to_phone([data["pad_code"]], file_name=res_dict["script_name"])
# 设置代理
logger.info(data)
time.sleep(10)
logger.info(f"<UNK>{data['pad_code']} 开始设置代理")
self.task_all.set_network_proxy(pad_code=[data["pad_code"]], country=data["country"])
# 启动对应脚本
time.sleep(10)
self.task_all.async_execute_adb_command(pad_code=data["pad_code"], file_name=res_dict["script_name"])
# 需要新增函数
return True
except Exception as e:
logger.info(f"<UNK>{data['pad_code']} <UNK>{e}")
else:
logger.info(f"云机:{data['pad_code']} 300环境还原失败需要认为干预")
else:
logger.info("未查询到可用还原数据")
return False
if __name__ == '__main__':
pass