加入收藏 | 设为首页 | 会员中心 | 我要投稿 湖南网 (https://www.hunanwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 业界 > 正文

RPC 处事器之【多历程描写符转达】高阶模子

发布时间:2019-07-06 09:21:52 所属栏目:业界 来源:码洞
导读:本日先生要给各人先容一个较量出格的 RPC 处事器模子,这个模子差异于 Nginx、差异于 Redis、差异于 Apache、差异于 Tornado、差异于 Netty,它的原型是 Node Cluster 的多历程并发模子。 Nginx 并发模子 我们知道 Nginx 的并发模子是一个多历程并发模子,

下面我来献上完备的处事器代码,为了简朴起见,我们在 Slave 历程中处理赏罚 RPC 哀求行使同步模子。

  1. # coding: utf 
  2. # sendmsg recvmsg python3.5+才可以支持 
  3.  
  4. import os 
  5. import json 
  6. import struct 
  7. import socket 
  8.  
  9.  
  10. def handle_conn(conn, addr, handlers): 
  11.     print(addr, "comes") 
  12.     while True: 
  13.         # 简朴起见,这里就没有行使轮回读取了 
  14.         length_prefix = conn.recv(4) 
  15.         if not length_prefix: 
  16.             print(addr, "bye") 
  17.             conn.close() 
  18.             break  # 封锁毗连,继承处理赏罚下一个毗连 
  19.         length, = struct.unpack("I", length_prefix) 
  20.         body = conn.recv(length) 
  21.         request = json.loads(body) 
  22.         in_ = request['in'] 
  23.         params = request['params'] 
  24.         print(in_, params) 
  25.         handler = handlers[in_] 
  26.         handler(conn, params) 
  27.  
  28.  
  29. def loop_slave(pr, handlers): 
  30.     while True: 
  31.         bufsize = 1 
  32.         ancsize = socket.CMSG_LEN(struct.calcsize('i')) 
  33.         msg, ancdata, flags, addr = pr.recvmsg(bufsize, ancsize) 
  34.         cmsg_level, cmsg_type, cmsg_data = ancdata[0] 
  35.         fd = struct.unpack('i', cmsg_data)[0] 
  36.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd) 
  37.         handle_conn(sock, sock.getpeername(), handlers) 
  38.  
  39.  
  40. def ping(conn, params): 
  41.     send_result(conn, "pong", params) 
  42.  
  43.  
  44. def send_result(conn, out, result): 
  45.     response = json.dumps({"out": out, "result": result}).encode('utf-8') 
  46.     length_prefix = struct.pack("I", len(response)) 
  47.     conn.sendall(length_prefix) 
  48.     conn.sendall(response) 
  49.  
  50.  
  51. def loop_master(serv_sock, pws): 
  52.     idx = 0 
  53.     while True: 
  54.         sock, addr = serv_sock.accept() 
  55.         pw = pws[idx % len(pws)] 
  56.         # 动静数据,whatever 
  57.         msg = [b'x'] 
  58.         # 帮助数据,携带描写符 
  59.         ancdata = [( 
  60.             socket.SOL_SOCKET, 
  61.             socket.SCM_RIGHTS, 
  62.             struct.pack('i', sock.fileno()))] 
  63.         pw.sendmsg(msg, ancdata) 
  64.         sock.close()  # 封锁引用 
  65.         idx += 1 
  66.  
  67.  
  68. def prefork(serv_sock, n): 
  69.     pws = [] 
  70.     for i in range(n): 
  71.         # 开发父子历程通讯「管道」 
  72.         pr, pw = socket.socketpair() 
  73.         pid = os.fork() 
  74.         if pid < 0:  # fork error 
  75.             return pws 
  76.         if pid > 0: 
  77.             # 父历程 
  78.             pr.close()  # 父历程不消读 
  79.             pws.append(pw) 
  80.             continue 
  81.         if pid == 0: 
  82.             # 子历程 
  83.             serv_sock.close()  # 封锁引用 
  84.             pw.close()  # 子历程不消写 
  85.             return pr 
  86.     return pws 
  87.  
  88.  
  89. if __name__ == '__main__': 
  90.     serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
  91.     serv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
  92.     serv_sock.bind(("localhost", 8080)) 
  93.     serv_sock.listen(1) 
  94.     pws_or_pr = prefork(serv_sock, 10) 
  95.     if hasattr(pws_or_pr, '__len__'): 
  96.         if pws_or_pr: 
  97.             loop_master(serv_sock, pws_or_pr) 
  98.         else: 
  99.             # fork 所有失败,没有子历程,Game Over 
  100.             serv_sock.close() 
  101.     else: 
  102.         handlers = { 
  103.             "ping": ping 
  104.         } 
  105.         loop_slave(pws_or_pr, handlers) 

(编辑:湖南网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读