Source code for TorConnection

import requests #pip install request[socks] (In case of error, uninstall requests)
from stem import Signal # pip install stem
from stem.control import Controller
from io import BytesIO
import time

# TOR_PROTOCOL: Scheme placed in front of the proxy address for both the http
# and https proxies. You don't want to change this unless you are running Tor
# through unix sockets. However, if that's the case, you'll also need to
# import requests_unixsocket and call 'requests_unixsocket.Session()'
# instead of 'requests.session()'.
TOR_PROTOCOL = 'socks5h'
# TOR_ADDR: Address of an already running socks5 proxy.
# Normally, you don't want to change this, as Tor is intended to be running
# on localhost.
TOR_ADDR = '127.0.0.1'
# TOR_PORT: Port of the already running socks5 proxy. Normally 9050, but it
# can be changed. Kept as a string because it is only interpolated into the
# proxy URL.
TOR_PORT = '9050'
# TOR_CONTROL_PORT: Tor ControlPort. You will need to open it through torrc.
# Set ControlPort in torrc to whatever you want (default 9051). Kept as an
# integer because it is passed as the port to stem's Controller.from_port().
TOR_CONTROL_PORT = 9051
# TOR_CONTROL_PASSWD: ControlPort password (the original note says the default
# password is "password" -- TODO confirm against your own torrc).
# Change it by running tor --hash-password "<new_password>", where
# <new_password> is your password. Then copy the resulting hash into
# HashedControlPassword (in your torrc) and put the non-hashed password here.
# NOTE(review): this is a plaintext credential committed in source; consider
# loading it from the environment or a local, untracked config file.
TOR_CONTROL_PASSWD = 'Jx8I1_3zE@9'

class TorConnection():
    """HTTP client whose ``requests`` session is routed through a Tor proxy.

    The proxy location is taken from the module-level ``TOR_PROTOCOL``,
    ``TOR_ADDR`` and ``TOR_PORT`` constants. The current session is kept in
    ``self.session`` and is replaced every time ``update_tor_session`` or
    ``renew_connection`` succeeds.
    """

    def __init__(self):
        """Create the initial proxied session."""
        self.update_tor_session()

    def update_tor_session(self) -> requests.Session:
        """Build a fresh session that sends http and https through the proxy.

        Returns:
            The newly created session, which is also stored in ``self.session``.
        """
        # Release the pooled connections of the session being replaced so
        # repeated renewals do not accumulate open sockets to the proxy.
        # On the first call (from __init__) there is no previous session.
        previous = getattr(self, 'session', None)
        if previous is not None:
            previous.close()

        proxy_url = f'{TOR_PROTOCOL}://{TOR_ADDR}:{TOR_PORT}'
        self.session = requests.session()
        # Same proxy for both schemes.
        self.session.proxies = {
            'http': proxy_url,
            'https': proxy_url
        }
        return self.session

    def renew_connection(self) -> 'requests.Session | bool':
        """Ask Tor for a new identity (NEWNYM) and rebuild the session.

        Connects to the control port ``TOR_CONTROL_PORT`` and authenticates
        with ``TOR_CONTROL_PASSWD``. If Tor would not currently accept a
        NEWNYM signal, sleeps for the wait time reported by the controller
        before sending it.

        Returns:
            The refreshed session on success, or ``False`` if anything failed
            (the error is printed rather than raised).
        """
        try:
            # Uses stem.control to connect to the ControlPort.
            with Controller.from_port(port=TOR_CONTROL_PORT) as controller:
                controller.authenticate(password=TOR_CONTROL_PASSWD)
                # is_newnym_available / get_newnym_wait are methods and must be
                # called; the bare attribute is a bound method, which is always
                # truthy and cannot be passed to time.sleep().
                if not controller.is_newnym_available():
                    time.sleep(controller.get_newnym_wait())
                controller.signal(Signal.NEWNYM)
            # Refresh the session so no state from the old one is carried over.
            return self.update_tor_session()
        except Exception as exc:
            # Deliberate best-effort: report the failure and signal it with False.
            print('Connection to Tor through it\'s control port failed. Make sure Tor is using a control port and that the password is correct.')
            print(str(exc))
            return False

    def request_get_torified(self, urloc: str, headers: dict = None, urlparams: dict = None,
                             cookies: dict = None, allow_redirects: bool = False) -> requests.Response:
        """Send a GET request to ``urloc`` through the proxied session."""
        return self.session.get(url=urloc, headers=headers, params=urlparams,
                                cookies=cookies, allow_redirects=allow_redirects)

    def request_post_torified(self, urloc: str, headers: dict = None, data: dict = None,
                              urlparams: dict = None, files: dict = None, cookies: dict = None,
                              allow_redirects: bool = False) -> requests.Response:
        """Send a POST request to ``urloc`` through the proxied session."""
        return self.session.post(url=urloc, headers=headers, data=data, params=urlparams,
                                 files=files, cookies=cookies, allow_redirects=allow_redirects)

    def request_put_torified(self, urloc: str, headers: dict = None, data: dict = None,
                             urlparams: dict = None, files: dict = None, cookies: dict = None,
                             allow_redirects: bool = False) -> requests.Response:
        """Send a PUT request to ``urloc`` through the proxied session."""
        return self.session.put(url=urloc, headers=headers, data=data, params=urlparams,
                                files=files, cookies=cookies, allow_redirects=allow_redirects)

    def request_delete_torified(self, urloc: str, headers: dict = None, urlparams: dict = None,
                                cookies: dict = None, allow_redirects: bool = False) -> requests.Response:
        """Send a DELETE request to ``urloc`` through the proxied session."""
        return self.session.delete(url=urloc, headers=headers, params=urlparams,
                                   cookies=cookies, allow_redirects=allow_redirects)

    def request_head_torified(self, urloc: str, headers: dict = None, urlparams: dict = None,
                              cookies: dict = None, allow_redirects: bool = False) -> requests.Response:
        """Send a HEAD request to ``urloc`` through the proxied session."""
        # allow_redirects is forwarded like in every other verb helper; it was
        # previously accepted but ignored.
        return self.session.head(url=urloc, headers=headers, params=urlparams,
                                 cookies=cookies, allow_redirects=allow_redirects)

    def request_options_torified(self, urloc: str, headers: dict = None, urlparams: dict = None,
                                 cookies: dict = None, allow_redirects: bool = False) -> requests.Response:
        """Send an OPTIONS request to ``urloc`` through the proxied session."""
        return self.session.options(url=urloc, headers=headers, params=urlparams,
                                    cookies=cookies, allow_redirects=allow_redirects)

    def request_get_binary_torified(self, urloc: str, headers: dict = None, urlparams: dict = None,
                                    cookies: dict = None, allow_redirects: bool = False) -> BytesIO:
        """GET ``urloc`` and return the raw response body wrapped in a BytesIO."""
        response = self.session.get(url=urloc, headers=headers, params=urlparams,
                                    cookies=cookies, allow_redirects=allow_redirects)
        return BytesIO(response.content)

    def request_get_json_torified(self, urloc: str, headers: dict = None, urlparams: dict = None,
                                  cookies: dict = None, allow_redirects: bool = False) -> dict:
        """GET ``urloc`` and return the response body decoded with ``.json()``.

        Any error raised by ``.json()`` for a non-JSON body propagates to the
        caller.
        """
        response = self.session.get(url=urloc, headers=headers, params=urlparams,
                                    cookies=cookies, allow_redirects=allow_redirects)
        return response.json()