本文主要为大家介绍一下如何实现一个多人聊天室(server+client),感兴趣的小伙js伴可以了解下
效果图
通过本地服务器以用户名登录
实现关键代码
支持群聊和私聊
sever端代码:
import socket import threading from datetime import datetime from collections import OrderedDict class ChatServer: def __init__(self, host='0.0.0.0', port=50000): self.clients = OrderedDict() self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server.bind((host, port)) self.server.listen(5) print(f" 服务端已启动在 {host}:{port}") def handle_client(self, conn, addr): username = None try: username = conn.recv(1024).decode().strip() if not username or username in self.clients: conn.send("USERNAME_INVALID".encode()) conn.close() return conn.send("CONNECT_SUCCESS".encode()) self.clients[username] = conn self.broadcast_system_msg(f" {username} 进入聊天室") self.broadcast_userlist() while True: try: data = conn.recv(4096) if not data: break msg = data.decode().strip() if msg == "HEARTBEAT": conn.send(b"HEARphpTBEAT_ACK") elif msg.startswith("@@"): target, _, content = msg[2:].partition(' ') self.handle_private(username, target, content) else: self.broadcast_msg(f"{username}:{msg}") except Exception as e: print(f"处理错误:{str(e)}") break except ConnectionResetError: print(f"❌ {username} 异常断开") finally: if username in self.clients: del self.clients[username] self.broadcast_system_msg(f" {username} 离开聊天室") self.broadcast_userlist() conn.close() def handle_private(self, sender, target, content): if target in self.clients: timestamp = datetime.now().strftime("%H:%M:%S") msg = f"[{timestamp}] [私聊] {sender} -> 你:{content}" self.clients[target].send(msg.encode()) self.clients[sender].send(msg.encode()) else: self.clients[sender].send(f"用户 {target} 不在线".encode()) def broadcast_msg(self, msg): timestamp = datetime.now().strftime("%H:%M:%S") full_msg = f"[{timestamp}] {msg}\n" for client in self.clients.values(): try: client.send(full_msg.encode()) except: pass def broadcast_system_msg(self, msg): full_msg = f"SYSTEM:{msg}\n" for client in self.clients.values(): try: client.send(full_msg.encode()) except: pass def broadcast_userlist(self): user_list = ",".join(self.clients.keys()) msg = f"USERLIST:{user_list}\n" for client in self.clients.values(): try: client.send(msg.encode()) except: pass def start(self): while True: conn, addr = self.server.accept() threading.Thread(target=self.handle_client, args=(conn, addr)).start() if __name__ == "__main__": ChatServer().start()
客户端代码:
import tkinter as tk from tkinter import ttk, scrolledtext, messagejYAwQebox import socket import threading class ChatClient: def __init__(self, master): self.master = master self.client_socket = None self.username = "" self.running = False master.title("在线聊天室") master.geometry("900x600") self.create_widgets() self.show_login_dialog() def create_widgets(self): self.user_frame = ttk.Frame(self.master, width=200) self.user_frame.pack(side=tk.LEFT, fill=tk.Y) self.user_list = ttk.Treeview(self.user_frame, show="tree", selectmode='browse') self.user_list.pack(expand=True, fill=tk.BOTH) self.user_list.bind('<<TreeviewSelect>>', self.select_user) self.chat_frame = ttk.Frame(self.master) self.chat_frame.pack(expand=True, fill=tk.BOTH) self.chat_area = scrolledtext.ScrolledText(self.chat_frame, state=tk.DISABLED) self.chat_area.pack(expand=True, fill=tk.BOTH) # 输入框和按钮框架 input_frame = ttk.Frame(self.chat_frame) input_frame.pack(fill=tk.X, pady=5) self.msg_entry = ttk.Entry(input_frame) self.msg_entry.pack(side=tk.LEFT, expand=True, fill=tk.X) self.msg_entry.bind("<Return>", self.send_message) # 添加发送按钮 send_btn = ttk.Button(input_frame, text="发送", command=self.send_message) send_btn.pack(side=tk.RIGHT, padx=5) def select_user(self, event): selected = self.user_list.selection() if selected: target = self.user_list.item(selected[0])['text'] current = self.msg_entry.get() self.msg_entry.delete(0, tk.END) self.msg_entry.insert(0, f"@@{target} " if not current.startswith("@") else "") def show_login_dialog(self): self.login_dialog = tk.Toplevel(self.master) self.login_dialog.title("登录") ttk.Label(self.login_dialog, text="服务器地址:").grid(row=0, column=0, padx=5, pady=5) self.server_entry = ttk.Entry(self.login_dialog) self.server_entry.insert(0, "127.0.0.1") self.server_entry.grid(row=0, column=1, padx=5, pady=5) ttk.Label(self.login_dialog, text="端口号:").grid(row=1, column=0, padx=5, pady=5) self.port_entjavascriptry = ttk.Entry(self.login_dialog) self.port_entry.insert(0, "50000") self.port_entry.grid(row=1, column=1, padx=5, pady=5) ttk.Label(self.login_dialog, text="用户名:").grid(row=2, column=0, padx=5, pady=5) self.username_entry = ttk.Entry(self.login_dialog) self.username_entry.grid(row=2, column=1, padx=5, pady=5) ttk.Button(self.login_dialog, text="登录", command=self.connect_server).grid(row=3, columnspan=2, pady=10) def connect_server(self): server = self.server_entry.get() port = self.port_entry.get() self.username = self.username_entry.get().strip() if not self.username: messagebox.showerror("错误", "用户名不能为空") return try: self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client_socket.connect((server, int(port))) self.client_socket.send(self.username.encode()) response = self.client_socket.recv(1024).decode() if response != "CONNECT_SUCCESS": messagebox.showerror("错误", f"连接失败: {response}") return self.running = True self.login_dialog.destroy() self.master.title(f"在线聊天室 - {self.username}") threading.Thread(target=self.receive_messages, daemon=True).start() except Exception as e: messagebox.showerror("连接失败", str(e)) if self.client_socket: self.client_socket.close() def receive_messages(self): buffer = "" while self.running: try: data = self.client_socket.recv(4096) if not data: break buffer += data.decode() while "\n" in buffer: msg, buffer = buffer.split("\n", 1) if msg.startswith("USERLIST:"): self.update_user_list(msg[9:].split(',')) elif msg.startswith("SYSTEM:"): self.display_system_msg(msg[7:]) else: self.display_message(msg) except: break def update_user_list(self, users): current = set(self.user_list.get_children()) online = set(编程客栈users) for user in current - online: self.user_list.delete(user) for user in online - current: self.user_list.insert("", "end", iid=user, text=user) def display_message(self, msg): self.chat_area.config(state=tk.NORMAL) self.chat_area.insert(tk.END, msg + "\n") self.chat_area.see(tk.END) self.chat_area.config(state=tk.DISABLED) def display_system_msg(self, msg): self.chat_area.config(state=tk.NORMAL) self.chat_area.insert(tk.END, f"【系统】{msg}\n", 'system') self.chat_area.see(tk.END) self.chat_area.config(state=tk.DISABLED) def send_message(self, event=None): msg = self.msg_entry.get().strip() if msg: try: self.client_socket.send(f"{msg}\n".encode()) self.msg_entry.delete(0, tk.END) if msg.startswith("@@"): self.display_message(f"[我] 私聊 {msg[2:].split(' ')[0]}:{' '.join(msg.split()[1:])}") else: self.display_message(f"[我]:{msg}") except Exception as e: messagebox.showerror("发送失败", str(e)) if __name__ == "__main__": root = tk.Tk() app = ChatClient(root) root.mainloop()
以上就是基于python实现多人聊天室的示例代码的详细内容,更多关于Python多人聊天室的资料请关注编程客栈(www.devze.com)其它相关文章!
精彩评论