Dear Python community,
I hope this message finds you in good health. I am reaching out to seek your assistance regarding an issue that I have been experiencing with one of my Python scripts that involves the use of libcurl.
The script, named proxy-speed.py, measures and displays the speed of different proxies. It runs through a list of proxies one at a time, printing the current and average speed for each. The issue arises when I try to terminate the script early with Ctrl+C. The KeyboardInterrupt appears to stop only the current transfer, and the script immediately proceeds to the next proxy on the list instead of exiting.
Here's an example of the output illustrating the issue:
File "/home/werner/Desktop/proxy-speed/proxy-speed.py", line 54, in progress
def progress(download_t, download_d, upload_t, upload_d):
KeyboardInterrupt
Average Speed: 3944.97 kB/s
Proxy: SG_ssr_futeapbquf5m.nodelist.club_1356_8ce27b1301fcbcb77cc5abb28cb127a6
^CTraceback (most recent call last):
File "/home/werner/Desktop/proxy-speed/proxy-speed.py", line 54, in progress
def progress(download_t, download_d, upload_t, upload_d):
KeyboardInterrupt
Average Speed: 72.81 kB/s
This pattern repeats for every Ctrl+C I press. My goal is to modify the script so that a single Ctrl+C terminates it completely.
Any guidance or suggestions that you could offer regarding this issue would be greatly appreciated. If there is any further information needed to better understand the issue, please let me know.
The content of proxy-speed.py is as follows:
import subprocess
import sys
import time
import pycurl
from io import BytesIO

def fetch_proxies():
    command = 'echo "show stat" | sudo socat stdio /var/run/haproxy.sock 2>/dev/null | awk -F, \'$1=="socks5" && !($2~/^(FRONTEND|BACKEND)$/) {print $2,$74}\''
    process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
    output, error = process.communicate()
    result_dict = {}  # Initialize default empty dictionary
    if error is not None:
        return result_dict  # Return the empty dictionary in case of error
    lines = output.decode('utf-8').split('\n')
    for line in lines:
        if line != "":
            key_value = line.split(' ')
            result_dict[key_value[0]] = key_value[1]
    return result_dict

def test_proxy(proxy, url):
    global last_calc_time, download_start_time
    buffer = BytesIO()
    c = pycurl.Curl()
    c.setopt(pycurl.URL, url)
    c.setopt(pycurl.WRITEDATA, buffer)
    c.setopt(pycurl.PROXY, proxy)
    c.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5_HOSTNAME)
    c.setopt(pycurl.NOPROGRESS, False)  # enable the progress callback
    c.setopt(pycurl.XFERINFOFUNCTION, progress)
    c.setopt(pycurl.TIMEOUT, 5)
    download_start_time = time.time()
    last_calc_time = download_start_time
    try:
        c.perform()
    except pycurl.error as e:
        pass
    except KeyboardInterrupt:
        print("Script interrupted by user. Exiting.")
        sys.exit(0)  # immediately exit the script
    average_speed = c.getinfo(pycurl.SPEED_DOWNLOAD) / 1024
    return average_speed

def progress(download_t, download_d, upload_t, upload_d):
    global last_calc_time, download_start_time
    current_time = time.time()
    # Print the running speed at most once every 2 seconds
    if current_time - last_calc_time >= 2:
        elapsed_time = current_time - download_start_time
        current_speed = download_d / elapsed_time / 1024
        print(f"Current Speed: {current_speed:.2f} kB/s")
        last_calc_time = current_time

proxy_data = fetch_proxies()
url = "http://ipv4.download.thinkbroadband.com/1GB.zip"
for key, value in proxy_data.items():
    print(f"Proxy: {key}")
    try:
        average_speed = test_proxy(value, url)
        print(f"Average Speed: {average_speed:.2f} kB/s")
    except KeyboardInterrupt:
        print("Script interrupted by user. Exiting.")
        break  # exit the for loop
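
For discussion, here is a minimal sketch of one way a single Ctrl+C could stop the whole run. It rests on an assumption that is not confirmed in this post: that pycurl prints an exception raised inside the progress callback and aborts only that transfer, so that perform() raises pycurl.error, which test_proxy() silently ignores. The sketch records the interrupt in a flag instead of relying on KeyboardInterrupt. The names stop_requested, handle_sigint, install_sigint_handler and run_all are hypothetical, and test_one stands in for test_proxy() so that the example runs without pycurl.

```python
import signal
import threading

# Hypothetical flag, set when the user presses Ctrl+C.
stop_requested = threading.Event()


def handle_sigint(signum, frame):
    """Record the interrupt instead of raising KeyboardInterrupt."""
    stop_requested.set()


def install_sigint_handler():
    """Route SIGINT to handle_sigint; must be called from the main thread."""
    signal.signal(signal.SIGINT, handle_sigint)


def progress(download_t, download_d, upload_t, upload_d):
    """Progress callback: a non-zero return asks libcurl to abort the transfer."""
    return 1 if stop_requested.is_set() else 0


def run_all(proxies, test_one):
    """Test each proxy in turn, stopping as soon as an interrupt is recorded."""
    results = {}
    for name, address in proxies.items():
        if stop_requested.is_set():
            break
        speed = test_one(address)
        if stop_requested.is_set():
            break  # discard the partial measurement of the aborted transfer
        results[name] = speed
    return results
```

In the real script, install_sigint_handler() would presumably be called once before the loop, and progress would still be passed to pycurl.XFERINFOFUNCTION (with the speed printing kept as it is now). The loop then checks the flag after each test_proxy() call rather than catching KeyboardInterrupt.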
Thank you very much for your help in advance.
Best Regards,
Zhao
P.S. If possible, please provide both an explanation and a code snippet of the solution, so that I can understand it and learn from it for future reference.