#!/usr/bin/python
"""Legacy Historian script for analyzing Android bug reports."""
# Copyright 2016 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TO USE: (see also usage() below)
# adb shell dumpsys batterystats --enable full-wake-history (post-KitKat only)
# adb shell dumpsys batterystats --reset
# Optionally start powermonitor logging:
# For example, if using a Monsoon:
# if device/host clocks are not synced, run historian.py -v
# cts/tools/utils/monsoon.py --serialno 2294 --hz 1 --samples 100000 \
# -timestamp | tee monsoon.out
# ...let device run a while...
# stop monsoon.py
# adb bugreport > bugreport.txt
# ./historian.py -p monsoon.out bugreport.txt
import collections
import datetime
import fileinput
import getopt
import re
import StringIO
import subprocess
import sys
import time
# Module-level tunables and option defaults.
# NOTE(review): the getopt_* names are presumably overwritten by the
# command-line parser (not visible in this section); the flag letters
# mentioned below are taken from usage() and should be confirmed there.
POWER_DATA_FILE_TIME_OFFSET = 0 # deal with any clock mismatch.
BLAME_CATEGORY = "wake_lock_in" # category to assign power blame to.
ROWS_TO_SUMMARIZE = ["wake_lock", "running"] # -s: summarize these rows
getopt_debug = 0  # presumably -d: debug output
getopt_bill_extra_secs = 0  # presumably -e TIME: extra billing seconds
getopt_power_quanta = 15 # slice powermonitor data this many seconds,
# to avoid crashing visualizer
getopt_power_data_file = False  # presumably -p FILE: power data file
getopt_proc_name = ""  # presumably -n PROC
getopt_highlight_category = ""  # presumably the CATEGORY part of -n
getopt_show_all_wakelocks = False  # presumably -a
getopt_sort_by_power = True  # presumably cleared by -t
getopt_summarize_pct = -1  # presumably -s PCT; -1 looks like "not set"
getopt_report_filename = ""  # presumably -r NAME
getopt_generate_chart_only = False  # presumably -m
getopt_disable_chart_drawing = False  # presumably -c
def usage():
    """Print the command-line help text to stdout and exit.

    Every message is emitted through a parenthesized, single-argument
    print so the output is the same whether print is a statement or a
    function.

    Raises:
      SystemExit: always, with exit status 1.
    """
    print("\nUsage: %s [OPTIONS] [FILE]\n" % sys.argv[0])
    print(" -a: show all wakelocks (don't abbreviate system wakelocks)")
    print(" -c: disable drawing of chart")
    print(" -d: debug mode, output debugging info for this program")
    print(" -e TIME: extend billing an extra TIME seconds after each\n"
          " wakelock, or until the next wakelock is seen. Useful for\n"
          " accounting for modem power overhead.")
    print(" -h: print this message.")
    print(" -m: generate output that can be embedded in an existing page.\n"
          " HTML header and body tags are not outputted.")
    print(" -n [CATEGORY=]PROC: output another row containing only processes\n"
          " whose name matches uid of PROC in CATEGORY.\n"
          " If CATEGORY is not specified, search in wake_lock_in.")
    print(" -p FILE: analyze FILE containing power data. Format per\n"
          " line: <timestamp in epoch seconds> <amps>")
    # The default shown here is read from the module-level option value.
    print(" -q TIME: quantize data on power row in buckets of TIME\n"
          " seconds (default %d)" % getopt_power_quanta)
    print(" -r NAME: report input file name as NAME in HTML.")
    print(" -s PCT: summarize certain useful rows with additional rows\n"
          " showing percent time spent over PCT% in each.")
    print(" -t: sort power report by wakelock duration instead of charge")
    print(" -v: synchronize device time before collecting power data")
    print("\n")
    sys.exit(1)
def parse_time(s, fmt):
    """Parse a human readable duration string into seconds.

    Takes a duration string like '1d2h3m4s5ms' and returns the equivalent
    number of seconds; the millisecond part becomes the fraction.

    Args:
      s: Duration string.
      fmt: Regular expression (anything re.search accepts) that uses the
        named groups "day", "hrs", "min", "sec" and "ms". Groups that are
        missing from the pattern or did not match contribute nothing.

    Returns:
      The duration in seconds as a float, 0.0 when s is exactly "0", or
      -1.0 when fmt does not match s.
    """
    if s == "0":
        return 0.0
    match = re.search(fmt, s)
    # A failed search yields None, so test for it explicitly instead of
    # relying on an exception from the attribute access.
    if match is None:
        return -1.0
    d = match.groupdict()
    ret = 0.0
    if d.get("day"):
        ret += float(d["day"]) * 60 * 60 * 24
    if d.get("hrs"):
        ret += float(d["hrs"]) * 60 * 60
    if d.get("min"):
        ret += float(d["min"]) * 60
    if d.get("sec"):
        ret += float(d["sec"])
    if d.get("ms"):
        ret += float(d["ms"]) / 1000
    return ret
def time_float_to_human(t, show_complete_time):
    """Format an epoch timestamp as local time.

    Args:
      t: Seconds since the epoch, as accepted by time.localtime.
      show_complete_time: If true, include the date before the clock time.

    Returns:
      "YYYY-MM-DD HH:MM:SS" when show_complete_time is true, otherwise
      "HH:MM:SS".
    """
    layout = "%Y-%m-%d %H:%M:%S" if show_complete_time else "%H:%M:%S"
    return time.strftime(layout, time.localtime(t))
def abbrev_timestr(s):
    """Keep a time string up to and including its first "s".

    Strings with fewer than two "s" characters (for example a bare
    millisecond value such as "345ms") are reported as "0s".

    Args:
      s: Time string such as "1m2s345ms".

    Returns:
      The text up to and including the first "s", or "0s".
    """
    if s.count("s") < 2:
        return "0s"
    return s[:s.index("s")] + "s"
def timestr_to_jsdate(timestr):
    """Build a JavaScript Date constructor expression.

    Args:
      timestr: Value substituted into the expression; it is multiplied by
        1000 on the JavaScript side.

    Returns:
      A string of the form "new Date(<timestr> * 1000)".
    """
    js_template = "new Date(%s * 1000)"
    return js_template % timestr
def format_time(delta_time):
    """Return a time string representing time past since initial event.

    The result starts with "+" and lists days, hours, minutes and seconds
    from the largest non-zero unit down, followed by a zero-padded
    millisecond field, e.g. "+1h01m01s000ms".

    Args:
      delta_time: Elapsed time in seconds (int or float).

    Returns:
      "0" when delta_time is falsy, otherwise the formatted string.
    """
    if not delta_time:
        return str(0)
    # timedelta carries an unbounded day count, so long spans do not wrap
    # the way a calendar day-of-month would.
    elapsed = datetime.timedelta(seconds=delta_time)
    whole_secs = elapsed.days * 24 * 60 * 60 + elapsed.seconds
    hours, remainder = divmod(elapsed.seconds, 60 * 60)
    mins, secs = divmod(remainder, 60)
    timestr = "+"
    # Inclusive comparisons keep exact boundaries (1s, 1m, 1h, 1d) in the
    # branch that prints their leading unit.
    if whole_secs >= 24 * 60 * 60:
        timestr += "%dd%02dh%02dm%02ds" % (elapsed.days, hours, mins, secs)
    elif whole_secs >= 60 * 60:
        timestr += "%dh%02dm%02ds" % (hours, mins, secs)
    elif whole_secs >= 60:
        timestr += "%dm%02ds" % (mins, secs)
    elif whole_secs >= 1:
        timestr += "%ds" % secs
    timestr += "%03dms" % (elapsed.microseconds // 1000)
    return timestr
def format_duration(dur_ms):
    """Render a millisecond duration as a compact string such as "1h2m3s4ms".

    Units whose value is zero are omitted; hours are the largest unit.

    Args:
      dur_ms: Duration in milliseconds.

    Returns:
      The formatted duration, or "0ms" when dur_ms is falsy.
    """
    if not dur_ms:
        return "0ms"
    total_secs, ms = divmod(dur_ms, 1000)
    total_mins, secs = divmod(total_secs, 60)
    hrs, mins = divmod(total_mins, 60)
    pieces = []
    if hrs > 0:
        pieces.append("%dh" % hrs)
    if mins > 0:
        pieces.append("%dm" % mins)
    if secs > 0:
        pieces.append("%ds" % secs)
    # Always emit something: fall back to the millisecond field when every
    # larger unit was zero.
    if ms > 0 or not pieces:
        pieces.append("%dms" % ms)
    return "".join(pieces)
def get_event_category(e):
    """Return the category part of an event string.

    Leading "+" and "-" characters are dropped and everything from the
    first "=" onward is discarded.

    Args:
      e: Event string, e.g. '+wake_lock=1000:"name"'.

    Returns:
      The text before the first "=" with the leading +/- removed.
    """
    return e.lstrip("+-").partition("=")[0]
def get_quoted_region(e):
    """Return the text between the first and second double quote in e.

    Args:
      e: Event string containing a double-quoted section.

    Returns:
      The contents of the first quoted section.

    Raises:
      IndexError: if e contains no double quote.
    """
    pieces = e.split("\"")
    return pieces[1]
def get_after_equal(e):
    """Return the text between the first and second "=" in e.

    When e holds a single "=", this is everything after it.

    Args:
      e: Event string of the form "key=value".

    Returns:
      The second "="-separated field of e.

    Raises:
      IndexError: if e contains no "=".
    """
    fields = e.split("=")
    return fields[1]
def get_wifi_suppl_state(e):
    """Return the value after "=" in e, cut at the first "(".

    Args:
      e: Event string of the form "key=value(...)".

    Returns:
      The value text before any "(", or "" when e has no "=".
    """
    try:
        value = get_after_equal(e)
    except IndexError:
        # No "=" in the event string.
        return ""
    return value.split("(")[0]
def get_event_subcat(cat, e):
    """Get the subcategory of a category from an event string.

    A subcategory distinguishes simultaneous entities within one
    category. Only the categories listed in concurrent_cat are tracked
    this way; every other category is tracked by name alone.

    Args:
      cat: Category name.
      e: Event string.

    Returns:
      The value after "=" in e when cat is one of the tracked categories
      and e contains "="; otherwise the empty string.
    """
    concurrent_cat = {"wake_lock_in", "sync", "top", "job", "conn"}
    if cat not in concurrent_cat:
        return ""
    try:
        return get_after_equal(e)
    except IndexError:
        # Tracked category, but the event carries no "=value" part.
        return ""
def get_proc_pair(e):
    """Split the value after "=" in e at its first ":".

    Args:
      e: Event string, presumably of the form "key=uid:name" (confirm
        against callers).

    Returns:
      The tuple ("", "") when e contains no ":". Otherwise the list
      produced by splitting the value after "=" once on ":"; that list has
      a single element if the value itself holds no ":".

    Raises:
      IndexError: if e contains ":" but no "=".
    """
    if ":" not in e:
        return ("", "")
    return get_after_equal(e).split(":", 1)
def as_to_mah(a):
    """Scale a by 1000 and divide by 60 twice.

    Going by the name this converts amp-seconds to milliamp-hours.
    NOTE(review): an integer argument is divided with integer division
    under Python 2 -- confirm callers pass floats.

    Args:
      a: Numeric value to convert.

    Returns:
      a * 1000 / 60 / 60.
    """
    scaled = a * 1000
    per_minute = scaled / 60
    return per_minute / 60
def apply_fn_over_range(fn, start_time, end_time, arglist):
    """Call fn once for every whole-second slot touched by a time range.

    The range is cut at whole-second boundaries, so the first and last
    slices may be shorter than one second.

    Args:
      fn: Callable invoked as fn(second, duration, *arglist), where second
        is the integer second the slice starts in and duration is the
        length of the slice.
      start_time: Start of the range.
      end_time: End of the range.
      arglist: Extra positional arguments passed to every fn call.

    Returns:
      A list of the fn return values, in time order; empty when
      start_time is not before end_time.
    """
    outputs = []
    position = start_time
    while position < end_time:
        whole_second = int(position)
        # Stop at the next second boundary or the end of the range,
        # whichever comes first.
        boundary = min(float(whole_second + 1), end_time)
        outputs.append(fn(whole_second, boundary - position, *arglist))
        position = boundary
    return outputs
def space_escape(match):
    """Replace each whitespace run in a regex match with one underscore.

    The signature fits the replacement-callback argument of re.sub.

    Args:
      match: A regular expression match object.

    Returns:
      The matched text with every run of whitespace replaced by "_".
    """
    return re.sub(r"\s+", "_", match.group())
def parse_reset_time(line):
line = line.strip()
line
评论0