Untitled
unknown
python
4 years ago
13 kB
5
Indexable
class MyPythonServer: def __init__(self,host,port): self.host=host self.port=port def startServer(self): import socket import mylib server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind((self.host,self.port)) server.listen(100) while True: # 阻塞式, 直到接收到訊息 try: conn, addr = server.accept() ret=conn.recv(1024) #raise UnicodeDecodeError('funnycodec', b"O!OnnO!", 1, 2, 'This is just a fake reason!') #raise ConnectionResetError clientMessage = str(ret, encoding='utf-8',errors='ignore') requestKey=clientMessage.split(sep="|")[0] except UnicodeDecodeError as e: print(mylib.目前時間() + " Error message: ", e) #mylib.send_email("UnicodeDecodeError", "my python server", "asdf121472@gmail.com", '發生 UnicodeDecodeError') clientMessage='err|UnicodeDecodeError' requestKey = clientMessage.split(sep="|")[0] except ConnectionResetError as c: print(mylib.目前時間() + " Error message: ", c) #mylib.send_email("ConnectionResetError", "my python server", "asdf121472@gmail.com", '發生 ConnectionResetError') clientMessage='err|ConnectionResetError' requestKey = clientMessage.split(sep="|")[0].lower() #根據client的請求關鍵字(requestKey), 呼叫處理器函數 p=一般處理器(clientMessage) q=外掛驗證_處理器(clientMessage) c=cali_處理器(clientMessage) if requestKey=='cali': serverMessage = c.cali() elif requestKey=='xor_new': serverMessage=q.check唯一模擬器使用_new() elif requestKey=='sms': serverMessage=p.發簡訊() elif requestKey=='ver': serverMessage=q.check合法版本() elif requestKey=='limit': serverMessage=q.check使用期限() elif requestKey=='test': # 平常測試用 print(mylib.目前時間() + '----------------------------測試模式\n') print("client 傳來: "+clientMessage.split(sep="|")[1]) serverMessage="server 收到: "+clientMessage.split(sep="|")[1] elif requestKey=='err': # 發生異常 serverMessage = p.發生異常() elif requestKey == 'get_date': print(mylib.目前時間() + '----------------------------取得網路日期\n') import time localtime = time.localtime(time.time()) serverMessage=str(localtime.tm_year) + '-' + str(localtime.tm_mon) + '-' + str(localtime.tm_mday) print('網路日期: '+serverMessage+'\n') elif requestKey == 'txt': # client 要從 server 下載文字檔 print(mylib.目前時間() + '----------------------------下載文字檔\n') serverMessage=p.下載文字檔() elif requestKey == 'email': # client 要寄送email serverMessage = p.寄送Email() else: print(mylib.目前時間() + '-------------------------客戶端意圖不明\n') print("client 傳來: "+clientMessage+'\n') serverMessage="你到底要什麼服務?" # 將結果回傳客戶端 try: if type(serverMessage) is str: # encode() 編碼只能用在文字資料 conn.sendall(serverMessage.encode()) else: conn.sendall(serverMessage) conn.close() print(mylib.目前時間() + '------------------客戶端已離線\n') except ConnectionResetError as c: # client強制關閉連線 print(mylib.目前時間() + '------------------ConnectionResetError cllient端強制關閉連線\n') conn.close() class 外掛驗證_處理器: def __init__(self, clientMessage): self.clientMessage = clientMessage def check唯一模擬器使用_new(self): # 0 1 2 3 # clientMessage=xor_new|授權文件資料夾路徑|可用通道|通道檢查週期_分鐘 # 可用通道分隔符為| eg. 可用通道='1|2|3|4|5' import time import mylib print(mylib.目前時間() + '--------------------------new_唯一模擬器使用驗證\n') 授權文件資料夾路徑 = self.clientMessage.split(sep="|")[1].strip() print('授權文件路徑= ' + 授權文件資料夾路徑) 檢查週期_分鐘 = int(self.clientMessage.split(sep="|")[3].strip('\n')) print('檢查週期_分鐘= ' + str(檢查週期_分鐘)) 可用通道 = self.clientMessage.split(sep="|")[2].strip() print('可用通道= ' + str(可用通道)) for channel in 可用通道.split(sep=','): # 讀取授權文件資料 filePath=授權文件資料夾路徑+channel.strip()+'.txt' print('filePath='+filePath) try: f = open(filePath, "r") arr = f.read().split(sep="#") 上次登記epoch = int(arr[0]) print('old epoch= ' + str(上次登記epoch)) 到期日 = str(arr[1]) print('到期日= ' + 到期日) except Exception as E: print(E,'請檢查'+str(channel)+'.txt有無錯誤改寫') 上次登記epoch = 115 到期日='1999-1-1' finally: f.close() # 取得目前epoch 目前epoch = int(time.time()) print('now epoch= ' + str(目前epoch)) # [驗證週期]是否<客戶端[檢查週期_分鐘] print('經過秒數= ' + str((目前epoch - 上次登記epoch))) if int(目前epoch - 上次登記epoch) >= (檢查週期_分鐘 * 60): # = 要考慮進去 try: # 記錄新的[epoch#使用期限] f = open(filePath, "w") f.write(str(目前epoch) + '#' + 到期日) except Exception as E: print(E) finally: f.close() print('週期合法 授權文件:'+filePath+'\n') return 'ok' else: print(filePath+' 週期不合法') print('===================') # 週期都不合法 print('沒有空位子\n') return 'not' def check合法版本(self): # clientMessage=ver|client版本|伺服器version_control.txt路徑 import mylib print(mylib.目前時間() + '--------------------------版本驗證請求\n') client_ver = self.clientMessage.split(sep="|")[1].strip('\n') server_path = self.clientMessage.split(sep="|")[2].strip('\n') print("客戶端版本: " + client_ver + "\n版本文件路徑: " + server_path) try: # 讀取對應專案的 version_control.txt f = open(server_path, "r") server_ver = f.readlines()[0].split('=')[1] print('伺服器端合法版本: ' + server_ver.strip('\n')) except Exception as E: print(E) server_ver='x.x.x' finally: f.close() # client端版本是否合法 flag = "not" for v in server_ver.split(sep="|"): if client_ver.strip() == v.strip(): flag = "ok" print('客戶端,版本可用\n') break if flag == "not": print('客戶端,版本不可用\n') return flag def check使用期限(self): # clientMessage=limit|授權文件路徑 import mylib print(mylib.目前時間() + '--------------------------使用期限請求\n') txt_path = self.clientMessage.split(sep="|")[1].strip('\n') print('授權文件路徑: ' + txt_path) try: # 讀取對應專案的 1.txt f = open(txt_path, "r") # 1550#2011-11-12 limit_date = f.readlines()[0].split('#')[1] except Exception as E: print(E) limit_date='2001-1-1' # 讓它過期 finally: print("使用期限: " + limit_date + "\n") f.close() return limit_date class cali_處理器: def __init__(self, clientMessage): self.clientMessage = clientMessage def cali(self): # 處理卡利百家樂的讀寫資料 # clientMessage=cali|read or cali|write|data 要讀或寫紀錄資料 import mylib import os data_path = r'C:\myFTP\Android_qm\calibet\cali_data.txt' flag = self.clientMessage.split(sep="|")[1].strip() print(mylib.目前時間() + '--------------------------卡利百家樂 ' + flag + '\n') if flag == 'read': # 檢查 /cali_data.txt 是否存在 try: f = open(data_path, 'r') ret = f.read() f.close() print('data read: %s \n' % ret) return ret except Exception as E: print(E) return 'not' if flag == 'write': try: data = self.clientMessage.split(sep="|")[2].strip() f = open(data_path, 'w') f.write(data) f.close() print('Write data to %s \n' % data_path) except Exception as E: print(E) return 'not' else: return "ok" class 一般處理器: def __init__(self, clientMessage): self.clientMessage = clientMessage def 下載文字檔(self): # 客戶端要從伺服器下載文字檔 # clientMessage = txt|伺服器端文字檔路徑 txt_path = self.clientMessage.split(sep="|")[1].strip('\n') print('client想下載文件: ' + txt_path+'\n') try: f = open(txt_path, "r",encoding='utf-8') ret=f.read() except Exception as E: print(E) ret='something gets wrong in Server' finally: f.close() return ret def 發生異常(self): # clientMessage=err|異常名稱 # 前面捕捉到的異常, 這裡返回給使用者 import mylib print(mylib.目前時間() + '--------------------------發生異常 ' + self.clientMessage.split(sep="|")[1] + '\n') return self.clientMessage.split(sep="|")[1] def 寄送Email(self): # clientMessage=email|標題|寄件者|收件者|內容 import mylib tmp=self.clientMessage.split(sep="|") 標題=tmp[1] 寄件者=tmp[2] 收件者=tmp[3] 內容=tmp[4] print(mylib.目前時間() + '----------------------------send email to '+收件者+'\n') ret=mylib.send_email(標題,寄件者,收件者,內容) if ret==1: return "ok" else: return "not" def 發簡訊(self): # 利用簡訊王( https://www.kotsms.com.tw/ )發送簡訊 # clientMessage=sms|簡訊王帳號|簡訊王密碼|接收者電話號碼|簡訊內容 import mylib from kotsms import kotsms tmp=self.clientMessage.split(sep="|") USERNAME = tmp[1] PASSWORD = tmp[2] PHONE = tmp[3] MSG = tmp[4] print(mylib.目前時間() + '----------------------------send SMS to '+PHONE+'\n') sms = kotsms() sms.login(USERNAME, PASSWORD) sms.sendMsg(PHONE, MSG) return "ok" if __name__=="__main__": server=MyPythonServer('122.116.83.160',1313) server.startServer()
Editor is loading...