Untitled

 avatar
unknown
plain_text
2 months ago
5.8 kB
6
Indexable
import requests
import json

class xaccel():

    def create_stream(self, stream_name, profile_id, url, stream_headers=None, video_map=None, rate_emulation='no'):
    
        headers = {
            'Authorization': 'token ' + self.token,
        }

        json_data = {
            'name': stream_name,
            'input_urls': [
                url,
            ],
            'profile_id': profile_id,
            'rate_emulation': rate_emulation,
            'video_encoders': [
                {
                    'codec': 'copy',
                },
            ],
            'audio_encoders': [
                {
                    'codec': 'copy',
                },
            ],
            'outputs': [
                {
                    'protocol': 'hls',
                    'filename': 'index.m3u8',
                },
                {
                    'protocol': 'http',
                    'filename': 'index.ts',
                }
            ],
        }
        
        if self.proxy is not None:
            json_data['http_proxy'] = self.proxy
            
        if self.user_agent is not None:
            json_data['user_agent'] = self.user_agent
            
        if stream_headers is not None:
            json_data['headers'] = stream_headers
            
        if video_map is not None:
            json_data['video_map'] = video_map

        response = requests.post(f'{self.base_url}/api/stream/add', headers=headers, json=json_data)

        return response.content.decode()
        
    def get_stream_config(self, stream_name):
        headers = {
            'Authorization': 'token ' + self.token,
        }

        response = requests.get(f'{self.base_url}/api/stream/{stream_name}/config', headers=headers)
        
        data = json.loads(response.content)
        
        return data
        
    def update_stream_header(self, stream_name, header):
        headers = {
            'Authorization': 'token ' + self.token,
        }

        json_data = {
            'headers': header
        }

        response = requests.post(f'{self.base_url}/api/stream/{stream_name}/update', headers=headers, json=json_data)
        
        #data = json.loads(response.content)
        
        #return data
        
    def update_stream_source(self, stream_name, url):
        headers = {
            'Authorization': 'token ' + self.token,
        }

        json_data = [
            url,
        ]

        response = requests.post(f'{self.base_url}/api/stream/{stream_name}/source-update', headers=headers, json=json_data)
        
        return response.content.decode()
        
    def delete_stream(self, stream_name):
        headers = {
            'Authorization': 'token ' + self.token,
        }

        response = requests.post(f'{self.base_url}/api/stream/{stream_name}/delete', headers=headers)
        
        print(response)
        print(response.content)
        
    def fetch_stream_stats(self):
        headers = {
            'Authorization': 'token ' + self.token,
        }

        response = requests.get(f'{self.base_url}/api/stream/stats', headers=headers)
        
        data = json.loads(response.content)
        
        self.stream_stats = data
        
        return data
        
    def get_stream_stats(self, stream_name):

        data = self.stream_stats
        
        for s in data:
            if s['name'].lower() == stream_name.lower():
                return s
                
    def start_stream(self, stream_id):

        headers = {
            'Authorization': 'Token ' + self.token,
        }

        response = requests.post(f'{self.base_url}/api/stream/{stream_id}/start', headers=headers)
        
        return response.content.decode()
        
    def stop_stream(self, stream_id):

        headers = {
            'Authorization': 'Token ' + self.token,
        }

        response = requests.post(f'{self.base_url}/api/stream/{stream_id}/stop', headers=headers)
        
        return response.content.decode()
        
    def restart_stream(self, stream_id):
        self.stop_stream(stream_id)
        self.start_stream(stream_id)
        
    #This doesn't seem to work on the XAccel side
    def change_keys(self, stream_id, keys):
        headers = {
            'Authorization': 'Token ' + self.token,
        }

        response = requests.post(f'{self.base_url}/api/stream/{stream_id}/decryption-key-update', headers=headers, json=keys)
        
        print(response.content)
        
    def do_stream(self, stream_name, profile_id, url, keys=None, header=None, video_map=None, rate_emulation='no'):
        cs = self.create_stream(stream_name, profile_id, url, header, video_map, rate_emulation)
        
        if 'error' in cs:
            cs = json.loads(cs)
            if 'exists' in cs['error']:
                uss = self.update_stream_source(stream_name, url)
                
                if 'error' in uss:
                    uss = json.loads(uss)
                    print(uss['error'])
                else:
                    if header is not None:
                        self.update_stream_header(stream_name, header)
                    print(f'Updated {stream_name}')
            else:
                print(cs['error'])
                quit()
        else:
            print(f'Created {stream_name}')
            
        self.restart_stream(stream_name)
        
    def set_proxy(self, proxy):
        self.proxy = proxy
        
    def set_user_agent(self, user_agent):
        self.user_agent = user_agent
            
    def __init__(self, base_url, token, proxy=None, user_agent=None):
        self.base_url = base_url
        self.token = token
        self.proxy = proxy
        self.user_agent = user_agent
Editor is loading...
Leave a Comment