Article Index

程序框架

服务器端

  • server_main.py 服务器主程序,分响应者和巡逻者两个进程。 响应者负责接收客户端传输的全部数据,及时做出应答。 巡逻者负责监视用户的在线状态。
  • server_database.py 数据库管理者,负责存取数据库。
  • server_config.py 服务器接口配置

 

客户端

  • client_main.py 客户端主程序,打印菜单,接收用户输入。

进入聊天室后程序分为三个进程,分别是:

连接者:每秒告知服务端响应者客户端的连接状态。

发送者:负责告知服务端响应者用户的聊天信息。

接收者:负责接受服务端响应者发来的信息。

  • client_comm.py 应用层通讯协议指令工厂
  • client_udp.py 客户端与服务器的连接层,提供发送和接听方法
  • client_config.py 客户端配置文件,包括服务器地址和客户端端口

 

https://github.com/jarork/chatroom/tree/master

 

 

 

服务端代码

#!/usr/bin/python3
'''
服务端客户端连线层
'''

from socket import *
from server_config import ServerConfig
from multiprocessing import Process, Array
from server_database import DataManager
from time import ctime, time, sleep
from datetime import datetime

# 每个角色都应该能够向所有用户发送广播
class Speaker:
   '''
  管理者角色:继承此类将能够对所有或单个用户发送信息。
  '''
   def broadcast(self, user_name, message):
       '''
      向所有在线用户广播消息
      '''
       # 用户的发送端口为n,接收端口为n+1
       for user, info in self.online_users.items():
           self.response((info[0],int(info[1])+1), "%s %s: %s" % (user_name, datetime.now().strftime('%H:%M:%S'), message))

   def response(self, client_addr, message):
       '''
      用于反馈用户的请求
      '''
       self.udp_socket.sendto(message.encode(), client_addr)    

# 服务器应答者,负责接收客户端信息并立即作出回应
class Responser(Speaker):
   '''
  应答者:负责监听客户端信息,做出及时应答。
  包括应答注册登录请求,以及消息发送请求。
  负责联系数据库管理者,调用数据验证用户,
  以及存储聊天记录。
  '''
   def __init__(self):
       # 创建udp的套接字
       self.udp_socket = socket(AF_INET,SOCK_DGRAM)

       # 绑定地址
       self.udp_socket.bind(ServerConfig.UDP_SOCKET_ADDR)

       # 活跃用户列表
       self.online_users = dict()

       # 创建数据库管理者角色(连接数据库)
       DataManager.connect_mysql()
   

   def run_listener(self):
       '''
      用于等待、接收客户端信息
      '''
       # 注意接收到的addr是客户端的地址
       print("正在等待客户端递包...")

       while True:
           data, addr = self.udp_socket.recvfrom(1024)
           data = data.decode()
           # print("收到了客户端%s发来的指令:\n%s" %(addr, data))
           command, info = data.split("$$")

           if command == "LOGIN":
               self.client_login(info, addr)

           elif command == "REG":
               self.client_reg(info, addr)
               
           elif command == "MSG":
               self.client_msg(info, addr)

           elif command == "ONLINE":
               self.client_online(info, addr)

           elif command == "EXIT":
               self.client_exit(info, addr)

   
   def get_online_users(self):
       users = [i for i in self.online_users]
       users = ", ".join(users)
       users = "当前在线用户: " + users
       return users

   def client_login(self, info, addr):
       info = info.split("^^")
       user_name = info[0]
       password = info[1]
       print("接收到了客户端的登录请求,\n用户:%s\n密码:%s\n" % (user_name,password))

       # 查询用户登录信息是否与数据库一致
       result = DataManager.check_user_login(user_name, password)
       if result == 2:
           print("用户%s登录成功" % user_name)

           # 获取在线用户和聊天记录,然后发送给用户
           online_users = self.get_online_users()
           chat_history_list = self.get_latest_messages(10)

           chat_history_str = ""
           for user_name, msg, time in chat_history_list:
               chat_history_str += "%s %s : %s\n" % (user_name, time.strftime('%H:%M:%S'), msg)

           # print(chat_history_str)

           self.response(addr, "2"+chat_history_str+online_users)

           # 向所有人广播用户登录
           self.broadcast("管理员", "%s进入了聊天室" % user_name)

       if result == 1:
           print("用户%s输入了不正确的密码" % user_name)
           self.response(addr, "1")
           
       if result == 0:
           print("用户%s不存在" % user_name)
           self.response(addr, "0")

   def client_reg(self, info, addr):
       info = info.split("^^")
       user_name = info[0]
       password = info[1]
       print("接收到了客户端的注册请求,\n用户:%s\n密码:%s\n" % (user_name,password))

       # 查询用户是否存在
       result = DataManager.search_user(user_name)
       if result:
           print("用户名%s已存在" % user_name)
           self.response(addr, "用户名%s已存在" % user_name)
       else:
           print("%s可以注册"  % user_name)
           reg_result = DataManager.reg_user(user_name, password)
           if reg_result:
               print("已经完成对用户%s的注册请求。" % user_name)
               self.response(addr, "恭喜,您已完成注册;现在即可使用用户名和密码登录聊天室。")
           else:
               print("用户%s的注册失败。" % user_name)
               self.response(addr, "抱歉,该用户名已被注册。")

   def client_msg(self, info, addr):
       info = info.split("^^")
       user_name = info[0]
       message = info[1]
       print("接收到了用户%s的信息%s" % (user_name, message))

       # 广播用户刚刚发送的消息
       self.broadcast(user_name, message)

       # 把用户发送的消息保存到数据库
       result = DataManager.save_msg(user_name, message)

   def client_online(self, info, addr):
       user_name = info
       # print("用户%s在线中..." % user_name)
       
       # 从共享内存读取在线用户
       user_str = online_users_array.value.decode()
       self.online_users = dict()

       # users_info用于保存每个用户的名字,IP地址和上次状态更新的时间
       # 格式Tom@@127.0.0.1@@9999@@2020-07-24-05:12:52##...
       
       users = user_str.split("##")
       if users:
           for each_user in users:
               # if a user is found
               if each_user:
                   user_data = each_user.split("@@")
                   o_user_name, ip_addr, ip_port, last_active = user_data[0:4]
                   self.online_users.update({o_user_name: [ip_addr, ip_port, last_active]})
               
       
       # 如果用户已在线
       if user_name in self.online_users:
           self.online_users.update({user_name: [addr[0], addr[1], round(time()) ] })
       else:
           self.online_users.update({user_name: [addr[0], addr[1], round(time()) ] })

       # 把更新后的在线用户保存至共享内存
       string = ""
       for name, info in self.online_users.items():
           for i in range(len(info)):
               info[i] = str(info[i])

           line = "##%s@@%s@@%s@@%s" % (name, *info)
           string += line

       string = string.encode()    
       # 将字符串打散成单个字节的字节串
       bytes = [l for l in string]
       bytes_len = len(bytes)
       
       # 更新共享内存
       online_users_array[:bytes_len] = bytes

   def client_exit(self, info, addr):
       user_name = info
       print("接收到了用户%s的退出请求" % user_name)

   def get_latest_messages(self, msg_num):
       messages = DataManager.get_latest_msg(10)
       return messages

   def close_conn(self):
       '''
      用于关闭和客户端的通讯
      '''
       try:
           self.udp_socket.close()
           return 1
       except Exception as e:
           print(e, "关闭和客户端的通讯失败")
           return 0
       
# 服务器巡逻者,负责检测不活跃用户,以及不文明用户
class PatrolMan(Speaker):
   '''
  巡逻者:负责清理不活跃的用户,
  以及封禁发布违规内容的用户。
  '''
   def __init__(self):
       # 创建udp的套接字
       self.udp_socket = socket(AF_INET,SOCK_DGRAM)

       # 绑定地址
       self.udp_socket.bind(ServerConfig.UDP_SOCKET_ADDR2)

       # 活跃用户列表
       self.online_users = dict()

   def patrol(self):
       '''
      巡逻者开始巡视,如果发现用户长时间不应答
      则广播用户离线的消息。
      '''
       while True:
           sleep(1)

           current_time = round(time())

           # 判断用户是否超时
           # 从共享内存读取在线用户
           user_str = online_users_array.value.decode()
           self.online_users = dict()

           # users_info用于保存每个用户的名字,IP地址和上次状态更新的时间
           # 格式Tom@@127.0.0.1@@9999@@2020-07-24-05:12:52##...
           
           users = user_str.split("##")
           if users:
               for each_user in users:
                   # if a user is found
                   if each_user:
                       user_data = each_user.split("@@")
                       o_user_name, ip_addr, ip_port, last_active = user_data[0:4]
                       self.online_users.update({o_user_name: [ip_addr, ip_port, last_active]})

           del_user_list = []
           for each_user, info in self.online_users.items():
               # 获取用户上次更新在线状态的时间
               last_active = info[2]
               # print(each_user, "的上次响应时间戳:",last_active)
               
               # 如果用户超过10秒没有响应,说明该用户断开了连接
               if (current_time - int(last_active)) > 10:
                   print(each_user,"离开了聊天室")

                   # 广播离开的用户
                   self.broadcast('管理员',"用户%s离开了聊天室。" % each_user)

                   # 标记要删除的离线用户
                   del_user_list.append(each_user)

           # 删除所有被标记的超时用户
           for each in del_user_list:
               del self.online_users[each]
           
           # 把更新后的在线用户保存至共享内存
           string = ""
           for name, info in self.online_users.items():
               for i in range(len(info)):
                   info[i] = str(info[i])

               line = "##%s@@%s@@%s@@%s" % (name, *info)
               string += line

           string = string.encode()    
           # 将字符串打散成单个字节的字节串
           bytes = [l for l in string]
           bytes_len = len(bytes)
           
           
           # 清空共享内存
           online_users_array[:len(online_users_array)]=b'\x00'*len(online_users_array)

           # 更新共享内存
           online_users_array[:bytes_len] = bytes
           # print(online_users_array.value)

   
if __name__ == '__main__':
   # 创建共享内存空间,用于进程间共享活跃用户
   online_users_array = Array('c', 1024)

   # 创建巡逻者角色,子进程A
   patrol_man = PatrolMan()
   patrol_man_proc = Process(target = patrol_man.patrol)
   patrol_man_proc.start()

   # 创建响应者角色,父进程
   responser = Responser()
   responser.run_listener()

   

 

import pymysql
from server_config import ServerConfig

# 数据管理者
class DataManager():
   '''
  负责把数据信息交付给应答者和巡逻者,
  收集从应答者和巡逻者获得的信息,保存数据库。
  '''
   @classmethod
   def connect_mysql(cls):
       '''
      数据库连接
      '''
       cls.db = pymysql.connect(**ServerConfig.DB_CONFIG)
       cls.cur = cls.db.cursor()

   @classmethod
   def search_user(cls, user_name):
       sql = "select name from users where name = %s;"
       cls.cur.execute(sql, [user_name])
       result = cls.cur.fetchone()
       return result

   @classmethod
   def check_user_login(cls, user_name, password):
       '''
      : return 2 登陆成功
      : return 1 密码不对
      : return 0 用户名未找到
      '''
       sql = "select password from users where name = %s;"
       cls.cur.execute(sql, [user_name])
       password_serv = cls.cur.fetchone()

       # 如果该用户存在,检查密码
       if password_serv:
           if password == password_serv[0]:
               return 2
           else:
               return 1
       
       # 如果没有找到用户
       return 0

   @classmethod
   def reg_user(cls, user_name, password):
       sql = "insert into users (name, password) values (%s,%s);"
       try:
           cls.cur.execute(sql, [user_name, password])
           cls.db.commit()

           return True

       except:
           cls.db.rollback()  

           return False
           
   @classmethod
   def save_msg(cls, user_name, message):
       try:
           sql = 'insert into chat_history (user,message) values (%s,%s);'
           cls.cur.execute(sql,[user_name,message])
           cls.db.commit()

           return True
       
       except:
           cls.db.rollback()  

           return False

   @classmethod
   def get_latest_msg(cls, msg_num):
       try:
           sql = 'select user, message, time from chat_history order by time desc limit %d;' % msg_num
           cls.cur.execute(sql)
           messages = cls.cur.fetchall()

           return messages
       
       except:  
           return False

 

'''
服务器配置
'''

class ServerConfig():
   # 应答者的地址和端口
   UDP_SOCKET_ADDR = ("0.0.0.0",8888)

   # 巡逻者的地址和端口
   UDP_SOCKET_ADDR2 = ("0.0.0.0",9999)

   # MySQL数据库配置信息
   DB_CONFIG = dict(host='localhost',port=3306,user='root',password="123456",database='chatroom',charset='utf8')