2 years ago
10 kB
"""B3 plugin that kicks connecting players whose IP is reported as a VPN/proxy."""

import b3
import b3.events
import b3.plugin
from b3.functions import getCmd
# you have to install ipy : pip install ipy
from IPy import IP
import requests
import json
import uuid


class VpnblockerPlugin(b3.plugin.Plugin):
    """Check the IP of authenticating clients against several VPN/proxy lookup services.

    Clients whose level or connection count exceeds the configured thresholds,
    or who are listed in the ``vpnblock`` table, are not checked.
    """

    _adminPlugin = None
    # Clients whose maxLevel is above this value are never checked.
    _checklevel = 1
    # Clients with more connections than this value are never checked.
    _maxConnections = 100
    # UUID5 string built in onStartup; sent as the User-Agent to xdefcon.
    serverId = ""
    # API keys read in onLoadConfig: proxycheck.io, iphub.info, zwambro.pw.
    apiKey1 = ""
    apiKey2 = ""
    apiKey3 = ""
    # NOTE(review): not referenced anywhere in this class; kept for compatibility.
    allowvpn = 40
    denyvpn = 40

    def onStartup(self):
        """Register the configured commands and the client-auth event handler."""
        self._adminPlugin = self.console.getPlugin('admin')

        if not self._adminPlugin:
            self.error('Could not find admin plugin')
            return

        # register our commands
        if 'commands' in self.config.sections():
            for cmd in self.config.options('commands'):
                level = self.config.get('commands', cmd)
                # A "name-alias" option registers the command with an alias.
                sp = cmd.split('-')
                alias = None
                if len(sp) == 2:
                    cmd, alias = sp

                func = getCmd(self, cmd)
                if func:
                    self._adminPlugin.registerCommand(self, cmd, level, func, alias)

        self.registerEvent(b3.events.EVT_CLIENT_AUTH, self.onConnect)

        # prepare user-agent: a stable id derived from "<public ip>:<port>"
        hostIp = str(self.console._publicIp)
        hostPort = str(self.console._port)
        spr = '%s:%s' % (hostIp, hostPort)
        self.serverId = str(uuid.uuid5(uuid.NAMESPACE_DNS, spr))

    def onLoadConfig(self):
        """Load thresholds and API keys from the ``settings`` config section.

        Any error is logged; settings read before the failure keep their
        loaded value and the remaining ones keep their class defaults.
        """
        try:
            self._checklevel = self.config.getint('settings', 'maxlevel')
            # The option name is spelled 'maxconnactions' in existing configs;
            # it is kept as-is so those configs continue to work.
            self._maxConnections = self.config.getint('settings', 'maxconnactions')
            self.apiKey1 = self.getSetting('settings', 'proxycheck.io', b3.STR, self.apiKey1)
            self.apiKey2 = self.getSetting('settings', 'iphub.info', b3.STR, self.apiKey2)
            self.apiKey3 = self.getSetting('settings', 'zwambro.pw', b3.STR, self.apiKey3)
        except Exception as err:
            self.error(err)

    def onConnect(self, event):
        """Handle EVT_CLIENT_AUTH: kick the client if any lookup flags its IP.

        The lookups run in a fixed order and stop at the first hit. A hit from
        any service other than the Zwambro database is also reported to the
        Zwambro database before the client is kicked.
        """
        client = event.client
        self.debug('Checking {} ip...'.format(client.name))
        info = {'ip': str(client.ip)}

        if client.maxLevel > self._checklevel:
            self.debug("%s is a higher level user, he can't be checked" % client.name)
            return
        self.debug('%s is a lower level user, checking his ip ...' % client.name)

        if client.connections > self._maxConnections:
            self.debug('%s has more than %s connections, not affected by the plugin'
                       % (client.name, self._maxConnections))
            return
        self.debug('%s have less than %s connections, we will check his ip now ... '
                   % (client.name, self._maxConnections))

        if self.byPassProtection(client):
            self.debug('Player {} ({}) bypassed VpnProtection'.format(client.name, client.ip))
            return

        # (lookup, name used in the log line, report the hit to Zwambro?)
        checks = (
            (self.zwamBroDb, 'Zwambro db', False),
            (self.xdefConDb, 'xdefcon', True),
            (self.proxyCheckDb, 'Proxycheck', True),
            (self.ipHubDb, 'Iphub', True),
            (self.ipApiDb, 'Ipinfo', True),
        )
        for check, label, report in checks:
            if not check(client.ip):
                continue
            if report:
                self.zwamBroAddVpn(client.ip, info)
            self.debug('Access denied by {} for {} ({})'.format(label, client.name, client.ip))
            client.kick('^6Proxy/VPN Detected!^7')
            return

        self.debug('({}) not a VPN'.format(client.ip))

    def cmd_denyvpn(self, data, client=None, cmd=None):
        """
        <player/ip> - Deny a player or a ip to use vpn
        """
        # NOTE(review): the docstring above is presumably shown as command help
        # by the admin plugin, so its wording is left unchanged - confirm.
        if not self._adminPlugin.parseUserCmd(data):
            client.message('Correct usage: !dv <player>')
            return False

        sclient = self._adminPlugin.findClientPrompt(data, client)
        if not sclient:
            return

        # Query the whitelist once and branch on the result.
        if self.byPassProtection(sclient):
            self.removePlayer(sclient)
            client.message('{} has been deleted from whitelist vpn users'.format(sclient.name))
        else:
            client.message('{} not in whitelist vpn users !!'.format(sclient.name))

    def cmd_allowvpn(self, data, client=None, cmd=None):
        """
        <player/ip> - Allow a player or a ip to use vpn
        """
        # NOTE(review): the docstring above is presumably shown as command help
        # by the admin plugin, so its wording is left unchanged - confirm.
        if not self._adminPlugin.parseUserCmd(data):
            client.message('Correct usage: !av <player>')
            return False

        sclient = self._adminPlugin.findClientPrompt(data, client)
        if not sclient:
            return

        # Query the whitelist once and branch on the result.
        if self.byPassProtection(sclient):
            client.message('{} was already on whitelist vpn users'.format(sclient.name))
        else:
            self.registerPlayer(sclient)
            client.message('{} allowed to use vpn'.format(sclient.name))

    def removePlayer(self, client):
        """Delete the client's row(s) from the ``vpnblock`` table."""
        query = "DELETE FROM vpnblock WHERE client_id={0}".format(client.id)
        self.console.storage.query(query)

    def registerPlayer(self, client):
        """Insert the client's id into the ``vpnblock`` table."""
        query = "INSERT INTO vpnblock VALUES (NULL, {0})".format(client.id)
        self.console.storage.query(query)

    def byPassProtection(self, client):
        """Return True when exactly one ``vpnblock`` row exists for the client."""
        query = "SELECT * FROM vpnblock WHERE client_id={0}".format(client.id)
        result = self.console.storage.query(query)
        return result.rowcount == 1

    def zwamBroDb(self, ip):
        """
        Check if a ip is a vpn
        Return True if is vpn and False if not

        Any request or parsing error is logged and treated as "not a vpn".
        """
        self.debug("checking Zwambro DB")
        try:
            r = requests.get(
                'https://zwambro.pw/antivpn/checkvpn?ip={}'.format(ip),
                headers={'Authorization': 'Token {}'.format(self.apiKey3.strip())},
                timeout=2)
            if r.status_code == 200:
                finalRes = r.json()
                if finalRes["vpn"] == True:
                    self.debug('Zwambro db detect this ip ({}) a VPN/Proxy'.format(ip))
                    return True
        except Exception as e:
            self.debug('error: ' + str(e))
        return False

    def xdefConDb(self, ip):
        """
        Check if a ip is a vpn
        Return True if is vpn and False if not

        Any request or parsing error is logged and treated as "not a vpn".
        """
        self.debug("checking xdefcon DB")
        try:
            headers = {"User-Agent": "{}".format(self.serverId)}
            # headers must be passed by keyword: the second positional
            # parameter of requests.get is `params` (query string).
            r = requests.get(
                'https://api.xdefcon.com/proxy/check/?ip={}&vpn=1'.format(ip),
                headers=headers,
                timeout=2)
            if r.status_code == 200:
                finalRes = r.json()
                if finalRes["proxy"] == True:
                    self.debug('Xdefcon db detect this ip ({}) a VPN/Proxy'.format(ip))
                    return True
        except Exception as e:
            self.debug('error: ' + str(e))
        return False

    def proxyCheckDb(self, ip):
        """
        Check if a ip is a vpn
        Return True if is vpn and False if not

        Any request or parsing error is logged and treated as "not a vpn".
        """
        self.debug("checking proxycheck DB")
        try:
            r2 = requests.get(
                'http://proxycheck.io/v2/{}?key={}&vpn=1'.format(ip, self.apiKey1.strip()),
                timeout=3)
            if r2.status_code == 200:
                finalRes2 = r2.json()
                # The response is indexed by the queried ip.
                if finalRes2[ip]["proxy"] == "yes":
                    self.debug('proxycheck db detect this ip ({}) a VPN/Proxy'.format(ip))
                    return True
        except Exception as e:
            self.debug('error: ' + str(e))
        return False

    def ipHubDb(self, ip):
        """
        Check if a ip is a vpn
        Return True if is vpn and False if not

        Any request or parsing error is logged and treated as "not a vpn".
        """
        self.debug("checking iphub DB")
        try:
            r3 = requests.get(
                'http://v2.api.iphub.info/ip/{}'.format(ip),
                headers={'X-Key': self.apiKey2.strip()},
                timeout=3)
            if r3.status_code == 200:
                finalRes3 = r3.json()
                if finalRes3["block"] == 1:
                    self.debug('Iphub db detect this ip ({}) a VPN/Proxy'.format(ip))
                    return True
        except Exception as e:
            self.debug('error: ' + str(e))
        return False

    def ipApiDb(self, ip):
        """
        Check if a ip is a vpn
        Return True if is vpn and False if not

        Any request or parsing error is logged and treated as "not a vpn".
        """
        self.debug("checking ipapi DB")
        try:
            r4 = requests.get(
                'http://ip-api.com/json/{}?fields=status,mobile,proxy,hosting,query'.format(ip),
                timeout=2)
            if r4.status_code == 200:
                finalRes4 = r4.json()
                if finalRes4["proxy"] == True:
                    self.debug('IP info db detect this ip ({}) a VPN/Proxy'.format(ip))
                    return True
        except Exception as e:
            self.debug('error: ' + str(e))
        return False

    def zwamBroAddVpn(self, ip, info=None):
        """Report a detected VPN address to the Zwambro database.

        ``info`` is sent as the JSON request body; ``ip`` is not used by the
        request itself. Return True when the service answers 201, else False.
        Any request error is logged and treated as a failure.
        """
        self.debug("adding VPN to zwambro DB")
        try:
            headers = {
                'Content-type': 'application/json',
                'Authorization': 'Token {}'.format(self.apiKey3.strip()),
            }
            # A timeout keeps an unresponsive endpoint from blocking the
            # caller indefinitely; the kick in onConnect happens after this.
            r = requests.post(
                'https://zwambro.pw/antivpn/addvpn',
                data=json.dumps(info),
                headers=headers,
                timeout=3)
            if r.status_code == 201:
                self.debug('VPN IP added perfeclty')
                return True
        except Exception as e:
            self.debug('error: ' + str(e))
        return False

    def validIP(self, ip):
        """
        Check if a ip has a correct format
        Return True when is valid and False when not
        """
        try:
            IP(ip)
        except Exception:
            # IPy rejected the value; anything it cannot parse is invalid.
            return False
        return True
Editor is loading...