Untitled
unknown
python
4 years ago
13 kB
9
Indexable
class MyPythonServer:
def __init__(self,host,port):
self.host=host
self.port=port
def startServer(self):
import socket
import mylib
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((self.host,self.port))
server.listen(100)
while True:
# 阻塞式, 直到接收到訊息
try:
conn, addr = server.accept()
ret=conn.recv(1024)
#raise UnicodeDecodeError('funnycodec', b"O!OnnO!", 1, 2, 'This is just a fake reason!')
#raise ConnectionResetError
clientMessage = str(ret, encoding='utf-8',errors='ignore')
requestKey=clientMessage.split(sep="|")[0]
except UnicodeDecodeError as e:
print(mylib.目前時間() + " Error message: ", e)
#mylib.send_email("UnicodeDecodeError", "my python server", "asdf121472@gmail.com", '發生 UnicodeDecodeError')
clientMessage='err|UnicodeDecodeError'
requestKey = clientMessage.split(sep="|")[0]
except ConnectionResetError as c:
print(mylib.目前時間() + " Error message: ", c)
#mylib.send_email("ConnectionResetError", "my python server", "asdf121472@gmail.com", '發生 ConnectionResetError')
clientMessage='err|ConnectionResetError'
requestKey = clientMessage.split(sep="|")[0].lower()
#根據client的請求關鍵字(requestKey), 呼叫處理器函數
p=一般處理器(clientMessage)
q=外掛驗證_處理器(clientMessage)
c=cali_處理器(clientMessage)
if requestKey=='cali':
serverMessage = c.cali()
elif requestKey=='xor_new':
serverMessage=q.check唯一模擬器使用_new()
elif requestKey=='sms':
serverMessage=p.發簡訊()
elif requestKey=='ver':
serverMessage=q.check合法版本()
elif requestKey=='limit':
serverMessage=q.check使用期限()
elif requestKey=='test':
# 平常測試用
print(mylib.目前時間() + '----------------------------測試模式\n')
print("client 傳來: "+clientMessage.split(sep="|")[1])
serverMessage="server 收到: "+clientMessage.split(sep="|")[1]
elif requestKey=='err':
# 發生異常
serverMessage = p.發生異常()
elif requestKey == 'get_date':
print(mylib.目前時間() + '----------------------------取得網路日期\n')
import time
localtime = time.localtime(time.time())
serverMessage=str(localtime.tm_year) + '-' + str(localtime.tm_mon) + '-' + str(localtime.tm_mday)
print('網路日期: '+serverMessage+'\n')
elif requestKey == 'txt':
# client 要從 server 下載文字檔
print(mylib.目前時間() + '----------------------------下載文字檔\n')
serverMessage=p.下載文字檔()
elif requestKey == 'email':
# client 要寄送email
serverMessage = p.寄送Email()
else:
print(mylib.目前時間() + '-------------------------客戶端意圖不明\n')
print("client 傳來: "+clientMessage+'\n')
serverMessage="你到底要什麼服務?"
# 將結果回傳客戶端
try:
if type(serverMessage) is str:
# encode() 編碼只能用在文字資料
conn.sendall(serverMessage.encode())
else:
conn.sendall(serverMessage)
conn.close()
print(mylib.目前時間() + '------------------客戶端已離線\n')
except ConnectionResetError as c:
# client強制關閉連線
print(mylib.目前時間() + '------------------ConnectionResetError cllient端強制關閉連線\n')
conn.close()
class 外掛驗證_處理器:
def __init__(self, clientMessage):
self.clientMessage = clientMessage
def check唯一模擬器使用_new(self):
# 0 1 2 3
# clientMessage=xor_new|授權文件資料夾路徑|可用通道|通道檢查週期_分鐘
# 可用通道分隔符為| eg. 可用通道='1|2|3|4|5'
import time
import mylib
print(mylib.目前時間() + '--------------------------new_唯一模擬器使用驗證\n')
授權文件資料夾路徑 = self.clientMessage.split(sep="|")[1].strip()
print('授權文件路徑= ' + 授權文件資料夾路徑)
檢查週期_分鐘 = int(self.clientMessage.split(sep="|")[3].strip('\n'))
print('檢查週期_分鐘= ' + str(檢查週期_分鐘))
可用通道 = self.clientMessage.split(sep="|")[2].strip()
print('可用通道= ' + str(可用通道))
for channel in 可用通道.split(sep=','):
# 讀取授權文件資料
filePath=授權文件資料夾路徑+channel.strip()+'.txt'
print('filePath='+filePath)
try:
f = open(filePath, "r")
arr = f.read().split(sep="#")
上次登記epoch = int(arr[0])
print('old epoch= ' + str(上次登記epoch))
到期日 = str(arr[1])
print('到期日= ' + 到期日)
except Exception as E:
print(E,'請檢查'+str(channel)+'.txt有無錯誤改寫')
上次登記epoch = 115
到期日='1999-1-1'
finally:
f.close()
# 取得目前epoch
目前epoch = int(time.time())
print('now epoch= ' + str(目前epoch))
# [驗證週期]是否<客戶端[檢查週期_分鐘]
print('經過秒數= ' + str((目前epoch - 上次登記epoch)))
if int(目前epoch - 上次登記epoch) >= (檢查週期_分鐘 * 60): # = 要考慮進去
try:
# 記錄新的[epoch#使用期限]
f = open(filePath, "w")
f.write(str(目前epoch) + '#' + 到期日)
except Exception as E:
print(E)
finally:
f.close()
print('週期合法 授權文件:'+filePath+'\n')
return 'ok'
else:
print(filePath+' 週期不合法')
print('===================')
# 週期都不合法
print('沒有空位子\n')
return 'not'
def check合法版本(self):
# clientMessage=ver|client版本|伺服器version_control.txt路徑
import mylib
print(mylib.目前時間() + '--------------------------版本驗證請求\n')
client_ver = self.clientMessage.split(sep="|")[1].strip('\n')
server_path = self.clientMessage.split(sep="|")[2].strip('\n')
print("客戶端版本: " + client_ver + "\n版本文件路徑: " + server_path)
try:
# 讀取對應專案的 version_control.txt
f = open(server_path, "r")
server_ver = f.readlines()[0].split('=')[1]
print('伺服器端合法版本: ' + server_ver.strip('\n'))
except Exception as E:
print(E)
server_ver='x.x.x'
finally:
f.close()
# client端版本是否合法
flag = "not"
for v in server_ver.split(sep="|"):
if client_ver.strip() == v.strip():
flag = "ok"
print('客戶端,版本可用\n')
break
if flag == "not":
print('客戶端,版本不可用\n')
return flag
def check使用期限(self):
# clientMessage=limit|授權文件路徑
import mylib
print(mylib.目前時間() + '--------------------------使用期限請求\n')
txt_path = self.clientMessage.split(sep="|")[1].strip('\n')
print('授權文件路徑: ' + txt_path)
try:
# 讀取對應專案的 1.txt
f = open(txt_path, "r") # 1550#2011-11-12
limit_date = f.readlines()[0].split('#')[1]
except Exception as E:
print(E)
limit_date='2001-1-1' # 讓它過期
finally:
print("使用期限: " + limit_date + "\n")
f.close()
return limit_date
class cali_處理器:
def __init__(self, clientMessage):
self.clientMessage = clientMessage
def cali(self):
# 處理卡利百家樂的讀寫資料
# clientMessage=cali|read or cali|write|data 要讀或寫紀錄資料
import mylib
import os
data_path = r'C:\myFTP\Android_qm\calibet\cali_data.txt'
flag = self.clientMessage.split(sep="|")[1].strip()
print(mylib.目前時間() + '--------------------------卡利百家樂 ' + flag + '\n')
if flag == 'read':
# 檢查 /cali_data.txt 是否存在
try:
f = open(data_path, 'r')
ret = f.read()
f.close()
print('data read: %s \n' % ret)
return ret
except Exception as E:
print(E)
return 'not'
if flag == 'write':
try:
data = self.clientMessage.split(sep="|")[2].strip()
f = open(data_path, 'w')
f.write(data)
f.close()
print('Write data to %s \n' % data_path)
except Exception as E:
print(E)
return 'not'
else:
return "ok"
class 一般處理器:
def __init__(self, clientMessage):
self.clientMessage = clientMessage
def 下載文字檔(self):
# 客戶端要從伺服器下載文字檔
# clientMessage = txt|伺服器端文字檔路徑
txt_path = self.clientMessage.split(sep="|")[1].strip('\n')
print('client想下載文件: ' + txt_path+'\n')
try:
f = open(txt_path, "r",encoding='utf-8')
ret=f.read()
except Exception as E:
print(E)
ret='something gets wrong in Server'
finally:
f.close()
return ret
def 發生異常(self):
# clientMessage=err|異常名稱
# 前面捕捉到的異常, 這裡返回給使用者
import mylib
print(mylib.目前時間() + '--------------------------發生異常 ' + self.clientMessage.split(sep="|")[1] + '\n')
return self.clientMessage.split(sep="|")[1]
def 寄送Email(self):
# clientMessage=email|標題|寄件者|收件者|內容
import mylib
tmp=self.clientMessage.split(sep="|")
標題=tmp[1]
寄件者=tmp[2]
收件者=tmp[3]
內容=tmp[4]
print(mylib.目前時間() + '----------------------------send email to '+收件者+'\n')
ret=mylib.send_email(標題,寄件者,收件者,內容)
if ret==1:
return "ok"
else:
return "not"
def 發簡訊(self):
# 利用簡訊王( https://www.kotsms.com.tw/ )發送簡訊
# clientMessage=sms|簡訊王帳號|簡訊王密碼|接收者電話號碼|簡訊內容
import mylib
from kotsms import kotsms
tmp=self.clientMessage.split(sep="|")
USERNAME = tmp[1]
PASSWORD = tmp[2]
PHONE = tmp[3]
MSG = tmp[4]
print(mylib.目前時間() + '----------------------------send SMS to '+PHONE+'\n')
sms = kotsms()
sms.login(USERNAME, PASSWORD)
sms.sendMsg(PHONE, MSG)
return "ok"
if __name__=="__main__":
server=MyPythonServer('122.116.83.160',1313)
server.startServer()
Editor is loading...