import b3
import b3.events
import b3.plugin
from b3.functions import getCmd
# you have to install ipy : pip install ipy
from IPy import IP
import requests
import json
import uuid
class VpnblockerPlugin(b3.plugin.Plugin):
_adminPlugin = None
_checklevel = 1
_maxConnections = 100
serverId = ""
apiKey1 = ""
apiKey2 = ""
apiKey3 = ""
allowvpn = 40
denyvpn = 40
def onStartup(self):
self._adminPlugin = self.console.getPlugin('admin')
if not self._adminPlugin:
self.error('Could not find admin plugin')
return
# register our commands
if 'commands' in self.config.sections():
for cmd in self.config.options('commands'):
level = self.config.get('commands', cmd)
sp = cmd.split('-')
alias = None
if len(sp) == 2:
cmd, alias = sp
func = getCmd(self, cmd)
if func:
self._adminPlugin.registerCommand(self, cmd, level, func, alias)
self.registerEvent(b3.events.EVT_CLIENT_AUTH, self.onConnect)
#prepare user-agent
hostIp= str(self.console._publicIp)
HostPort= str(self.console._port)
spr = '%s:%s' %(hostIp, HostPort)
self.serverId = str(uuid.uuid5(uuid.NAMESPACE_DNS, spr))
def onLoadConfig(self):
try:
self._checklevel = self.config.getint('settings', 'maxlevel')
self._maxConnections = self.config.getint('settings', 'maxconnactions')
self.apiKey1 = self.getSetting('settings', 'proxycheck.io', b3.STR, self.apiKey1)
self.apiKey2 = self.getSetting('settings', 'iphub.info', b3.STR, self.apiKey2)
self.apiKey3 = self.getSetting('settings', 'zwambro.pw', b3.STR, self.apiKey3)
except Exception, err:
self.error(err)
def onConnect(self, event):
client = event.client
self.debug('Checking {} ip...'.format(client.name))
info = {'ip': str(client.ip)}
if client.maxLevel > self._checklevel:
self.debug("%s is a higher level user, he can't be checked" % client.name)
return
else:
self.debug('%s is a lower level user, checking his ip ...' %client.name)
if client.connections > self._maxConnections:
self.debug('%s has more than %s connections, not affected by the plugin' % (client.name, self._maxConnections))
return
else:
self.debug('%s have less than %s connections, we will check his ip now ... ' % (client.name, self._maxConnections))
if self.byPassProtection(client):
self.debug('Player {} ({}) bypassed VpnProtection'.format(client.name, client.ip))
return
elif self.zwamBroDb(client.ip):
self.debug('Access denied by Zwambro db for {} ({})'.format(client.name, client.ip))
client.kick('^6Proxy/VPN Detected!^7')
return
elif self.xdefConDb(client.ip):
self.zwamBroAddVpn(client.ip, info)
self.debug('Access denied by xdefcon for {} ({})'.format(client.name, client.ip))
client.kick('^6Proxy/VPN Detected!^7')
return
elif self.proxyCheckDb(client.ip):
self.zwamBroAddVpn(client.ip, info)
self.debug('Access denied by Proxycheck for {} ({})'.format(client.name, client.ip))
client.kick('^6Proxy/VPN Detected!^7')
return
elif self.ipHubDb(client.ip):
self.zwamBroAddVpn(client.ip, info)
self.debug('Access denied by Iphub for {} ({})'.format(client.name, client.ip))
client.kick('^6Proxy/VPN Detected!^7')
return
elif self.ipApiDb(client.ip):
self.zwamBroAddVpn(client.ip, info)
self.debug('Access denied by Ipinfo for {} ({})'.format(client.name, client.ip))
client.kick('^6Proxy/VPN Detected!^7')
return
else:
self.debug('({}) not a VPN'.format(client.ip))
def cmd_denyvpn(self, data, client=None, cmd=None):
"""
<player/ip> - Deny a player or a ip to use vpn
"""
if not self._adminPlugin.parseUserCmd(data):
client.message('Correct usage: !dv <player')
return False
sclient = self._adminPlugin.findClientPrompt(data, client)
if not sclient:
return
if self.byPassProtection(sclient):
self.removePlayer(sclient)
client.message('{} has been deleted from whitelist vpn users'.format(sclient.name))
return
elif not self.byPassProtection(sclient):
client.messasge('{} not in whitelist vpn users !!'.format(sclient.name))
def cmd_allowvpn(self, data, client=None, cmd=None):
"""
<player/ip> - Allow a player or a ip to use vpn
"""
if not self._adminPlugin.parseUserCmd(data):
client.message('Correct usage: !av <player')
return False
sclient = self._adminPlugin.findClientPrompt(data, client)
if not sclient:
return
if not self.byPassProtection(sclient):
self.registerPlayer(sclient)
client.message('{} allowed to use vpn'.format(sclient.name))
return
elif self.byPassProtection(sclient):
client.messasge('{} was already on whitelist vpn users'.format(sclient.name))
def removePlayer(self, client):
query = "DELETE FROM vpnblock WHERE client_id={0}".format(client.id)
self.console.storage.query(query)
def registerPlayer(self, client):
query = "INSERT INTO vpnblock VALUES (NULL, {0})".format(client.id)
self.console.storage.query(query)
def byPassProtection(self, client):
query = "SELECT * FROM vpnblock WHERE client_id={0}".format(client.id)
result = self.console.storage.query(query)
if result.rowcount == 1:
return True
return False
def zwamBroDb(self, ip):
"""
Check if a ip is a vpn
Return True if is vpn and False if not
"""
self.debug("checking Zwambro DB")
try:
r = requests.get('https://zwambro.pw/antivpn/checkvpn?ip={}' .format(ip), headers={'Authorization': 'Token {}' .format(self.apiKey3.strip())}, timeout=2)
if r.status_code == 200:
finalRes = r.json()
if finalRes["vpn"] == True:
self.debug('Zwambro db detect this ip ({}) a VPN/Proxy' .format(ip))
return True
except Exception as e:
self.debug('error: ' + str(e))
return False
def xdefConDb(self, ip):
"""
Check if a ip is a vpn
Return True if is vpn and False if not
"""
self.debug("checking xdefcon DB")
try:
headers = {"User-Agent": "{}" .format(self.serverId)}
r = requests.get('https://api.xdefcon.com/proxy/check/?ip={}&vpn=1' .format(ip), headers, timeout=2)
if r.status_code == 200:
finalRes = r.json()
if finalRes["proxy"] == True:
self.debug('Xdefcon db detect this ip ({}) a VPN/Proxy' .format(ip))
return True
except Exception as e:
self.debug('error: ' + str(e))
return False
def proxyCheckDb(self, ip):
"""
Check if a ip is a vpn
Return True if is vpn and False if not
"""
self.debug("checking proxycheck DB")
try:
r2 = requests.get('http://proxycheck.io/v2/{}?key={}&vpn=1' .format(ip, self.apiKey1.strip()), timeout=3)
if r2.status_code == 200:
finalRes2 = r2.json()
if finalRes2[ip]["proxy"] == "yes":
self.debug('proxycheck db detect this ip ({}) a VPN/Proxy' .format(ip))
return True
except Exception as e:
self.debug('error: ' + str(e))
return False
def ipHubDb(self, ip):
"""
Check if a ip is a vpn
Return True if is vpn and False if not
"""
self.debug("checking iphub DB")
try:
r3 = requests.get('http://v2.api.iphub.info/ip/{}'.format(ip), headers={'X-Key': self.apiKey2.strip()}, timeout=3)
if r3.status_code == 200:
finalRes3 = r3.json()
if finalRes3["block"] == 1:
self.debug('Iphub db detect this ip ({}) a VPN/Proxy' .format(ip))
return True
except Exception as e:
self.debug('error: ' + str(e))
return False
def ipApiDb(self, ip):
"""
Check if a ip is a vpn
Return True if is vpn and False if not
"""
self.debug("checking ipapi DB")
try:
r4 = requests.get('http://ip-api.com/json/{}?fields=status,mobile,proxy,hosting,query' .format(ip), timeout=2)
if r4.status_code == 200:
finalRes4 = r4.json()
if finalRes4["proxy"] == True:
self.debug('IP info db detect this ip ({}) a VPN/Proxy' .format(ip))
return True
except Exception as e:
self.debug('error: ' + str(e))
return False
def zwamBroAddVpn(self, ip, info=None):
self.debug("adding VPN to zwambro DB")
try:
headers = {'Content-type': 'application/json', 'Authorization': 'Token {}' .format(self.apiKey3.strip())}
r = requests.post('https://zwambro.pw/antivpn/addvpn', data=json.dumps(info), headers=headers)
if r.status_code == 201:
self.debug('VPN IP added perfeclty')
return True
except Exception as e:
self.debug('error: ' + str(e))
return False
def validIP(self, ip):
"""
Check if a ip has a correct format
Return True when is valid and False when not
"""
try:
IP(ip)
except:
return False
return True