import random, json, urllib3, threading
from requests_html import HTMLSession
from concurrent.futures import ThreadPoolExecutor
import sound
# Suppress urllib3 warnings: the HTTP calls in this module pass verify=False.
urllib3.disable_warnings()
# Pool of browser User-Agent strings; request builders pick one at random.
# Every entry needs its own trailing comma: adjacent string literals without
# a comma are silently concatenated into a single (invalid) entry.
user_agent_list = [
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",
    "Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6",
    "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1090.0 Safari/536.6",
    "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/19.77.34.5 Safari/537.1",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.9 Safari/536.5",
    "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.36 Safari/536.5",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
    "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.0 Safari/536.3",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24",
    "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24",
]
# Module-level event shared by all Function instances; it is set by
# Function.get_signal_watch.
e = threading.Event()
# Show label -> number ('上午场' = morning session, '下午场' = afternoon session).
# NOTE(review): presumably backend ids for the two sessions; not referenced
# in the visible part of this file — confirm against the callers.
shows_dict = {'上午场':43,'下午场':44}
class Function:
def __init__(self, window):
    """Create the helper with its own HTTP session.

    window: GUI window object; the other methods report progress to it
        through ``write_event_value``.
    """
    # One session is reused for every request made by this instance.
    self.s = HTMLSession()
    self.window = window
    # Both flags start out as empty (falsy) strings.
    self.catch = self.watch = ''
def get_shows(self):
    """Fetch the list of sessions available to the logged-in user.

    Reads ``self.user``, ``self.passwd``, ``self.user_id`` and
    ``self.token``, which ``login`` sets.

    Returns:
        dict mapping each entry's ``norm_name`` to
        ``{'id', 'count', 'rest_count'}``, where ``rest_count`` is
        ``count - queued_count``. An empty dict is returned when the
        request fails, the HTTP status is not 200, the body is not valid
        JSON, or the API answers with a non-success status; each of these
        cases is reported to the window.
    """
    url = "https://www.360youmei.com/v5/driver-lines/norm-list?mine_id=1&&pages=1&&limit=10"
    headers = {
        'Host': 'www.360youmei.com',
        'accept': '*/*',
        'content-type': 'application/json',
        'app-type': 'driver-applet',
        'user-agent': random.choice(user_agent_list),
        'referer': 'https://servicewechat.com/',
        'current-user-id': str(self.user_id),
        'auth-token': self.token,
    }
    try:
        response = self.s.get(url, headers=headers, verify=False, timeout=5)
    except Exception:
        # Any request failure is reported as a timeout. ``Exception`` (not a
        # bare ``except``) lets SystemExit / KeyboardInterrupt propagate.
        self.window.write_event_value('show', '%s 场次获取超时' % (self.user))
        return {}
    if response.status_code != 200:
        self.window.write_event_value('show', '%s 后台状态码预警:%s' % (self.user, response.status_code))
        return {}
    try:
        body = json.loads(response.text)
    except ValueError:
        # A 200 answer whose body is not JSON is treated like any other
        # failed fetch instead of raising out of the polling caller.
        self.window.write_event_value('show', '%s 场次数据解析失败' % (self.user))
        return {}
    item_list = {}
    if body['status'] == 'success':
        for entry in body['data']['data']:
            # setdefault keeps the first entry when a norm_name repeats.
            item_list.setdefault(entry['norm_name'], {
                'id': entry['id'],
                'count': entry['count'],
                'rest_count': int(entry['count']) - int(entry['queued_count']),
            })
    else:
        # NOTE(review): this writes the account password into the status
        # element and updates the element directly instead of going
        # through write_event_value like the other branches — confirm
        # both are intended.
        self.window['status'].print('%s %s抢号---%s' % (self.user, self.passwd, body['msg']))
    return item_list
def get_signal(self):
self.catch = False
def get_signal_watch(self):
    """Turn the ``catch`` flag off and set the module-level event ``e``."""
    # The flag is cleared before the event is set, so anything woken by the
    # event already observes ``catch`` as False.
    self.catch = False
    e.set()
def login(self,user,passwd):
self.user = user
self.passwd = passwd
url = "https://www.360youmei.com/v1/sign-in"
payload = json.dumps({
"mobile": str(self.user),
"password": str(self.passwd)
})
headers = {
'Host': 'www.360youmei.com',
'app-type': 'driver-applet',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/105.0.0.0 Safari/537.36',
'content-type': 'application/json',
'accept': '*/*',
'x-requested-with': 'XMLHttpRequest',
'origin': 'https://www.360youmei.com',
'referer': 'https://www.360youmei.com/auth/sign-in',
}
try:
response = json.loads(self.s.post(url, headers=headers, data=payload, verify=False, timeout=5).text)
except:
self.window.write_event_value('show', '登录失败,原因:网络堵塞')
return
if response['status'] == 'success':
self.token = response['data']['auth_token']
self.user_id = response['data']['user_id']
self.window.write_event_value('show','登录成功,请选择商品')
item_list = self.get_shows()
self.window.write_event_value('return-item', item_list)
else:
self.window.write_event_value('show', '登录失败,原因:%s'%response['msg'])
return
def submit_order(self, id):
url = "https://www.360youmei.com/v5/driver-lines/driver-lines"
payload = json.dumps({
"id": str(id),
"origin": "0,0",
"voice_call": False
})
headers = {
'Host': 'www.360youmei.com',
'content-type': 'application/json',
'app-type': 'driver-applet',
'accept': '*/*',
'auth-token': self.token,
'current-user-id': str(self.user_id),
'user-agent': random.choice(user_agent_list),
'referer': 'https://servicewechat.com/'
}
if not self.catch:
return
try:
response = self.s.post(url, headers=headers, data=payload, verify=False, timeout=5)
except:
return '%s 抢购超时,请检查网络'%(self.user)
if response.status_code != 200:
return '%s 后台状态码预警:%s'%(self.user, response.status_code)
else:
response = json.loads(response.text)
if response['status'] == 'error':
return '%s 抢购失败,原因:%s请检查网络' % (self.user,response['msg'])
else:
txt = '%s 抢购成功, 产品:%s\n' % (self.user,self.product)
try:
with open('./成功报告.txt','a',encoding='utf-8') as file:
file.write(txt)
file.close()
except:
pass
try:
sound.play_music()
except:
pass
self.catch = False
return txt
def parse_result(self,result):
if not self.catch:
try:
self.pool.shutdown()
except:
pass
return
self.window.write_event_value('show', result.result())
def parse_result_watch(self,result):
if not self.catch:
try:
self.pool_watch.shutdown()
except:
pass
return
self.window.write_event_value('show', result.result())
def set_task(self, product):
self.catch = True
self.product = product
id = ''
while True:
if not self.catch:
break
item_list = self.get_shows()
if item_list == {}:
self.window.write_event_value('show', '%s未检测到预约场次�