#coding: gbk
import subprocess
import os
import sys
import re
import collections
from importlib import reload
def flatten(x):
result = []
for el in x:
if isinstance(x, list) and not isinstance(el, str):
result.extend(flatten(el))
else:
result.append(el)
return result
reload(sys)
class Adb(object):
_path = None #adb path
_lastError = ''
_params = ''
def __init__(self, dits):
self._path = dits['adb_path'] #dit = {'adb_path':'','input_para':''}
self._lastError = ''
self._params = dits['input_para']
self.phoneInfoDict = {"2_sysversion":'', "3_sysapi":'', "4_cpu":'',
"1_product":'', "6_serialId":'', "7_imei":'',
"5_mac":'', "9_density":'', "a_memory":'',
"b_storage":'', "c_sdcard":'', "8_pix":'',
"d_sdcard":''}
@staticmethod
def _run(args):
p = subprocess.Popen(flatten(args), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=False)
ret = p.communicate()
#print((ret))
#output =p.stdout.read()
#print(output)
#p.stdout.close()
#error = p.stderr.readlines()
#p.stderr.close()
# return output if ret is 0 else error
return Adb._out2str(ret)
@staticmethod
def _out2str(text_seq):
#print(str.strip(" ".join(id.decode() for id in text_seq).replace("\r", "").replace("\n", "")))
return str.strip(" ".join(id.decode() for id in text_seq).replace("\r", "").replace("\n", ""))
def connect(self, host):
return self._run([self._path, "connect", host])
def disconnect(self, host):
return self._run([self._path, "disconnect", host])
def shell(self, host, command):
args = flatten([self._path, "-s", host, command.split(" ")])
return self._run(args)
def logcat(self, host):
args = [self._path, "-s", host, "logcat"]
return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
shell=False)
# -r: replace existing application
# -t: allow test packages
# -d: allow version code downgrade
def install_apk(self, host, file_path):
_state = self.get_state(host)
self._lastError = ''
if _state is "offline":
self._lastError = "device offline"
return False
args = [self._path, "-s", host, "install", "-r", file_path]
# args = [self._path, "-s", host, "install", "-rt", file_path]
print(self._run(args))
return True
def uninstall_apk(self, host, packageName):
_state = self.get_state(host)
self._lastError = ''
if _state is "offline":
self._lastError = "device offline"
return False
args = [self._path, "-s", host, "uninstall", packageName]
result=self._run(args)
#print(result)
if result.find("Unknown package")!=-1:
print(u'设备中未安装该APK')
return False
return True
def get_state(self, host):
_state = self.shell(host, "get-state")
if "unknown" in _state:
return "offline"
elif "device" in _state:
return "online"
else:
return "offline"
pass
def restart(self):
self._run([self._path, "kill-server"])
return self._run([self._path, "start-server"])
def screencap(self,host,outfile):
_state = self.get_state(host)
self._lastError = ''
if _state is "offline":
self._lastError = "device offline"
return False
args = [self._path, "-s", host, "shell","/system/bin/screencap", "-p", "/data/local/tmp/screenshot.png"]
result = self._run(args)
if result.find('Error') == -1:
args = [self._path, "-s", host, "pull", "/data/local/tmp/screenshot.png", outfile]
result = self._run(args)
if os.path.exists(outfile):
args=[self._path,"-s",host, "shell", "rm", "/data/local/tmp/screenshot.png"]
result=self._run(args)
return True
result='保存截图到本地失败'
self._lastError = result
return False
def getDeviceList(self):
device = self._run([self._path, "devices"])
lists = None
if device != 'List of devices attached':
#获取device,list列表
lists = device.replace('List of devices attached','').replace('device','').replace('offline','').replace('unknown','').replace(' ','').strip().split('\t')
if lists:
pass
else:
lists = []
return lists
def kill(self):
self._run([self._path, "kill-server"])
def path_provided(self):
return self._path is not None and not self._path == ""
def getScreenCap(self):
devicelist = self.getDeviceList()
#print(len(devicelist))
if len(devicelist) == 0:
#print("error")
print("设备列表为空,请连接设备")
return False
host = devicelist[0]
result = self.screencap(host, self._params)
if not result:
print(self._lastError)
return False
else:
print(self._params)
return True
# 获取系统版本
def getSysversion(self, host):
args = [self._path, "-s", host, "shell", "getprop", "ro.build.version.release"]
result = self._run(args)
result = str(result).strip()
try:
pattern = r'^[1-9]+\.?\d*\.?\d*\.?\d*$'
m = re.search(pattern, result)
if m:
self.phoneInfoDict['2_sysversion'] = "系统版本:" + result
# print result
except Exception as e:
self.phoneInfoDict['2_sysversion'] = ''
# 获取系统API
def getSysApi(self, host):
args = [self._path, "-s", host, "shell", "getprop", "ro.build.version.sdk"]
result = self._run(args)
result = str(result).strip()
try:
pattern = r'^[1-9]+\.?\d*$'
m = re.search(pattern, result)
if m:
self.phoneInfoDict['3_sysapi'] = u"系统API版本:" + result
# print result
except Exception as e:
self.phoneInfoDict['3_sysapi'] = ''
# 获取CPU类型
def getCpu(self, host):
args = [self._path, "-s", host, "shell", "getprop", "ro.product.cpu.abi"]
result = self._run(args)
result = str(result).strip()
try:
if result != '' and result.find('not found') == -1:
self.phoneInfoDict['4_cpu'] = "系统CPU类型:" + result
# print result
except Exception as e:
self.phoneInfoDict['4_cpu'] = ''
# 获取产品信息 adb -d shell getprop ro.product.model
def getProduct(self, host):
args = [self._path, "-s", host, "-d", "shell", "getprop", "ro.product.model"]
result = self._run(args)
result = str(result).strip()
try:
if result != '' and result.find('not found') == -1:
self.phoneInfoDict['1_product'] = "手机类型:" + result
# print result
except Exception as e:
self.phoneInfoDict['1_product'] = ''
# 获取序列号
def getSerialId(self, host):
args = [self._path, "-s", host, "get-serialno"]
result = self._run(args)
result = str(result).strip()
pattern = r'^\w+$'
try:
m = re.search(pattern, result)
if m: