代码提交

This commit is contained in:
liujianjiang 2025-11-26 17:40:11 +08:00
commit ebaabae188
15 changed files with 568 additions and 0 deletions

View File

@ -0,0 +1 @@
# -*- coding: utf-8 -*-

View File

@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
import os
from typing import Optional, Dict, Any, List
from public_function.asyn_mysql import AsyncMySQL
class DealAccount:
    """CRUD helper for rows of the ``crawler_account_record_info`` table.

    Every caller-supplied value is sent to MySQL as a bound parameter
    (``%s`` placeholder) instead of being formatted into the SQL text.
    """

    TABLE = "crawler_account_record_info"

    def __init__(self, config_data: Dict[str, Any]):
        """Store the configuration and build the MySQL wrapper.

        :param config_data: parsed configuration; its ``advert_policy``
            entry is passed to ``AsyncMySQL``.
        """
        self.config_data = config_data
        self.db_pool: Optional[AsyncMySQL] = AsyncMySQL(self.config_data["advert_policy"])

    async def _ensure_pool(self) -> None:
        """Initialise the connection pool only while none exists yet.

        Calling ``initialize()`` before every query may create a fresh pool
        each time, so it is invoked only when the wrapper has no pool.
        """
        if getattr(self.db_pool, "pool", None) is None:
            await self.db_pool.initialize()

    async def add_account(self, params: List[Dict[str, Any]]):
        """Insert the given account records.

        :param params: one dict per account (column name -> value).
        :return: ``True`` when the insert reported a truthy result,
            ``False`` otherwise or when *params* is empty.
        """
        if not params:
            return False
        await self._ensure_pool()
        result = await self.db_pool.insert_many(table=self.TABLE, data=params)
        return bool(result)

    async def delete_account(self, params: List[str]):
        """Delete the accounts whose ``account_id`` is listed in *params*.

        :param params: account ids to delete.
        :return: ``True`` when at least one row was affected, ``False``
            otherwise or when *params* is empty.
        """
        if not params:
            return False
        # One placeholder per id; the ids themselves travel as parameters.
        placeholders = ", ".join(["%s"] * len(params))
        await self._ensure_pool()
        result = await self.db_pool.delete(
            table=self.TABLE,
            where_conditions=f"account_id IN ({placeholders})",
            params=tuple(params),
        )
        return bool(result)

    async def query_account_info(self, params: Dict[str, Any]):
        """Return all rows matching ``params['account_id']`` (``[]`` if none)."""
        sql_str = "select * from crawler_account_record_info where account_id=%s"
        await self._ensure_pool()
        result = await self.db_pool.fetch_all(sql_str, (params["account_id"],))
        return result or []

    async def obtain_account_info(self, app_name, country, number=1):
        """Return up to *number* accounts with ``status=1`` for an app and country.

        :return: list of rows holding ``account_id`` and ``password``,
            or ``[]`` when nothing matches.
        """
        sql_str = """select account_id,password from crawler_account_record_info
        where status=1 and app_name=%s and country=%s limit %s"""
        await self._ensure_pool()
        # int() guarantees LIMIT receives a number, never arbitrary text.
        result = await self.db_pool.fetch_all(sql_str, (app_name, country, int(number)))
        return result or []

    async def update_account_info(self, set_param: Dict[str, Any], params):
        """Update one account.

        :param set_param: column name -> new value, e.g. ``{'status': 2}``.
        :param params: the account id for the ``account_id = %s`` condition,
            either as a bare value or wrapped in a list/tuple.
        :return: ``True`` when at least one row was affected.
        """
        if not set_param:
            return False
        # A bare string would otherwise be expanded character by character
        # when the wrapper extends its parameter list with it.
        if not isinstance(params, (list, tuple)):
            params = (params,)
        where_conditions = "account_id = %s"
        await self._ensure_pool()
        affected_rows = await self.db_pool.update(
            table=self.TABLE,
            set_columns=set_param,
            where_conditions=where_conditions,
            params=params,
        )
        return bool(affected_rows)

View File

@ -0,0 +1 @@
# -*- coding: utf-8 -*-

View File

@ -0,0 +1 @@
# -*- coding: utf-8 -*-

112
main.py Normal file
View File

@ -0,0 +1,112 @@
import os
import yaml
import uuid
import asyncio
import uvicorn
from pathlib import Path
from typing import Dict, Any, Optional, List
from fastapi import FastAPI, HTTPException, Depends
from public_function.public_func import read_config
from model.model import AccountCreate, AccountUpdate, CrawlerTask
app = FastAPI()
def get_config():
    """Load the service configuration from ``public_function/config.yaml``.

    The path is resolved relative to this file. When ``read_config`` cannot
    be imported, a warning is logged and a built-in default mapping is
    returned instead.

    :return: the parsed configuration, or the default mapping.
    """
    # Local import: this module does not appear to define a ``logger``,
    # so the original fallback path raised NameError instead of logging.
    import logging

    config_path = os.path.join(Path(__file__).resolve().parent, 'public_function/config.yaml')
    try:
        from public_function.public_func import read_config
    except ImportError:
        logging.getLogger(__name__).warning("未找到read_config函数使用默认配置")
        # NOTE(review): this default is flat, while DealAccount/AllTask index
        # the configuration by section name ("advert_policy") — confirm the
        # intended shape of the fallback.
        return {'host': 'localhost', 'port': 3306, 'user': 'root', 'password': 'password', 'db': 'test_db', 'max_overflow': 10}
    return read_config(config_path)
def get_account_manager():
    """Build a ``DealAccount`` from the current configuration.

    :return: a ``DealAccount`` instance, or ``None`` (after logging a
        warning) when ``account_management.deal_account`` cannot be imported.
    """
    # Local import: this module does not appear to define a ``logger``,
    # so the original except-branch raised NameError instead of logging.
    import logging

    config = get_config()
    try:
        from account_management.deal_account import DealAccount
    except ImportError:
        logging.getLogger(__name__).warning("未找到DealAccount类返回模拟实例")
        return None
    return DealAccount(config)
def get_task_manager():
    """Build an ``AllTask`` from the current configuration.

    :return: an ``AllTask`` instance, or ``None`` (after logging a warning)
        when ``task_management.all_task_management`` cannot be imported.
    """
    # Local import: this module does not appear to define a ``logger``,
    # so the original except-branch raised NameError instead of logging.
    import logging

    config = get_config()
    try:
        from task_management.all_task_management import AllTask
    except ImportError:
        logging.getLogger(__name__).warning("未找到AllTask类返回模拟实例")
        return None
    return AllTask(config)
# Account handling endpoints
@app.get("/obtain_account", summary="获取可用账号")
async def obtain_account(app_name: str, country: str, account_manager: Any = Depends(get_account_manager)):
    """
    Return one available account for the given application and country.
    - **app_name**: application name
    - **country**: country of the account
    """
    if not app_name or not app_name.strip():
        raise HTTPException(status_code=400, detail="应用名称不能为空")
    if not country or not country.strip():
        raise HTTPException(status_code=400, detail="国家不能为空")
    # The dependency returns None when the manager class cannot be imported.
    if account_manager is None:
        raise HTTPException(status_code=500, detail="账号管理器不可用")
    try:
        result = await account_manager.obtain_account_info(app_name, country)
    except Exception as e:
        # Lookup failures are server errors, not "not found".
        print(f"获取账号失败: {e}")
        raise HTTPException(status_code=500, detail="{}".format(e)) from e
    # Raised outside the try block so the 404 is not caught and re-wrapped.
    if not result:
        raise HTTPException(status_code=404, detail="没有可用的账号")
    return {"code": 200, "message": "获取账号成功", "data": result[0]}
@app.post("/add_account", summary="新增账号")
async def add_account(account_data: AccountCreate, account_manager: Any = Depends(get_account_manager)):
    """
    Create a crawler account.
    - **account_id**: account ID
    - **password**: password
    - **country**: country of the account
    - **app_name**: application name
    """
    # The dependency returns None when the manager class cannot be imported.
    if account_manager is None:
        raise HTTPException(status_code=500, detail="账号管理器不可用")
    try:
        # The payload is deliberately not printed: it contains the password.
        result = await account_manager.add_account([account_data.dict()])
    except Exception as e:
        print(f"新增账号失败: {e}")
        raise HTTPException(status_code=500, detail="新增账号失败,失败原因:{}".format(e)) from e
    # A falsy result means nothing was written; do not report success.
    if not result:
        raise HTTPException(status_code=500, detail="新增账号失败,失败原因:{}".format("未写入任何记录"))
    return {"code": 200, "message": "新增账号成功", "data": result}
@app.post("/receive_data")
async def receive_data(params: Dict[str, Any]):
    """Data receiving endpoint (no processing is implemented yet)."""
    # TODO: write the received data to Redis and then store it in MySQL
    # (whether permanent storage is needed depends on business requirements).
@app.get("/crawler_task")
async def crawler_task(task_data: CrawlerTask, task_manager: Any = Depends(get_task_manager)):
    """Dispatch a crawl task and return the data it produced.

    A fresh ``task_id`` is generated for every request and added to the
    task parameters before they are handed to the task manager.
    """
    # NOTE(review): a pydantic model parameter is normally read from the
    # request body, which many clients cannot send with GET — confirm, or
    # consider exposing this route as POST.
    # The dependency returns None when the manager class cannot be imported.
    if task_manager is None:
        raise HTTPException(status_code=500, detail="任务管理器不可用")
    params = task_data.dict()
    params['task_id'] = uuid.uuid4().hex
    try:
        result = await task_manager.task_distribution(params)
    except Exception as e:
        print(f"爬虫任务执行失败: {e}")
        raise HTTPException(status_code=500, detail="获取数据失败;失败原因{}".format(e)) from e
    # Raised outside the try block so the 404 is not converted into a 500.
    if not result:
        raise HTTPException(status_code=404, detail="抓取商品数据失败,请重新尝试")
    return {"code": 200, "message": "获取商品数据成功", "data": result}
if __name__ == "__main__":
    # Script entry point: serve the FastAPI app; no host/port is passed to uvicorn.
    uvicorn.run(app)

1
model/__init__.py Normal file
View File

@ -0,0 +1 @@
# -*- coding: utf-8 -*-

22
model/model.py Normal file
View File

@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
from pydantic import BaseModel, Field
# Data models
class AccountCreate(BaseModel):
    # Request payload for creating an account: four required strings,
    # each validated to a length of 1..128 characters.
    # NOTE(review): the crawler_account_record_info DDL declares these
    # columns as VARCHAR(50); confirm whether 128 is the intended limit.
    account_id: str = Field(
        ..., min_length=1, max_length=128, description="账号ID"
    )
    password: str = Field(
        ..., min_length=1, max_length=128, description="密码"
    )
    country: str = Field(
        ..., min_length=1, max_length=128, description="账号所在国家"
    )
    app_name: str = Field(
        ..., min_length=1, max_length=128, description="应用名称"
    )
class AccountUpdate(BaseModel):
    # Request payload for changing the state of one account.
    # Required identifier of the account to update.
    account_id: str = Field(
        ..., description="账号ID"
    )
    # Required state, constrained to 1..2 by ge/le (1 = idle, 2 = in use).
    status: int = Field(
        ..., ge=1, le=2, description="状态1-空闲2-使用中"
    )
class CrawlerTask(BaseModel):
    # Request payload describing one crawl task: four required strings,
    # each validated to a length of 1..128 characters.
    country: str = Field(..., min_length=1, max_length=128, description="账号所在国家")
    app_name: str = Field(..., min_length=1, max_length=128, description="应用名称")
    # The two descriptions below were copy-pasted from country/app_name and
    # mislabelled these fields in the generated API schema.
    goods_id: str = Field(..., min_length=1, max_length=128, description="商品ID")
    store_id: str = Field(..., min_length=1, max_length=128, description="店铺ID")

View File

@ -0,0 +1,16 @@
# param = {'endTime': 1762591513782, 'padCode': 'ACP5B44DWZTR7DP6', 'padStatus': None, 'taskContent': '', 'taskId': 3422505,
# 'taskResult': 'Success', 'taskStatus': 3}
# task_info = {'mirrorName': 'bkp-ACP251016US1U4WP-1762437337310', 'countryCode': 'US',
# 'deviceId': 'ACP251016US1U4WP-2025-11-06 07:48:47-0gsctk80', 'retentionScript': 'reback.js', 'retentionFuture': True,
# 'pad_code': ['ACP5B44DWZTR7DP6'], 'task_id': 'acp5b44dwztr7dp6_d773b1a222d84a089ec38d8dacbd37f2', 'taskId': 3422505,
# 'task_type': 'restore'}
# print(param.get('taskId', 0))
# print(task_info.get('taskId', 0))
# print(param.get("taskResult", ""))
# if param.get('taskId', 0) == task_info.get("taskId", 0) and param.get("taskResult", "") == "Success":
# print(task_info)
# # Need to determine whether this task is a backup or a restore
# task_type = task_info.get("task_type", "")
# if task_type == "restore":
# print(f"{get_local_time()};{param['padCode']};实例还原成功")

View File

@ -0,0 +1,142 @@
import asyncio
import aiomysql
from typing import List, Tuple, Dict, Any
class AsyncMySQL:
    """Thin asynchronous MySQL helper built on an ``aiomysql`` connection pool.

    Table and column names are interpolated into the SQL text, so they must
    come from trusted code and never from user input. Row values and WHERE
    values are always passed as bound parameters.
    """

    def __init__(self, config_data: Dict):
        """Translate *config_data* into ``aiomysql.create_pool`` arguments.

        :param config_data: mapping with ``host``, ``port``, ``user``,
            ``password``, ``db`` and ``max_overflow`` (used as pool ``maxsize``).
        """
        self.config = {
            'host': config_data['host'],
            'port': config_data['port'],
            'user': config_data['user'],
            'password': config_data['password'],
            'db': config_data['db'],
            'autocommit': True,
            'minsize': 1,
            'maxsize': config_data['max_overflow'],
        }
        self.pool = None

    async def initialize(self):
        """Create the connection pool if none exists yet and return ``self``.

        Repeated calls reuse the existing pool instead of opening (and
        leaking) a new one each time.
        """
        if self.pool is None:
            self.pool = await aiomysql.create_pool(**self.config)
        return self

    async def close(self):
        """Close the pool and forget it so ``initialize`` can build a new one."""
        if self.pool:
            self.pool.close()
            await self.pool.wait_closed()
            self.pool = None

    @staticmethod
    def _as_param_list(params) -> List[Any]:
        """Normalise WHERE parameters: None -> [], list/tuple -> list, scalar -> [scalar]."""
        if params is None:
            return []
        if isinstance(params, (list, tuple)):
            return list(params)
        # A bare string must stay one parameter, not one per character.
        return [params]

    async def execute(self, query: str, params=None):
        """Run one statement and return the cursor's ``rowcount``."""
        await self.initialize()
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute(query, params)
                return cursor.rowcount

    async def executemany(self, query: str, params_list: List[Tuple]):
        """Run one statement for every parameter tuple and return ``rowcount``."""
        await self.initialize()
        async with self.pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.executemany(query, params_list)
                return cursor.rowcount

    async def insert_many_tuple(self, table: str, columns: List[str], data: List[Tuple]):
        """Bulk-insert *data* (tuples ordered like *columns*) into *table*.

        :return: the ``rowcount`` reported by the driver, or ``0`` when
            *data* is empty.
        """
        if not data:
            return 0
        placeholders = ', '.join(['%s'] * len(columns))
        columns_str = ', '.join(columns)
        query = f"INSERT INTO {table} ({columns_str}) VALUES ({placeholders})"
        return await self.executemany(query, data)

    async def insert_many(self, table: str, data: List[Dict[str, Any]]):
        """Bulk-insert dict records into *table*.

        The column list is taken from the first record; keys missing from
        later records are inserted as ``None``. A single dict is accepted
        as a one-record batch.

        :return: the ``rowcount`` reported by the driver, or ``0`` when
            *data* is empty.
        """
        if isinstance(data, dict):
            data = [data]
        if not data:
            return 0
        columns = list(data[0].keys())
        # Convert each record into a tuple ordered like ``columns``.
        params_list = [tuple(record.get(col) for col in columns) for record in data]
        return await self.insert_many_tuple(table, columns, params_list)

    async def fetch_all(self, query: str, params=None) -> List[Dict[str, Any]]:
        """Run a query with a ``DictCursor`` and return every row."""
        await self.initialize()
        async with self.pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cursor:
                await cursor.execute(query, params)
                return await cursor.fetchall()

    async def delete(self, table: str, where_conditions: str = None, params: Tuple = None) -> int:
        """Delete rows from *table* and return the affected row count.

        Without *where_conditions* the statement has no WHERE clause and
        therefore targets every row of the table.
        """
        if where_conditions:
            query = f"DELETE FROM {table} WHERE {where_conditions}"
        else:
            query = f"DELETE FROM {table}"
        return await self.execute(query, params)

    async def delete_many(self, table: str, conditions_list: List[Tuple[str, Tuple]]) -> int:
        """Run one DELETE per ``(where_conditions, params)`` pair and sum the row counts."""
        total_affected = 0
        for where_conditions, params in conditions_list:
            total_affected += await self.delete(table, where_conditions, params)
        return total_affected

    async def update(self, table: str, set_columns: Dict[str, Any], where_conditions: str = None,
                     params: Tuple = None) -> int:
        """
        Update rows of *table*.

        :param table: table name
        :param set_columns: column name -> new value
        :param where_conditions: WHERE clause using ``%s`` placeholders
        :param params: values for the WHERE placeholders (sequence or single value)
        :return: number of affected rows
        :raises ValueError: if *set_columns* is empty, or a WHERE clause is
            given without parameters
        """
        if not set_columns:
            raise ValueError("set_columns must not be empty")
        set_clause = ', '.join([f"{k} = %s" for k in set_columns])
        query = f"UPDATE {table} SET {set_clause}"
        # SET values come first, followed by the WHERE values.
        update_params = list(set_columns.values())
        if where_conditions:
            where_params = self._as_param_list(params)
            if not where_params:
                raise ValueError("当使用WHERE条件时必须提供参数")
            query += f" WHERE {where_conditions}"
            update_params.extend(where_params)
        return await self.execute(query, update_params)

    async def update_many(self, table: str, set_columns: Dict[str, Any], conditions_list: List[Tuple[str, Tuple]]) -> int:
        """
        Apply the same SET clause once per condition.

        :param table: table name
        :param set_columns: column name -> new value
        :param conditions_list: ``(where_conditions, params)`` pairs
        :return: total number of affected rows
        :raises ValueError: if *set_columns* is empty
        """
        if not set_columns:
            raise ValueError("set_columns must not be empty")
        set_clause = ', '.join([f"{k} = %s" for k in set_columns])
        set_values = list(set_columns.values())
        total_affected = 0
        for where_conditions, params in conditions_list:
            query = f"UPDATE {table} SET {set_clause} WHERE {where_conditions}"
            update_params = set_values + self._as_param_list(params)
            total_affected += await self.execute(query, update_params)
        return total_affected
if __name__ == '__main__':
    # Manual smoke test: fetch one idle account for the "xiapi" app.
    from pathlib import Path
    from public_function.public_func import read_config

    async def _demo():
        """Open the pool, run the sample query, print the rows, close the pool."""
        # config.yaml is expected next to this module (public_function/),
        # replacing the machine-specific absolute Windows path.
        config = read_config(str(Path(__file__).resolve().parent / "config.yaml"))
        obj = AsyncMySQL(config['advert_policy'])
        await obj.initialize()
        try:
            sql_str = """select account_id,password from crawler_account_record_info
            where status=1 and app_name=%s limit 1"""
            # fetch_all is a coroutine: it must be awaited inside a running loop.
            print(await obj.fetch_all(sql_str, ("xiapi",)))
        finally:
            await obj.close()

    asyncio.run(_demo())

View File

@ -0,0 +1,21 @@
# NOTE(review): this file commits what look like live credentials in plain
# text (database password, access key id/secret, Redis password). Rotate them
# and load secrets from the environment or a secret store instead of keeping
# them under version control.
advert_policy:
host: pc-2ze85s1hw783u87wlo.mysql.polardb.rds.aliyuncs.com
port: 3306
user: policyuser
password: IvM@ck#z9$Eqy3KGBK74hk
db: advert_policy
table: save_create_device_info
# Connection pool settings (AsyncMySQL reads only max_overflow, as the pool maxsize)
pool_size: 10
max_overflow: 20
pool_recycle: 3600
pool_timeout: 30
access_key_id: LTAI5tA92Av7DQmSQY2MTJPe
access_key_secret: KI5s3C78HcPX9MDjUwPwVDytFzxEjY
redis_config:
host: 182.92.181.218
port: 6372
password: qaz@wsx
db: 0

View File

@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
import yaml
from pathlib import Path
def read_config(path):
    """Parse the YAML file at *path* and return the loaded object.

    The file is opened directly instead of being checked with ``exists()``
    first, so a missing file raises the ``FileNotFoundError`` produced by
    ``open`` — which names the offending path — and there is no window
    between the check and the read.

    :param path: location of the YAML configuration file.
    :return: whatever ``yaml.safe_load`` produces for the file content.
    :raises FileNotFoundError: if *path* does not exist.
    """
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f)

View File

@ -0,0 +1,72 @@
import os
import json
import redis
from typing import List, Dict, Any, Optional
class RedisTaskManager:
    """JSON-aware wrapper around a ``redis.Redis`` client with a default expiry."""

    def __init__(self, config_data: Dict[str, Any]):
        """Create the Redis client.

        :param config_data: either the full configuration (containing a
            ``redis_config`` section) or that section itself; both call
            styles occur in this code base.
        """
        self.config_data = config_data
        # Accept the section directly when no ``redis_config`` key is present.
        redis_cfg = config_data.get('redis_config', config_data)
        self.redis_client = redis.Redis(
            host=redis_cfg['host'],
            port=redis_cfg['port'],
            password=redis_cfg['password'],
            db=redis_cfg['db'],
            decode_responses=True
        )
        self.expire_hours = 24  # default expiry: 24 hours

    def write_data(self, key: str, data: Any, expire_time: Optional[int] = None):
        """
        Store *data* under *key* with an expiry.

        Args:
            key: Redis key name
            data: value to store; dicts and lists are serialised as JSON,
                anything else is converted with ``str``
            expire_time: expiry in seconds; defaults to ``expire_hours`` hours

        Returns:
            bool: True when the write succeeded, False on failure or error
        """
        try:
            if isinstance(data, (dict, list)):
                data_str = json.dumps(data, ensure_ascii=False)
            else:
                data_str = str(data)
            # A caller-supplied expiry takes precedence over the default.
            expire_seconds = expire_time if expire_time is not None else self.expire_hours * 3600
            result = self.redis_client.setex(key, expire_seconds, data_str)
            if result:
                print(f"数据写入成功 - 键: {key}, 过期时间: {expire_seconds}")
                return True
            print(f"数据写入失败 - 键: {key}")
            return False
        except Exception as e:
            print(f"写入Redis数据时发生错误: {e}")
            return False

    def read_data(self, key: str):
        """
        Read the value stored under *key*.

        Args:
            key: Redis key name

        Returns:
            Optional[Any]: the JSON-decoded value, the raw string when it is
            not valid JSON, or None when the key is missing or an error occurs
        """
        try:
            data_str = self.redis_client.get(key)
            if data_str is None:
                print(f"键不存在或已过期 - 键: {key}")
                return None
            try:
                return json.loads(data_str)
            except json.JSONDecodeError:
                # Not JSON: hand back the stored string unchanged.
                return data_str
        except Exception as e:
            print(f"读取Redis数据时发生错误: {e}")
            return None

View File

@ -0,0 +1,61 @@
-- Crawler devices: one row per pad_code (unique), with an indexed status.
CREATE TABLE crawler_device_info_record
(
    id          INT(11)      NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
    pad_code    VARCHAR(50)  NOT NULL COMMENT '设备唯一标识',
    name        VARCHAR(100) NOT NULL DEFAULT '' COMMENT '设备名称',
    status      INT          NOT NULL DEFAULT 1 COMMENT '任务状态1、空闲 2、使用中',
    create_time DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    update_time DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (id),
    UNIQUE KEY uk_pad_code (pad_code),
    KEY idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫设备信息表';
-- Crawl task log: one row per task_id (unique), with an indexed status.
CREATE TABLE crawler_task_record_info
(
    id          INT(11)     NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
    task_id     VARCHAR(50) NOT NULL COMMENT '任务ID',
    app_name    VARCHAR(50) NOT NULL COMMENT 'app名称',
    country     VARCHAR(50) NOT NULL COMMENT '国家',
    status      INT         NOT NULL DEFAULT 1 COMMENT '任务状态1、开始执行2、执行成功3、执行失败',
    create_time DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    update_time DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (id),
    UNIQUE KEY uk_task_id (task_id),
    KEY idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫任务执行记录表';
-- Crawler accounts: unique per (account_id, app_name); status and country are indexed.
CREATE TABLE crawler_account_record_info
(
    id          INT(11)     NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
    account_id  VARCHAR(50) NOT NULL COMMENT '账号名称',
    password    VARCHAR(50) NOT NULL COMMENT '账号密码',
    country     VARCHAR(50) NOT NULL COMMENT '国家',
    status      INT         NOT NULL DEFAULT 1 COMMENT '任务状态1、空闲2、使用中',
    app_name    VARCHAR(50) NOT NULL COMMENT 'app名称',
    create_time DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    update_time DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (id),
    UNIQUE KEY uk_account_id (account_id, app_name),
    KEY idx_status (status),
    KEY idx_country (country)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='爬虫账号记录表';
-- Scraped goods data: unique per (goods_id, store_id, country, app_name).
-- The original statement declared KEY idx_status (status) although this table
-- has no status column, which makes CREATE TABLE fail; that key is removed.
-- The table comment was copied from the account table and is corrected.
CREATE TABLE goods_information_record
(
    id          INT(11)     NOT NULL AUTO_INCREMENT COMMENT '表自增ID',
    goods_id    VARCHAR(50) NOT NULL COMMENT '商品ID',
    store_id    VARCHAR(50) NOT NULL COMMENT '店铺ID',
    country     VARCHAR(50) NOT NULL COMMENT '国家',
    app_name    VARCHAR(50) NOT NULL COMMENT 'app名称',
    goods_info  TEXT        NOT NULL COMMENT '商品具体价格详情等信息',
    create_time DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    update_time DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (id),
    UNIQUE KEY uk_goods_info (goods_id, store_id, country, app_name),
    KEY idx_country (country)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='商品信息记录表';

View File

@ -0,0 +1 @@
# -*- coding: utf-8 -*-

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
import time
from typing import Dict, Any
from public_function.asyn_mysql import AsyncMySQL
from public_function.redis_task_manager import RedisTaskManager
class AllTask:
    """Records crawl tasks in MySQL and waits for their results to appear in Redis."""

    def __init__(self, config_data: Dict[str, Any]):
        """Build the Redis and MySQL helpers from the parsed configuration.

        :param config_data: full configuration containing the
            ``redis_config`` and ``advert_policy`` sections.
        """
        self.config_data = config_data
        # RedisTaskManager looks up the ``redis_config`` section itself, so it
        # receives the whole configuration rather than the section.
        self.redis_conn = RedisTaskManager(self.config_data)
        self.db_pool: AsyncMySQL = AsyncMySQL(self.config_data["advert_policy"])

    async def deal_shopee_task(self, param, timeout=55, poll_interval=1):
        """Return the Redis entry for a Shopee item, polling until it appears.

        :param param: mapping with ``app_name``, ``store_id`` and ``goods_id``,
            which form the Redis key ``app_name:store_id:goods_id``.
        :param timeout: maximum number of seconds to keep polling.
        :param poll_interval: seconds to wait between two Redis reads.
        :return: the stored value, or ``[]`` when nothing shows up in time.
        """
        # Local import: asyncio is not imported at the top of this module.
        import asyncio

        key = f"{param['app_name']}:{param['store_id']}:{param['goods_id']}"
        # Serve directly from Redis when the data is already there.
        result = self.redis_conn.read_data(key)
        if result:
            return result
        # TODO: trigger the matching crawler task here before waiting.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # asyncio.sleep yields to the event loop; time.sleep would block it
            # and returns None, which cannot be awaited.
            await asyncio.sleep(poll_interval)
            result = self.redis_conn.read_data(key)
            if result:
                return result
        return []

    async def task_distribution(self, data: Dict[str, Any]):
        """Record the task in MySQL, then dispatch it by application name.

        A failure to record the task is printed and does not stop the dispatch.

        :param data: task parameters; must contain ``app_name``, ``task_id``
            and ``country`` (plus ``store_id``/``goods_id`` for Shopee).
        :return: the crawl result for a Shopee task, otherwise ``None``.
        """
        # Built before the try block so it is always bound afterwards.
        param = {"app_name": data["app_name"], "task_id": data["task_id"], "country": data["country"]}
        try:
            # Only initialise while no pool exists, to avoid opening a new
            # pool for every task.
            if getattr(self.db_pool, "pool", None) is None:
                await self.db_pool.initialize()
            # insert_many expects a list of records, not a single dict.
            await self.db_pool.insert_many(table="crawler_task_record_info", data=[param])
        except Exception as e:
            print("将任务记录到数据库失败,失败原因为:{}".format(e))
        if param["app_name"] == "Shopee":
            # The Redis key needs store_id/goods_id, which only ``data`` carries.
            result = await self.deal_shopee_task(data)
            if result:
                return result
        return None