开发者

基于Python实现多人聊天室的示例代码

开发者 https://www.devze.com 2025-03-19 09:59 出处:网络 作者: 席子哥哥的代码库
本文主要为大家介绍一下如何实现一个多人聊天室(server+client),感兴趣的小伙伴可以了解下

本文主要为大家介绍一下如何实现一个多人聊天室(server+client),感兴趣的小伙伴可以了解下

效果图

基于Python实现多人聊天室的示例代码

通过本地服务器以用户名登录

基于Python实现多人聊天室的示例代码

实现关键代码

支持群聊和私聊

server端代码:

import socket
import threading
from datetime import datetime
from collections import OrderedDict
 
class ChatServer:
    """Threaded TCP chat server supporting group chat and private messages.

    Protocol as implemented by this class:

    * The first packet from a client is its username. The server replies
      ``USERNAME_INVALID`` (empty or already taken) or ``CONNECT_SUCCESS``.
    * Afterwards every line received from the client is one of
      ``HEARTBEAT``, a private message ``@@<target> <text>``, or a public
      message that is broadcast to every connected client.
    * Every message the server pushes after login is newline-terminated.
      Lines starting with ``SYSTEM:`` are notices and lines starting with
      ``USERLIST:`` carry the comma-separated list of online usernames.
    """

    def __init__(self, host='0.0.0.0', port=50000):
        """Bind the listening socket on ``host:port`` and start listening."""
        # Maps username -> connected socket, in login order.
        self.clients = OrderedDict()
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind((host, port))
        self.server.listen(5)
        print(f" 服务端已启动在 {host}:{port}")

    def handle_client(self, conn, addr):
        """Run the whole session of one client; intended as a thread target.

        Args:
            conn: the accepted client socket.
            addr: the peer address returned by ``accept`` (unused).
        """
        username = None
        try:
            username = conn.recv(1024).decode().strip()
            if not username or username in self.clients:
                conn.sendall("USERNAME_INVALID".encode())
                return

            # No trailing newline here: the login reply is a bare token,
            # unlike the newline-framed messages that follow.
            conn.sendall("CONNECT_SUCCESS".encode())
            self.clients[username] = conn
            self.broadcast_system_msg(f" {username} 进入聊天室")
            self.broadcast_userlist()
            self._serve(conn, username)
        except (OSError, UnicodeDecodeError):
            print(f"❌ {username} 异常断开")
        finally:
            # Only unregister the name if it is bound to *this* connection.
            # A rejected duplicate login must not evict the user who
            # legitimately owns the name.
            if username is not None and self.clients.get(username) is conn:
                del self.clients[username]
                self.broadcast_system_msg(f" {username} 离开聊天室")
                self.broadcast_userlist()
            conn.close()

    def _serve(self, conn, username):
        """Read and dispatch messages from a logged-in client until it leaves."""
        while True:
            try:
                data = conn.recv(4096)
                if not data:
                    break

                # One recv may carry several newline-terminated messages,
                # so each line is handled on its own.
                for line in data.decode().split("\n"):
                    msg = line.strip()
                    if not msg:
                        continue
                    if msg == "HEARTBEAT":
                        conn.sendall(b"HEARTBEAT_ACK\n")
                    elif msg.startswith("@@"):
                        target, _, content = msg[2:].partition(' ')
                        self.handle_private(username, target, content)
                    else:
                        self.broadcast_msg(f"{username}:{msg}")

            except Exception as e:
                print(f"处理错误:{str(e)}")
                break

    def _send(self, conn, text):
        """Best-effort send of ``text`` to one socket; return True on success."""
        if conn is None:
            return False
        try:
            conn.sendall(text.encode())
        except OSError:
            # The peer is gone; its own handler thread does the cleanup.
            return False
        return True

    def _send_to_all(self, text):
        """Best-effort send of ``text`` to every registered client."""
        # Iterate over a snapshot: other handler threads add and remove
        # entries in self.clients while a broadcast is in progress.
        for client in list(self.clients.values()):
            self._send(client, text)

    def handle_private(self, sender, target, content):
        """Deliver a private message to ``target`` and echo it to ``sender``.

        If ``target`` is not online, only ``sender`` is notified. A failure
        to reach either side is ignored so it cannot end the sender's session.
        """
        sender_conn = self.clients.get(sender)
        target_conn = self.clients.get(target)
        if target_conn is None:
            self._send(sender_conn, f"用户 {target} 不在线\n")
            return

        timestamp = datetime.now().strftime("%H:%M:%S")
        # Newline-terminated like every other pushed message, otherwise a
        # line-buffering client holds it back until the next broadcast.
        msg = f"[{timestamp}] [私聊] {sender} -> 你:{content}\n"
        self._send(target_conn, msg)
        self._send(sender_conn, msg)

    def broadcast_msg(self, msg):
        """Broadcast a chat line prefixed with the current ``[HH:MM:SS]``."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self._send_to_all(f"[{timestamp}] {msg}\n")

    def broadcast_system_msg(self, msg):
        """Broadcast a ``SYSTEM:`` notice to every client."""
        self._send_to_all(f"SYSTEM:{msg}\n")

    def broadcast_userlist(self):
        """Broadcast the current usernames as a ``USERLIST:`` line."""
        user_list = ",".join(self.clients.keys())
        self._send_to_all(f"USERLIST:{user_list}\n")

    def start(self):
        """Accept connections forever, serving each one on its own thread."""
        while True:
            conn, addr = self.server.accept()
            threading.Thread(target=self.handle_client, args=(conn, addr)).start()
 
if __name__ == "__main__":
    # Script entry point: build a server with the default host/port and
    # block in its accept loop.
    chat_server = ChatServer()
    chat_server.start()

客户端代码:

import socket
import threading
import tkinter as tk
from tkinter import messagebox, scrolledtext, ttk
 
class ChatClient:
    """Tkinter GUI client for the newline-framed chat protocol.

    After login a background thread reads from the socket and splits the
    stream into lines. ``USERLIST:`` lines refresh the user panel,
    ``SYSTEM:`` lines are shown as system notices and everything else is
    shown as a chat message.
    """

    # Reply the server sends when the username is accepted.
    _LOGIN_OK = b"CONNECT_SUCCESS"

    def __init__(self, master):
        """Build the main window inside ``master`` and open the login dialog."""
        self.master = master
        self.client_socket = None
        self.username = ""
        self.running = False
        # Bytes received together with the login reply but not yet parsed.
        self._pending = b""

        master.title("在线聊天室")
        master.geometry("900x600")
        self.create_widgets()
        self.show_login_dialog()

    def create_widgets(self):
        """Create the user list, the chat area, the entry and the send button."""
        self.user_frame = ttk.Frame(self.master, width=200)
        self.user_frame.pack(side=tk.LEFT, fill=tk.Y)

        self.user_list = ttk.Treeview(self.user_frame, show="tree", selectmode='browse')
        self.user_list.pack(expand=True, fill=tk.BOTH)
        self.user_list.bind('<<TreeviewSelect>>', self.select_user)

        self.chat_frame = ttk.Frame(self.master)
        self.chat_frame.pack(expand=True, fill=tk.BOTH)

        # Kept read-only; the display_* methods enable it only while writing.
        self.chat_area = scrolledtext.ScrolledText(self.chat_frame, state=tk.DISABLED)
        self.chat_area.pack(expand=True, fill=tk.BOTH)

        # Frame holding the input box and the send button.
        input_frame = ttk.Frame(self.chat_frame)
        input_frame.pack(fill=tk.X, pady=5)

        self.msg_entry = ttk.Entry(input_frame)
        self.msg_entry.pack(side=tk.LEFT, expand=True, fill=tk.X)
        self.msg_entry.bind("<Return>", self.send_message)

        # Send button.
        send_btn = ttk.Button(input_frame, text="发送", command=self.send_message)
        send_btn.pack(side=tk.RIGHT, padx=5)

    def select_user(self, event):
        """Replace the entry text with an ``@@<user> `` private-message prefix.

        If the entry already starts with ``@`` it is cleared instead.
        """
        selected = self.user_list.selection()
        if not selected:
            return
        target = self.user_list.item(selected[0])['text']
        current = self.msg_entry.get()
        prefix = "" if current.startswith("@") else f"@@{target} "
        self.msg_entry.delete(0, tk.END)
        self.msg_entry.insert(0, prefix)

    def show_login_dialog(self):
        """Open the dialog asking for server address, port and username."""
        self.login_dialog = tk.Toplevel(self.master)
        self.login_dialog.title("登录")

        ttk.Label(self.login_dialog, text="服务器地址:").grid(row=0, column=0, padx=5, pady=5)
        self.server_entry = ttk.Entry(self.login_dialog)
        self.server_entry.insert(0, "127.0.0.1")
        self.server_entry.grid(row=0, column=1, padx=5, pady=5)

        ttk.Label(self.login_dialog, text="端口号:").grid(row=1, column=0, padx=5, pady=5)
        self.port_entry = ttk.Entry(self.login_dialog)
        self.port_entry.insert(0, "50000")
        self.port_entry.grid(row=1, column=1, padx=5, pady=5)

        ttk.Label(self.login_dialog, text="用户名:").grid(row=2, column=0, padx=5, pady=5)
        self.username_entry = ttk.Entry(self.login_dialog)
        self.username_entry.grid(row=2, column=1, padx=5, pady=5)

        ttk.Button(self.login_dialog, text="登录", command=self.connect_server).grid(row=3, columnspan=2, pady=10)

    def connect_server(self):
        """Connect, log in with the entered username and start the receiver.

        Any failure is reported in a message box and the socket is closed so
        the user can retry from the still-open login dialog.
        """
        server = self.server_entry.get()
        port = self.port_entry.get()
        self.username = self.username_entry.get().strip()

        if not self.username:
            messagebox.showerror("错误", "用户名不能为空")
            return

        try:
            self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.client_socket.connect((server, int(port)))
            self.client_socket.sendall(self.username.encode())

            reply = self.client_socket.recv(1024)
            # TCP may deliver the login reply glued to the first pushed
            # messages, so match the prefix and keep the rest for the receiver.
            if not reply.startswith(self._LOGIN_OK):
                response = reply.decode(errors="replace")
                messagebox.showerror("错误", f"连接失败: {response}")
                self.client_socket.close()
                self.client_socket = None
                return
            self._pending = reply[len(self._LOGIN_OK):]

            self.running = True
            self.login_dialog.destroy()
            self.master.title(f"在线聊天室 - {self.username}")
            threading.Thread(target=self.receive_messages, daemon=True).start()

        except Exception as e:
            messagebox.showerror("连接失败", str(e))
            if self.client_socket:
                self.client_socket.close()
                self.client_socket = None

    def receive_messages(self):
        """Receiver loop: split the byte stream into lines and dispatch them.

        Runs until ``self.running`` is false, the server closes the
        connection, or the socket raises an error.
        """
        # Buffer raw bytes and decode only complete lines, so a multi-byte
        # UTF-8 character split across two recv() calls is decoded intact.
        buffer = self._pending
        self._pending = b""
        while self.running:
            while b"\n" in buffer:
                raw, buffer = buffer.split(b"\n", 1)
                self._dispatch(raw.decode(errors="replace"))
            try:
                data = self.client_socket.recv(4096)
            except OSError:
                break
            if not data:
                break
            buffer += data

    def _dispatch(self, msg):
        """Route one decoded protocol line to the matching UI update."""
        if msg.startswith("USERLIST:"):
            self.update_user_list(msg[9:].split(','))
        elif msg.startswith("SYSTEM:"):
            self.display_system_msg(msg[7:])
        else:
            self.display_message(msg)

    def update_user_list(self, users):
        """Make the user panel show exactly the names in ``users``.

        Rows for users no longer listed are removed; new users are appended
        in the order given. Empty names are ignored.
        """
        current = set(self.user_list.get_children())
        online = [user for user in users if user]

        for user in current.difference(online):
            self.user_list.delete(user)

        for user in online:
            if user not in current:
                # The username doubles as the row id.
                self.user_list.insert("", "end", iid=user, text=user)
                current.add(user)

    def display_message(self, msg):
        """Append a chat line to the chat area and scroll to the end."""
        self.chat_area.config(state=tk.NORMAL)
        self.chat_area.insert(tk.END, msg + "\n")
        self.chat_area.see(tk.END)
        self.chat_area.config(state=tk.DISABLED)

    def display_system_msg(self, msg):
        """Append a system notice, tagged ``system``, to the chat area."""
        self.chat_area.config(state=tk.NORMAL)
        self.chat_area.insert(tk.END, f"【系统】{msg}\n", 'system')
        self.chat_area.see(tk.END)
        self.chat_area.config(state=tk.DISABLED)

    def send_message(self, event=None):
        """Send the entry text to the server and echo it locally.

        Bound both to the send button and to ``<Return>``, hence the optional
        ``event``. Send failures are shown in a message box.
        """
        msg = self.msg_entry.get().strip()
        if not msg:
            return
        try:
            self.client_socket.sendall(f"{msg}\n".encode())
            self.msg_entry.delete(0, tk.END)
            if msg.startswith("@@"):
                self.display_message(f"[我] 私聊 {msg[2:].split(' ')[0]}:{' '.join(msg.split()[1:])}")
            else:
                self.display_message(f"[我]:{msg}")
        except Exception as e:
            messagebox.showerror("发送失败", str(e))
 
if __name__ == "__main__":
    # Script entry point: create the Tk root, attach the chat client to it
    # and hand control to the Tk event loop.
    root = tk.Tk()
    app = ChatClient(master=root)
    root.mainloop()

以上就是基于python实现多人聊天室的示例代码的详细内容,更多关于Python多人聊天室的资料请关注编程客栈(www.devze.com)其它相关文章!

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号