Python课程-w73班
Louyj's Blog
Toggle navigation
Python课程-w73班
Home
班级课件
课堂代码
课后作业
学习资料
Archives
Tags
2022-06-26课堂代码
2022-06-26 09:14:42
11
0
0
python-w73
服务端(chat_server.py) ``` import json import socket import traceback online_users = {} def decode_msg(msg): return json.loads(msg.decode('utf-8')) def encode_msg(msg): return json.dumps(msg).encode('utf-8') server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) server_socket.bind(('localhost', 8888)) while True: try: recv_message, recv_address = server_socket.recvfrom(1024) print("收到用户消息: %s, 网络地址 %s" % (recv_message, recv_address)) recv_message = decode_msg(recv_message) msg_type = recv_message['type'] if msg_type == 'login': recv_user_name = recv_message['user'] online_users[recv_user_name] = recv_address for user_name, address in online_users.items(): send_message = {'type': 'online', 'all_users': online_users, 'user': recv_user_name} server_socket.sendto(encode_msg(send_message), address) elif msg_type == 'quit': recv_user_name = recv_message['user'] del online_users[recv_user_name] for user_name, address in online_users.items(): if user_name != recv_user_name: send_message = {'type': 'offline', 'all_users': online_users, 'user': recv_user_name} server_socket.sendto(encode_msg(send_message), address) elif msg_type == 'chat': to_user_name = recv_message['to_user'] if to_user_name in online_users: to_user_address = online_users[to_user_name] server_socket.sendto(encode_msg(recv_message), to_user_address) else: print('用户%s不存在, 消息无法送达' % to_user_name) except Exception as e: print("Exception: ", traceback.format_exc()) ``` 客户端(chat_client.py) ``` import json import socket import threading online_users = {} # current_user_name = None selected_user = None selected_user_address = None unread_messages = {} running = True class bcolors: HEADER = '\033[95m' OKBLUE = '\033[94m' OKCYAN = '\033[96m' OKGREEN = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' ENDC = '\033[0m' BOLD = '\033[1m' UNDERLINE = '\033[4m' def decode_msg(msg): return json.loads(msg.decode('utf-8')) def encode_msg(msg): return json.dumps(msg).encode('utf-8') def print_prompt(): prompt = ">> " 
if selected_user: prompt = '[%s]>> ' % selected_user print("\r" + bcolors.WARNING + prompt + bcolors.ENDC, end="") def print_receive_message(from_user, message): prompt = '[%s]<< ' % from_user print("\r" + bcolors.WARNING + prompt + bcolors.ENDC, end="") print(bcolors.HEADER + message + bcolors.ENDC) def print_system_message(message): print("\r" + bcolors.FAIL + message + bcolors.ENDC) print_prompt() def list_online_users(): print("=================在线用户列表=================") for user_name, address in online_users.items(): if user_name == current_user_name: print('用户: %s, 网络地址: %s:%s (*)' % (user_name, address[0], address[1])) else: print('用户: %s, 网络地址: %s:%s' % (user_name, address[0], address[1])) def list_unread_messages(): print("=================未读消息列表=================") for user_name, message_list in unread_messages.items(): print('用户: %s, 未读消息: %d 条' % (user_name, len(message_list))) def receive(): while running: recv_message, recv_address = client_socket.recvfrom(1024) recv_message = decode_msg(recv_message) msg_type = recv_message['type'] global online_users if msg_type == 'online': online_users = recv_message['all_users'] print_system_message("系统消息: 新用户%s已上线" % recv_message['user']) elif msg_type == 'offline': online_users = recv_message['all_users'] offline_user = recv_message['user'] print_system_message("系统消息: 用户%s已下线" % offline_user) global selected_user global selected_user_address if selected_user == offline_user: selected_user = None selected_user_address = None elif msg_type == 'chat': from_user = recv_message['from_user'] message = recv_message['message'] if from_user != selected_user: if from_user in unread_messages: from_user_unread_message = unread_messages[from_user] from_user_unread_message.append(message) else: unread_messages[from_user] = [message] print_system_message("系统消息: 收到来自用户%s的%d条新消息" % (from_user, len(unread_messages[from_user]))) else: print_receive_message(from_user, message) print_prompt() def send(): global running while running: 
print_prompt() message = input() if message == ':user': list_online_users() elif message == ':select': list_online_users() global selected_user global selected_user_address find_user = False while not find_user: selected_user = input("请选择用户名: ") if selected_user == current_user_name: print("聊天用户不是是自己,请重新选择") continue for user_name, address in online_users.items(): if user_name == selected_user: selected_user_address = address find_user = True break if not find_user: print("用户不存在,请重新选择") if selected_user in unread_messages: unread_message_list = unread_messages[selected_user] for unread_message in unread_message_list: print_receive_message(selected_user, "未读消息: " + unread_message) del unread_messages[selected_user] elif message == ':quit': send_message = {'type': 'quit', 'user': current_user_name} client_socket.sendto(encode_msg(send_message), server_address) running = False print("系统退出") break elif message == ':message': list_unread_messages() else: if message.strip() == '': pass elif not selected_user: print("未选择任何用户,不能发送消息") else: send_message = {'type': 'chat', 'to_user': selected_user, 'from_user': current_user_name, 'message': message} client_socket.sendto(encode_msg(send_message), server_address) client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) server_address = ('localhost', 8888) print("欢迎使用在线聊天系统") current_user_name = input("请输入用户名: ") registry_msg = {"type": "login", "user": current_user_name} client_socket.sendto(encode_msg(registry_msg), server_address) threading.Thread(target=receive, daemon=True).start() threading.Thread(target=send).start() ```
Pre:
2022-07-10课件
Next:
2022-06-19课堂代码
0
likes
11
Weibo
Wechat
Tencent Weibo
QQ Zone
RenRen
Submit
Sign in
to leave a comment.
No Leanote account?
Sign up now.
0
comments
More...
Table of contents
No Leanote account? Sign up now.