最近心血来潮,想通过Python编写一个微信机器人。网上搜索了一下可以使用itchat这个包来完成:
https://github.com/why2lyj/ItChat-UOS
需要注意的时普通的itchat包已经用不了了,因为微信在很久以前就关闭了通过网络登录的途径,但是上面这个UOS的版本,在我博客发布的时候还是可以正常使用的:
pip install itchat-uos==1.5.0.dev0
注意需要安装这个1.5.0版本,直接安装是1.4版本实测已经无法使用(会提示wxid相关报错)
我全部的微信机器人代码在github上:
www.baidu.com(还没建)
以下是我主要部分的关键代码,直接让gpt-o1帮忙写的。主要有以下几点:
1.只处理私聊和@的信息
2.通过多线程和队列处理信息,使用锁确保一时间同一人只能占用一个线程。因为后面会涉及到进行游戏和计算游戏豆的模块
3.不同的模块使用同一的类名和方法,仅文件名不同,这样可以通过动态导入的方式往里面加自定义的模块
4.在初始化的时候需要扫描module文件夹下的所有模块,这些模块会以单例模式初始化,然后把对象保存在字典里,通过命令调用指定的模块
5.数据库的部分用sqlite3解决
"""
启动微信机器人并导入不同的功能模块
"""
import traceback
import threading
import queue
import logging
import itchat
from itchat.content import TEXT
from utils.scan_module import get_command_module_dict# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(threadName)s %(message)s')class WeChatBot:"""微信机器人"""def __init__(self):# 功能模块映射,根据消息前缀映射到对应的模块名self.module_mapping = get_command_module_dict()# 定义消息队列self.message_queue = queue.Queue()self.num_worker_threads = 5 # 工作线程数# 初始化发送者锁字典和锁self.sender_locks = {}self.sender_locks_lock = threading.Lock()self.handle_private_message = Noneself.handle_group_message = None# 初始化 itchatitchat.auto_login(hotReload=False)# 注册消息处理函数self.register_handlers()# 启动消息处理线程self.start_worker_threads()def register_handlers(self):"""注册消息处理器"""# 由于装饰器的使用,我们需要将函数定义在这里,并使用 self 作为参数@itchat.msg_register(TEXT, isFriendChat=True)def handle_private_message(msg):# 将消息放入队列,不直接处理self.message_queue.put(('private', msg))@itchat.msg_register(TEXT, isGroupChat=True)def handle_group_message(msg):# 将消息放入队列,不直接处理if msg['IsAt']:self.message_queue.put(('group', msg))# 将函数绑定到实例self.handle_private_message = handle_private_messageself.handle_group_message = handle_group_messagedef start_worker_threads(self):"""启动工作线程"""for i in range(self.num_worker_threads):thread_pool = threading.Thread(target=self.message_worker, name=f'Worker-{i + 1}')thread_pool.daemon = Truethread_pool.start()def message_worker(self):"""通过多线程和队列处理信息,使用锁确保一时间同一人只能占用一个线程"""while True:try:msg_type, msg_data = self.message_queue.get(timeout=1)sender_id = msg_data['FromUserName']# 获取或创建发送者的锁with self.sender_locks_lock:if sender_id not in self.sender_locks:self.sender_locks[sender_id] = threading.Lock()sender_lock = self.sender_locks[sender_id]# 使用发送者的锁,确保同一时间只有一个线程处理该发送者的消息with sender_lock:if msg_type == 'private':self.handle_private_message_worker(msg_data)elif msg_type == 'group':self.handle_group_message_worker(msg_data)self.message_queue.task_done()except queue.Empty:continueexcept Exception as exception:logging.error("消息处理时发生异常:%s", exception)def handle_private_message_worker(self, msg):"""处理私聊消息"""sender = msg['User']nickname = sender['NickName']content = msg['Text']logging.info("私聊消息 - 来自 %s:%s", nickname, content)reply = self.generate_reply(nickname, content)itchat.send(reply, toUserName=sender['UserName'])def handle_group_message_worker(self, msg):"""处理群消息"""if msg['IsAt']:group_name = msg['User']['NickName']sender_nickname = msg['ActualNickName']actual_content = msg['Content']# 获取自己的昵称my_nickname = itchat.search_friends()['NickName']# 去除@信息,提取实际内容content = actual_content.replace(f'@{my_nickname}', '').strip()logging.info("群聊消息 - %s 中 @%s 说:%s", group_name, sender_nickname, content)reply_content = self.generate_reply(sender_nickname, content)reply = f"@{sender_nickname} {reply_content}"itchat.send(reply, toUserName=msg['FromUserName'])def generate_reply(self, nickname, content):"""调用不同的功能模块,处理消息生成回复"""try:command_sign = content.split(" ")[0]if command_sign in self.module_mapping:module_instance = self.module_mapping[command_sign]reply = module_instance.process_messages(nickname, content)else:reply = "对不起,暂时还没有这个功能"return replyexcept Exception as exception:print(traceback.format_exc())logging.error("处理模块时发生异常:%s", exception)return '抱歉,出现了一些错误。'@staticmethoddef run():"""开始运行机器人"""while True:try:itchat.run(blockThread=True)except KeyboardInterrupt:# 如果用户手动中断,退出循环logging.info("微信机器人已停止")breakexcept Exception as exception:logging.error("主循环发生异常:%s", exception)if 'request' in str(exception) or 'Logout' in str(exception) or 'login' in str(exception).lower():try:itchat.auto_login(hotReload=True)logging.info("重新登录成功")except Exception as login_exception:logging.error("重新登录失败:%s", login_exception)print("无法重新登录,程序即将退出")break # 退出循环,不再尝试重新登录else:logging.error("无法识别的异常,未能重新登录:%s", exception)print("遇到无法识别的异常,程序即将退出")break # 退出循环,不再尝试if __name__ == '__main__':bot = WeChatBot()bot.run()
比如以下就是一个自定义的21点游戏模块:
import random
# 请确保 BeanManager 类的代码已经正确导入或者定义
from utils.bean_actions import BeanManager # 导入你的 BeanManager 类class FunctionModule:"""FunctionModule 类,实现线程安全的单例模式。"""_instance = None# 命令标识,用于标注什么样的命令开头会调用这个功能模块# 如@机器人 21点 或者@机器人 blackjack就会触发这个模块_command_sign = ["21点", "blackjack", "要牌", "停牌"]# 如果未激活那么不会使用is_active = Truedef __new__(cls):"""线程安全的单例实现。"""if cls._instance is None:cls._instance = super(FunctionModule, cls).__new__(cls)return cls._instancedef __init__(self):"""初始化方法。"""if not hasattr(self, '_initialized'):self._initialized = True# 用户游戏状态存储,key为用户昵称,value为玩家状态self.user_states = {}# 初始化一副牌self.deck = self.create_deck()self.help_commands_set = set()for command in self._command_sign:# 构建帮助命令的各种可能格式help_variants = [f"{command} 介绍",f"{command} 帮助",f"{command} 说明",f"{command} help",f"{command} 功能"]# 将所有变体转换为小写并添加到集合中,确保大小写不敏感for variant in help_variants:self.help_commands_set.add(variant.lower())# 初始化 BeanManager 实例self.bean_manager = BeanManager()def get_command_sign(self):"""返回当前模块的命令标识"""return self._command_sign@staticmethoddef get_simple_description():"""返回简单的功能描述"""return "21点游戏"@staticmethoddef get_detail_description():"""返回详细的功能描述"""description_string = """一个简单的21点(Blackjack)游戏。你可以发送'开始21点'来开始游戏,'要牌'来获取一张新牌,'停牌'来结束当前回合。
玩家与庄家比较牌面点数,最接近21点而不爆牌(超过21点)的玩家获胜。庄家在16点或以下必须继续要牌,17点或以上则停牌。
A:两种方式,可以作为11点(软手),亦作为1点(硬手)。2-10:牌面点数即其数值。J、Q、K:每张牌的点数为10点。"""return description_string.strip()def process_messages(self, sender_nickname, content):"""根据发消息人的昵称和发消息的内容,制作回复"""# 初始化玩家状态,如果不存在if sender_nickname not in self.user_states:self.user_states[sender_nickname] = {'player_hand': [],'dealer_hand': [],'in_game': False,'deck': [],'bet_amount': None # 押注金额,None 表示未押注}user_state = self.user_states[sender_nickname]reply_string = ""content_lower = content.strip().lower()if content_lower in self.help_commands_set:return self.get_detail_description()# 解析用户输入,检查是否包含押注金额if any(content_lower.startswith(cmd) for cmd in ['21点', 'blackjack']):# 解析押注金额parts = content.strip().split()bet_amount = Noneif len(parts) >= 2:# 尝试解析押注金额bet_input = parts[1]try:# 支持多种押注指令,如'押1000','赌1000',或直接'1000'if bet_input.startswith(('押', '赌')):bet_amount = int(bet_input[1:])else:bet_amount = int(bet_input)except ValueError:reply_string = "请输入有效的押注金额,例如:'21点 押1000',或直接发送'21点'开始游戏。"return reply_stringif user_state['in_game']:reply_string = "你已经在游戏中了!"else:if bet_amount is not None:# 用户选择押注,检查豆子余额total_beans = self.bean_manager.get_bean_count(sender_nickname)if total_beans < bet_amount:reply_string = f"你的豆子不足!当前豆子:{total_beans},需要押注:{bet_amount}"return reply_stringelif bet_amount <= 0:reply_string = "押注金额必须大于0!"return reply_string# 扣除押注金额self.bean_manager.add_beans(sender_nickname, -bet_amount)user_state['bet_amount'] = bet_amountelse:# 用户未选择押注,设置押注金额为 Noneuser_state['bet_amount'] = None# 重新创建并洗牌user_state['deck'] = self.create_deck()random.shuffle(user_state['deck'])user_state['player_hand'] = []user_state['dealer_hand'] = []user_state['in_game'] = True# 玩家发两张初始牌user_state['player_hand'].append(self.draw_card(user_state['deck']))user_state['player_hand'].append(self.draw_card(user_state['deck']))# 庄家发两张牌,一张明牌,一张暗牌user_state['dealer_hand'].append(self.draw_card(user_state['deck']))user_state['dealer_hand'].append(self.draw_card(user_state['deck']))player_total = self.calculate_total(user_state['player_hand'])dealer_visible_card = user_state['dealer_hand'][0]if bet_amount is not None:potential_win = bet_amount * 2 * 0.95 # 计算可能的赢取金额,扣除5%抽水reply_string = (f"游戏开始!你押注了 {bet_amount} 豆子,可赢取 {int(potential_win)} 豆子(扣除5%抽水)。\n")else:reply_string = "游戏开始!\n"reply_string += (f"你的手牌是:{self.format_hand(user_state['player_hand'])},总点数:{player_total}。\n"f"庄家明牌:{dealer_visible_card}。\n""你可以选择'要牌'或者'停牌'。")elif content_lower == '要牌':if not user_state['in_game']:reply_string = "你还没有开始游戏,请发送'21点'或'21点 押注金额'来开始游戏。"else:user_state['player_hand'].append(self.draw_card(user_state['deck']))player_total = self.calculate_total(user_state['player_hand'])if player_total > 21:reply_string = (f"你抽到了 {self.format_hand([user_state['player_hand'][-1]])},你的手牌是:{self.format_hand(user_state['player_hand'])},总点数 {player_total}。\n""爆掉了!你输了!\n")if user_state['bet_amount'] is not None:reply_string += f"你失去了 {user_state['bet_amount']} 豆子。\n"reply_string += "游戏结束。"user_state['in_game'] = Falseuser_state['bet_amount'] = Noneelif player_total == 21:reply_string = (f"你抽到了 {self.format_hand([user_state['player_hand'][-1]])},你的手牌是:{self.format_hand(user_state['player_hand'])},总点数 {player_total}。\n""恭喜你,Blackjack!你赢了!\n")if user_state['bet_amount'] is not None:winnings = int(user_state['bet_amount'] * 2 * 0.95)self.bean_manager.add_beans(sender_nickname, winnings)reply_string += f"你赢得了 {winnings} 豆子(扣除5%抽水)。\n"reply_string += "游戏结束。"user_state['in_game'] = Falseuser_state['bet_amount'] = Noneelse:reply_string = (f"你抽到了 {self.format_hand([user_state['player_hand'][-1]])},你的手牌是:{self.format_hand(user_state['player_hand'])},总点数 {player_total}。\n""你可以继续选择'要牌'或者'停牌'。")elif content_lower == '停牌':if not user_state['in_game']:reply_string = "你还没有开始游戏,请发送'21点'或'21点 押注金额'来开始游戏。"else:player_total = self.calculate_total(user_state['player_hand'])# 庄家揭开暗牌并进行操作dealer_hand = user_state['dealer_hand']dealer_total = self.calculate_total(dealer_hand)while dealer_total < 17:dealer_card = self.draw_card(user_state['deck'])dealer_hand.append(dealer_card)dealer_total = self.calculate_total(dealer_hand)reply_string = (f"你的手牌是:{self.format_hand(user_state['player_hand'])},总点数 {player_total}。\n"f"庄家的手牌是:{self.format_hand(dealer_hand)},总点数 {dealer_total}。\n")if dealer_total > 21 or player_total > dealer_total:reply_string += "恭喜你,你赢了!\n"if user_state['bet_amount'] is not None:winnings = int(user_state['bet_amount'] * 2 * 0.95)self.bean_manager.add_beans(sender_nickname, winnings)reply_string += f"你赢得了 {winnings} 豆子(扣除5%抽水)。\n"elif player_total < dealer_total:reply_string += "很遗憾,你输了!\n"if user_state['bet_amount'] is not None:reply_string += f"你失去了 {user_state['bet_amount']} 豆子。\n"else:reply_string += "平局!\n"if user_state['bet_amount'] is not None:# 返还押注金额self.bean_manager.add_beans(sender_nickname, user_state['bet_amount'])reply_string += f"你的押注 {user_state['bet_amount']} 豆子已返还。\n"reply_string += "游戏结束。"user_state['in_game'] = Falseuser_state['bet_amount'] = None# 清空手牌,不清空牌库user_state['player_hand'] = []user_state['dealer_hand'] = []else:reply_string = "无法识别的指令。你可以发送'21点'或'21点 押注金额'来开始游戏,'要牌'来获取一张新牌,'停牌'来结束当前回合。"return reply_string@staticmethoddef create_deck():"""创建一副52张的扑克牌"""suits = ['♠️', '♥️', '♣️', '♦️']ranks = ['A'] + [str(n) for n in range(2, 11)] + ['J', 'Q', 'K']deck = []for suit in suits:for rank in ranks:deck.append(f"{suit}{rank}")return deckdef draw_card(self, deck):"""从牌堆中抽一张牌"""if len(deck) == 0:# 如果牌堆没牌了,重新创建并洗牌deck.extend(self.create_deck())random.shuffle(deck)return deck.pop()@staticmethoddef calculate_total(hand):"""计算手牌的总点数,处理A的情况(A可以是1也可以是11)"""total = 0aces = 0for card in hand:rank = card[2:] if card[1] in ['️'] else card[1:] # 处理可能的特殊字符if rank in ['J', 'Q', 'K']:total += 10elif rank == 'A':aces += 1total += 11else:total += int(rank)# 如果总点数超过21,且有A,把A当1处理while total > 21 and aces > 0:total -= 10aces -= 1return total@staticmethoddef format_hand(hand):"""格式化手牌输出"""return '、'.join(hand)@staticmethoddef get_card_value(card):"""获取单张牌的点数, 用于显示庄家的明牌点数"""rank = card[2:] if card[1] in ['️'] else card[1:]if rank in ['J', 'Q', 'K']:return 10elif rank == 'A':return 11else:return int(rank)
今天上班时间结束了,明天继续摸鱼继续写