"""Delete recent tracking rows for every phone number listed in personnel.

Phone numbers are read from ``userlogin.personnel``, mapped to user IDs via
``wechat_app.users``, and then rows from the last ``days`` days are deleted
from ``user_active_logs``, ``user_product_views`` and ``usertraces``.
"""
import json
import os
from contextlib import contextmanager
from datetime import datetime, timedelta

try:
    import pymysql
except ImportError:
    # Keep the pure helpers (e.g. calculate_active_duration) importable when
    # the driver is absent; database helpers report the problem when called.
    pymysql = None


_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
_DATE_FORMAT = '%Y-%m-%d'

_PERSONNEL_DATABASE = 'userlogin'
_TRACKING_DATABASE = 'wechat_app'

_OPEN_SESSION_CAP_SECONDS = 30 * 60      # cap for an app_show with no app_hide
_FALLBACK_MIN_SECONDS = 30               # floor of the time-span fallback
_FALLBACK_MAX_SECONDS = 5 * 60           # ceiling of the time-span fallback
_USER_TOTAL_CAP_SECONDS = 24 * 60 * 60   # hard cap on the returned total


def _connection_settings(database):
    """Build the pymysql.connect() keyword arguments for *database*.

    NOTE(review): the defaults below are the credentials that were hard-coded
    in every function of this script. They are kept so existing runs behave
    the same, but they should be moved out of source control; each value can
    already be overridden through the CLEANUP_DB_* environment variables.
    """
    return {
        'host': os.environ.get('CLEANUP_DB_HOST', '1.95.162.61'),
        'port': int(os.environ.get('CLEANUP_DB_PORT', '3306')),
        'user': os.environ.get('CLEANUP_DB_USER', 'root'),
        'password': os.environ.get('CLEANUP_DB_PASSWORD', 'schl@2025'),
        'database': database,
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor,
    }


@contextmanager
def _db_cursor(database):
    """Yield ``(connection, cursor)`` for *database* and close both on exit."""
    if pymysql is None:
        raise RuntimeError("pymysql is not installed; database access is unavailable")
    conn = pymysql.connect(**_connection_settings(database))
    try:
        cursor = conn.cursor()
        try:
            yield conn, cursor
        finally:
            cursor.close()
    finally:
        conn.close()


def get_phone_numbers():
    """Return the non-empty phoneNumber values of userlogin.personnel.

    Errors are printed rather than raised; in that case the list collected so
    far (normally empty) is returned.
    """
    phone_numbers = []
    try:
        with _db_cursor(_PERSONNEL_DATABASE) as (_, cursor):
            cursor.execute(
                "SELECT phoneNumber FROM personnel "
                "WHERE phoneNumber IS NOT NULL AND phoneNumber != ''"
            )
            phone_numbers = [row['phoneNumber'] for row in cursor.fetchall()]
            print(f"成功从personnel表获取到 {len(phone_numbers)} 个电话号码")
    except Exception as e:
        print(f"获取电话号码时出错: {e}")
    return phone_numbers


def get_user_ids_by_phone(phone_numbers):
    """Return the userId of every wechat_app.users row matching *phone_numbers*.

    An empty input short-circuits without opening a connection. Errors are
    printed rather than raised and yield an empty list.
    """
    if not phone_numbers:
        print("没有电话号码可查询")
        return []

    user_ids = []
    try:
        numbers = list(phone_numbers)
        placeholders = ', '.join(['%s'] * len(numbers))
        query = f"SELECT userId FROM users WHERE phoneNumber IN ({placeholders})"
        with _db_cursor(_TRACKING_DATABASE) as (_, cursor):
            cursor.execute(query, numbers)
            user_ids = [row['userId'] for row in cursor.fetchall()]
            print(f"成功根据电话号码获取到 {len(user_ids)} 个用户ID")
    except Exception as e:
        print(f"获取用户ID时出错: {e}")
    return user_ids


def _load_original_data(raw):
    """Decode an operation's originalData into a dict; anything unusable becomes {}."""
    if isinstance(raw, dict):
        # Already decoded (e.g. by the driver); json.loads would reject it.
        return raw
    if not raw:
        return {}
    try:
        decoded = json.loads(raw)
    except (TypeError, ValueError):
        return {}
    # Valid JSON that is not an object (list, number, ...) carries no action.
    return decoded if isinstance(decoded, dict) else {}


def _to_datetime(value):
    """Parse 'YYYY-MM-DD HH:MM:SS' strings; pass other values through unchanged."""
    if isinstance(value, str):
        return datetime.strptime(value, _DATETIME_FORMAT)
    return value


def _session_seconds(data):
    """Return the sessionDuration (milliseconds) of an app_hide payload in seconds, else 0."""
    if data.get('action') != 'app_hide' or 'sessionDuration' not in data:
        return 0
    try:
        return float(data['sessionDuration']) / 1000
    except (TypeError, ValueError):
        # Missing/garbled duration: skip this event instead of aborting.
        return 0


def calculate_active_duration(operations, now=None):
    """Compute a user's active duration in seconds from operation records.

    Args:
        operations: sequence of mappings with an ``operationTime`` (datetime
            or 'YYYY-MM-DD HH:MM:SS' string) and an optional ``originalData``
            (JSON text or dict) that may hold ``action`` and
            ``sessionDuration`` (milliseconds).
        now: reference time for a still-open session; defaults to
            ``datetime.now()``.

    Returns:
        0 for empty input, otherwise the total in seconds, capped at 24 hours.

    Raises:
        KeyError: a record has no ``operationTime``.
        ValueError: an ``operationTime`` string is not in the expected format.
    """
    if not operations:
        return 0

    parsed = [
        {
            'operationTime': _to_datetime(op['operationTime']),
            'originalData': _load_original_data(op.get('originalData')),
        }
        for op in operations
    ]
    parsed.sort(key=lambda entry: entry['operationTime'])

    # 1. Sessions closed by an app_hide event carry their own duration.
    total_duration = 0
    for entry in parsed:
        total_duration += _session_seconds(entry['originalData'])

    # 2. A trailing app_show with no later action-bearing event is an open
    #    session; count it up to the reference time, within the cap.
    session_start = None
    last_action = None
    for entry in parsed:
        action = entry['originalData'].get('action')
        if action == 'app_show':
            session_start = entry['operationTime']
        elif action == 'app_hide':
            session_start = None
        if action:
            last_action = action

    if session_start is not None and last_action == 'app_show':
        reference = datetime.now() if now is None else now
        elapsed = (reference - session_start).total_seconds()
        # Clamp at 0 so a future-dated record cannot produce a negative total.
        total_duration += min(_OPEN_SESSION_CAP_SECONDS, max(0, elapsed))

    # 3. Fallback: nothing measured, so estimate from the span between the
    #    first and last record, bounded to [30 s, 5 min].
    if total_duration == 0:
        span = (parsed[-1]['operationTime'] - parsed[0]['operationTime']).total_seconds()
        total_duration = min(_FALLBACK_MAX_SECONDS, max(_FALLBACK_MIN_SECONDS, span))

    # 4. Final sanity cap per user.
    return min(_USER_TOTAL_CAP_SECONDS, total_duration)


def _delete_recent_rows(table, key_column, time_column, keys, days, time_format,
                        empty_message, keys_summary, done_noun):
    """Delete rows of *table* whose key is in *keys* and whose time is within *days*.

    The matching row count is printed before the delete runs. The delete is
    committed on success and rolled back on failure; errors are printed, not
    raised. *table* and the column names are internal constants supplied by
    the public wrappers, never user input, so interpolating them is safe; all
    values go through query parameters.
    """
    if not keys:
        print(empty_message)
        return

    try:
        key_list = list(keys)
        with _db_cursor(_TRACKING_DATABASE) as (conn, cursor):
            end_date = datetime.now()
            start_date = end_date - timedelta(days=days)
            start_date_str = start_date.strftime(time_format)
            end_date_str = end_date.strftime(time_format)
            print(f"删除时间范围: {start_date_str} 到 {end_date_str}")

            placeholders = ', '.join(['%s'] * len(key_list))
            condition = (
                f"{key_column} IN ({placeholders}) "
                f"AND {time_column} BETWEEN %s AND %s"
            )
            params = [*key_list, start_date_str, end_date_str]

            try:
                cursor.execute(
                    f"SELECT COUNT(*) as count FROM {table} WHERE {condition}",
                    params,
                )
                delete_count = cursor.fetchone()['count']
                print(f"\n确认删除: 将删除 {delete_count} 条{table}记录")
                print(keys_summary)

                affected_rows = cursor.execute(
                    f"DELETE FROM {table} WHERE {condition}", params
                )
                conn.commit()
                print(f"\n删除完成: 成功删除 {affected_rows} 条{done_noun}")
            except Exception:
                conn.rollback()
                raise
    except Exception as e:
        print(f"删除{table}时出错: {e}")


def delete_user_traces(phone_numbers, days=30):
    """Delete usertraces rows of *phone_numbers* from the last *days* days."""
    _delete_recent_rows(
        table='usertraces',
        key_column='phoneNumber',
        time_column='operationTime',
        keys=phone_numbers,
        days=days,
        time_format=_DATETIME_FORMAT,
        empty_message="没有电话号码可删除",
        keys_summary=f"涉及电话号码: {phone_numbers}",
        done_noun="记录",
    )


def delete_user_active_logs(user_ids, days=30):
    """Delete user_active_logs rows of *user_ids* from the last *days* days."""
    _delete_recent_rows(
        table='user_active_logs',
        key_column='user_id',
        time_column='active_date',
        keys=user_ids,
        days=days,
        time_format=_DATE_FORMAT,
        empty_message="没有用户ID可删除",
        keys_summary=f"涉及用户ID数量: {len(user_ids) if user_ids else 0}",
        done_noun="user_active_logs记录",
    )


def delete_user_product_views(user_ids, days=30):
    """Delete user_product_views rows of *user_ids* from the last *days* days.

    NOTE(review): the filter uses the ``active_date`` column exactly as the
    original query did; confirm user_product_views really has that column.
    """
    _delete_recent_rows(
        table='user_product_views',
        key_column='user_id',
        time_column='active_date',
        keys=user_ids,
        days=days,
        time_format=_DATE_FORMAT,
        empty_message="没有用户ID可删除",
        keys_summary=f"涉及用户ID数量: {len(user_ids) if user_ids else 0}",
        done_noun="user_product_views记录",
    )


def main():
    """Run the cleanup: look up targets, then delete from the three tables."""
    print("开始删除定向数据...")

    phone_numbers = get_phone_numbers()

    # NOTE(review): the original nesting was lost in formatting; this assumes
    # the usertraces delete depends only on phone numbers being available,
    # while the two user_id-keyed deletes also need matching user IDs.
    if phone_numbers:
        user_ids = get_user_ids_by_phone(phone_numbers)

        if user_ids:
            print("\n=== 删除user_active_logs表记录 ===")
            delete_user_active_logs(user_ids, days=30)

            print("\n=== 删除user_product_views表记录 ===")
            delete_user_product_views(user_ids, days=30)

        print("\n=== 删除usertraces表记录 ===")
        delete_user_traces(phone_numbers, days=30)

    print("\n删除完成!")


if __name__ == "__main__":
    main()