1
0
mirror of https://bitbucket.org/librepilot/librepilot.git synced 2025-01-18 03:52:11 +01:00

742 lines
24 KiB
Python

#!/usr/bin/env python
# This file is Copyright 2009, 2010 Dean Hall.
#
# This file is part of the Python-on-a-Chip program.
# Python-on-a-Chip is free software: you can redistribute it and/or modify
# it under the terms of the GNU LESSER GENERAL PUBLIC LICENSE Version 2.1.
#
# Python-on-a-Chip is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# A copy of the GNU LESSER GENERAL PUBLIC LICENSE Version 2.1
# is seen in the file COPYING in this directory.
"""
PyMite Heap Dump
================
Parses a heap dump into human-readable format.
The heap dump file is created by inserting calls to heap_dump()
inside heap_gcRun(). Using two calls, a before and after, is ideal.
The dump format is:
=========== ==========================================
NumBytes Contents
=========== ==========================================
6 string: PMDUMP or PMUDMP depending on target endianness (little and big respectively)
2 uint16: pointer size
2 uint16: dump format version
2 uint16: bitfield of pmfeatures enabled on target
4 uint32: HEAP_SIZE
p pointer to heap start
HEAP_SIZE contents of heap (byte array)
4 uint32: NUM_ROOTS
NUM_ROOTS*p pointers to root objects
=========== ==========================================
The heap_dump() function names files incrementally starting from:
pmheapdump00.bin
pmheapdump01.bin
...
pmheapdumpNN.bin
"""
## @file
# @copybrief pmHeapDump
## @package pmHeapDump
# @brief PyMite Heap Dump
#
# See the source docstring for details.
import os, struct, sys, types, UserDict, collections
try:
import cStringIO as StringIO
except:
import StringIO
def _ellipse(string, length):
"""Truncates a string to a given size with ellipses
"""
if len(string) < length:
return string
else:
return string[0 : length-3] + '...'
def _dot_escape(string):
return string.replace('<', '\<').replace('>', '\>')
def unpack_fp(fmt, fp, increment=True):
    """Reads and unpacks one structure described by `fmt` from file `fp`.

    The number of bytes read is the size of `fmt`.  When `increment` is
    false the file position is put back where it was before the read, so
    the same bytes can be read again.  Returns the tuple of unpacked
    values.
    """
    # The position is only queried when it has to be restored.
    start = None if increment else fp.tell()
    raw = fp.read(struct.calcsize(fmt))
    values = struct.unpack(fmt, raw)
    if start is not None:
        fp.seek(start)
    return values
# One field of an object type: its name, its struct type character (or
# '.' for a bit field) and its multiplicity (False when there is none).
PmFieldInfo = collections.namedtuple('PmFieldInfo', 'name type mul')
# A run of consecutive bit fields stored in one integer: `type` is the
# struct type character of that integer and `fields` the list of the
# PmFieldInfo it contains.
PmBitfieldInfo = collections.namedtuple('PmBitfieldInfo', 'type fields mul')


class PmTypeInfo(object):
    """Model of an object type
    """

    # struct type character of the unsigned integer of each supported size
    _BITFIELD_TYPES = {1: 'B', 2: 'H', 4: 'I', 8: 'Q'}

    def __init__(self, name, fmt):
        """Initializes a new object type from name and a format string
        The format consists of a list of field name, type and optional
        multiplicity
        fmt = "fieldname1:fieldtype1[:multiplicity1],fieldname2:fieldtype2[:multiplicity2],..."
        where:
        fieldname : a string naming a field...
        fieldtype : a type understood by python struct
                    [c, b, B, ?, h, H, i, I, l, L, q, Q, f, d, s, p, P]
                    or '.' for a bit.
                    P (pointer) is translated to the correct size based on
                    the dump header information
        multiplicity : number of items for a list (number of bits for a
                    bit field; a bit field without multiplicity is one
                    bit wide). multiplicity can be:
                      any positive integer
                      a string naming a field already described
                    Multiplicity can also specify:
                      '*' to read as much as possible within the size of
                          the object
                      '<name' to read as long as the memory offset is lower
                          than the value of field name
        """
        self._parse_fmt(fmt)
        self.name = name

    def _parse_fmt(self, fmt):
        """Parses the format string.

        Fills self.fields with one PmFieldInfo per described field and
        self._rawfields with the fields as they are stored: consecutive
        bit fields are gathered in a single PmBitfieldInfo.
        """
        self.fields = []
        for f in map(lambda s: s.strip(), fmt.split(',')):
            if f == '':
                continue
            name, typ = f.split(':', 1)
            if ':' in typ:
                typ, mul = typ.split(':')
                try:
                    mul = int(mul)
                except ValueError:
                    # Not a number: a field name, '*' or '<name'
                    pass
            else:
                mul = False
            self.fields.append(PmFieldInfo(name, typ, mul))
        # Bitfield support : concatenate bit-fields in a bitfield
        bitfield = None
        self._rawfields = []
        for i, f in enumerate(self.fields):
            if f.type != '.':
                self._rawfields.append(f)
                continue
            if bitfield is None:
                bitfield = []
            bitfield.append(f)
            # Next field is a bit in this bitfield
            if i + 1 < len(self.fields) and self.fields[i + 1].type == '.':
                continue
            # Last bit in field : sum the bitfield and add it to the fields
            # list.  A bit field without multiplicity is one bit wide.
            bitcount = sum([sf.mul or 1 for sf in bitfield])
            # Smallest power-of-two number of bytes able to hold the bits
            nbytes = 1
            while nbytes * 8 < bitcount:
                nbytes *= 2
            if nbytes not in self._BITFIELD_TYPES:
                raise ValueError("bitfield of %d bits is too wide" % bitcount)
            typechr = self._BITFIELD_TYPES[nbytes]
            self._rawfields.append(PmBitfieldInfo(typechr, bitfield, False))
            bitfield = None

    def _calc_fieldmap(self, obj):
        """Computes fieldmap and format
        Returns a triple(format,   : a struct format string
                         fieldmap, : a list of Pm(Bit)FieldInfo, one per
                                     value of the format (None for the
                                     object descriptor)
                         remaining : number of fields that cannot be
                                     computed due to a missing value
                         )
        As a side effect, obj.data is given an empty list for every field
        with a multiplicity that is reached.
        """
        d = obj.data
        fmt = obj.heap.endianchr + "H"  # Skip object descriptor
        fieldmap = [None]
        for i, f in enumerate(self._rawfields):
            typechr = obj.heap.ptrchr if f.type == 'P' else f.type
            # Pad so that the field starts on a multiple of its own size
            while struct.calcsize(fmt) % struct.calcsize(typechr) != 0:
                fmt += 'x'
            if f.mul == False:
                fmt += typechr
                fieldmap.append(f)
            else:
                d[f.name] = []
                if isinstance(f.mul, int):
                    fmt += typechr * f.mul
                    fieldmap += [f] * f.mul
                elif f.mul == '*':
                    # Fill the rest of the object
                    while struct.calcsize(fmt) < obj.size:
                        fmt += typechr
                        fieldmap.append(f)
                elif f.mul.startswith('<'):
                    # Read until the address held by the named field
                    if f.mul[1:] not in d:
                        return (fmt, fieldmap, len(self._rawfields) - i)
                    while struct.calcsize(fmt + typechr) + obj.addr \
                            + obj.heap.base < d[f.mul[1:]]:
                        fmt += typechr
                        fieldmap += [f]
                else:
                    # The length is the value of another field
                    if f.mul not in d:
                        return (fmt, fieldmap, len(self._rawfields) - i)
                    fmt += typechr * d[f.mul]
                    fieldmap += [f] * d[f.mul]
        return (fmt, fieldmap, 0)
def PmObjectClass(dumpversion, features):
    """Creates the PmObject class for a dump with the given features.

    The layout of some object types depends on the features enabled on the
    target, so the class is built once the dump header is known.

    dumpversion : dump format version (not used by the current layouts)
    features    : object with the boolean attributes USE_STRING_CACHE,
                  HAVE_DEFAULTARGS, HAVE_CLOSURES and HAVE_CLASSES

    Returns the PmObject class.
    """
    class PmObject(UserDict.UserDict):
        """A model of an object.
        """

        # Object types, indexed by the type code found in the object
        # descriptor.  The name "x" marks a code without known layout.
        PM_TYPES = (
            PmTypeInfo('NON', ""),
            PmTypeInfo("INT", "val:i"),
            PmTypeInfo("FLT", "val:f"),
            PmTypeInfo("STR", "len:H," +
                       ("cache_next:P," if features.USE_STRING_CACHE
                        else "") +
                       "val:B:len"),
            PmTypeInfo("TUP", "len:H,items:P:len"),
            PmTypeInfo("COB", "codeimg:P,names:P,consts:P,code:P"),
            PmTypeInfo("MOD", "co:P,attrs:P,globals:P," +
                       ("defaultargs:P," if features.HAVE_DEFAULTARGS
                        else "") +
                       ("closure:P," if features.HAVE_CLOSURES else "")),
            PmTypeInfo("CLO", "attrs:P,bases:P"),
            PmTypeInfo("FXN", "co:P,attrs:P,globals:P," +
                       ("defaultargs:P," if features.HAVE_DEFAULTARGS
                        else "") +
                       ("closure:P" if features.HAVE_CLOSURES else "")),
            PmTypeInfo("CLI", "class:P,attrs:P"),
            PmTypeInfo("CIM", "data:B:*"),
            PmTypeInfo("NIM", ""),
            PmTypeInfo("NOB", "argcount:B,funcidx:H"),
            PmTypeInfo("THR", "frame:P,interpctrl:I"),
            PmTypeInfo("x", ""),
            PmTypeInfo("BOL", "val:i"),
            PmTypeInfo("CIO", "data:B:*"),
            PmTypeInfo("MTH", "instance:P,func:P,attrs:P"),
            PmTypeInfo("LST", "len:H,sgl:P"),
            PmTypeInfo("DIC", "len:H,keys:P,vals:P"),
            PmTypeInfo("x", ""),
            PmTypeInfo("x", ""),
            PmTypeInfo("x", ""),
            PmTypeInfo("x", ""),
            PmTypeInfo("x", ""),
            PmTypeInfo("FRM", "back:P,func:P,memspace:B,ip:P,blockstack:P,"
                       "attrs:P,globals:P,sp:P,isImport:.," +
                       ("isInit:.," if features.HAVE_CLASSES else "") +
                       "locals:P:<sp"),
            PmTypeInfo("BLK", "sp:P,handler:P,type:B,next:P"),
            PmTypeInfo("SEG", "items:P:8,next:P"),
            PmTypeInfo("SGL", "rootseg:P,lastseg:P,length:H"),
            PmTypeInfo("SQI", "sequence:P,index:H"),
            PmTypeInfo("NFM", "back:P,func:P,stack:P,active:B,numlocals:B,"
                       "locals:P:8"),
        )
        # Layout used for a chunk whose descriptor has the free flag set
        FREE_TYPE = PmTypeInfo("FRE", "prev:P,next:P")

        def __init__(self, heap):
            """Initializes the object at the current file location

            The object is read from heap.rawheap, whose position is left
            just after the object.
            Raises an Exception when the type code has no known layout.
            """
            UserDict.UserDict.__init__(self)
            self.is_dotted = False
            self.is_dotrev = False
            self.heap = heap
            self.fp = fp = self.heap.rawheap
            self.addr = self.fp.tell()
            # Object descriptor: flags, type code and size in one uint16
            od = unpack_fp(heap.endianchr + "H", fp, False)[0]
            self.mark = (' ', 'M')[(od & 0x4000) == 0x4000]
            self.free = (' ', 'F')[(od & 0x8000) == 0x8000]
            if self.free == 'F':
                # Free chunk: the low 14 bits hold the size in 4-byte units
                self.size = (od & 0x3FFF) << 2
                self.objtype = self.FREE_TYPE
            else:
                # Used chunk: 9 bits of size (4-byte units), then 5 bits
                # of type code
                self.size = (od & 0x01FF) << 2
                assert self.size > 0
                self.typeindex = (od >> 9) & 0x1f
                self.objtype = PmObject.PM_TYPES[self.typeindex]
                if self.objtype.name == 'x':
                    raise Exception("unknown object type", self.typeindex)
            self.type = self.objtype.name.lower()
            self.parse()
            self.fp.seek(self.addr + self.size)

        def parse(self,):
            """Parses data at the current file location

            The field values are stored in self.data.  When the length of
            a field depends on the value of another field, the object is
            parsed again once that value is known.
            Raises an Exception when a field length can never be computed.
            """
            d = self.data
            fmt, fieldmap, remaining = self.objtype._calc_fieldmap(self)
            results = unpack_fp(fmt, self.fp, False)
            # Store data in obj
            for r, f in zip(results, fieldmap):
                if f is None:
                    # Object descriptor, already decoded in __init__
                    continue
                elif isinstance(f, PmBitfieldInfo):
                    for bf in f.fields:
                        # A bit field without multiplicity (mul is False)
                        # is one bit wide.
                        width = bf.mul or 1
                        d[bf.name] = r & ((1 << width) - 1)
                        r = r >> width
                elif f.mul == False:
                    d[f.name] = r
                elif f.mul == 'lastv':
                    if r != 0:
                        d[f.name].append(r)
                    else:
                        break
                else:
                    d[f.name].append(r)
            if remaining:
                # The values stored above must allow more fields to be
                # computed, otherwise the retry would never end.
                if not self.objtype._calc_fieldmap(self)[2] < remaining:
                    raise Exception("Cannot compute %s field length"
                                    % self.objtype._rawfields[-remaining].name)
                # Retry if there are any remaining field
                self.parse()

        def __str__(self,):
            """One-line description: address, type, size, flags, fields.
            """
            d = self.data
            result = []
            result.append("%s %s %d %s%s: " % (
                hex(self.addr + self.heap.base),
                self.type,
                self.size,
                self.mark,
                self.free,))
            values = []
            for f in self.objtype.fields:
                # Pointers and bit fields are shown in hexadecimal
                typechr = "0x%x" if f.type in ['P', '.'] else "%d"
                if f.mul == False:
                    values.append(("%s=" + typechr) % (f.name, d[f.name]))
                elif self.objtype.name == "STR" and f.name == "val":
                    values.append("val=%s"
                                  % _ellipse("".join(map(chr, d['val'])), 30))
                elif self.objtype.name == 'CIO' and f.name == 'data':
                    values.append("data=%s"
                                  % _ellipse(
                                      repr("".join(map(chr, d['data']))),
                                      30))
                else:
                    values.append(
                        "%s=[%s]"
                        % (f.name,
                           ", ".join([typechr % v for v in d[f.name]])))
            result.append(", ".join(values))
            return "".join(result)

        def __repr__(self):
            return "<0x%x %s %d>" \
                % (self.addr + self.heap.base,
                   self.objtype.name.lower(),
                   self.size)

        # Fill color of the DOT node, indexed by type code
        COLOR = ["aliceblue", "antiquewhite", "aqua", "aquamarine", "azure",
                 "beige", "bisque", "blanchedalmond", "blue",
                 "blueviolet", "brown", "burlywood", "cadetblue",
                 "chartreuse",
                 "chocolate", "coral", "cornflowerblue", "cornsilk",
                 "crimson",
                 "cyan", "darkblue", "darkcyan", "darkgoldenrod", "darkgray",
                 "darkgreen", "darkgrey", "darkkhaki", "darkmagenta",
                 "darkolivegreen", "darkorange", "darkorchid", "darkred",
                 "darksalmon", "darkseagreen", "darkslateblue",
                 "darkslategray",
                 "darkslategrey", "darkturquoise", "darkviolet", "deeppink",
                 "deepskyblue", "dimgray", "dimgrey", "dodgerblue"]

        def dotstring(self):
            """A DOT representation of the object

            Returns an empty string when the object was already output.
            """
            if self.is_dotted:
                return ""
            self.is_dotted = True
            d = self.data
            # mark dic vals segment for upside-down display
            if self.type == 'dic' and d['len'] > 0:
                seg = self.heap.data[d['vals']].data['rootseg']
                while seg:
                    self.heap.data[seg].is_dotrev = True
                    seg = self.heap.data[seg].data['next']
            result = []
            result.append('"0x%x" [style=filled, fillcolor=%s, colorscheme=svg,'
                          ' label="%s"];'
                          % (self.addr + self.heap.base,
                             self.COLOR[getattr(self, 'typeindex', 0)],
                             self._dot_label()))
            # The segments of a list with several segments are kept on the
            # same rank.
            if (self.type == "sgl" and d['rootseg'] in self.heap.data
                    and d['rootseg'] != d['lastseg']):
                result.append("{ rank=same;")
                seg = d['rootseg']
                while seg in self.heap.data:
                    result.append(self.heap.data[seg].dotstring())
                    seg = self.heap.data[seg].data['next']
                result.append('}')
            return "\n".join(result)

        def _dot_label(self):
            """Label for the dot node

            A record label made of the repr of the object, its non-pointer
            values and one port per pointer field.
            """
            d = self.data
            label = []
            label.append('{')
            label.append(_dot_escape(repr(self)))
            values = []
            for f in self.objtype.fields:
                if f.type == 'P':
                    continue
                if self.objtype.name == "STR" and f.name == "val" \
                        and d['val'] is not None:
                    values.append(
                        "val=%s"
                        % _dot_escape(
                            _ellipse("".join(map(chr, d['val'])), 20)))
                elif self.objtype.name == "CIO" and f.name == "data":
                    values.append("data=%s"
                                  % _dot_escape(
                                      _ellipse(repr(d['data']), 20)))
                else:
                    values.append("%s=%s"
                                  % (_dot_escape(f.name),
                                     _dot_escape(str(d[f.name]))))
            if len(values):
                label.append("|{")
                label.append("|".join(values))
                label.append("}")
            pointers = []
            for f in self.objtype.fields:
                if f.type != 'P':
                    continue
                # The string cache link is shown as a value: dotedges()
                # draws no edge for it.
                if f.name == "cache_next" and d[f.name] in self.heap.data:
                    pointers.append("%s=%s" % (f.name, hex(d[f.name])))
                else:
                    pointers.append("<%s> %s" % (f.name, f.name))
            if len(pointers):
                label.append('|{')
                label.append("|".join(pointers))
                label.append('}')
            label.append('}')
            return "".join(label)

        def dotedges(self):
            """Edges (pointers) leaving this object

            A dictionary also gets one dotted edge from each key to its
            value.
            """
            d = self.data
            result = []
            for f in self.objtype.fields:
                if f.type != 'P':
                    continue
                if f.name == "cache_next":
                    continue
                if f.mul == False:
                    if d[f.name] == 0:
                        continue
                    result.append(self._dotedge(f.name, d[f.name]))
                else:
                    for i, m in enumerate(d[f.name]):
                        if m == 0:
                            continue
                        result.append(self._dotedge(f.name, m, str(i)))
            if self.type == "dic" and d['len'] > 0:
                i = 0
                # Walk the segments of the keys and of the vals lists in
                # parallel.
                sgls = tuple(map(lambda p: self.heap.data[d[p]],
                                 ("keys", "vals")))
                segs = tuple(map(lambda l: self.heap.data[l.data['rootseg']],
                                 sgls))
                iters = tuple(map(lambda s: iter(s.data['items']), segs))
                while i < d['len']:
                    try:
                        key, val = tuple(map(next, iters))
                        result.append('"0x%x" -> "0x%x" '
                                      '[style=dotted, weight=50];'
                                      % (key, val))
                        i += 1
                    except StopIteration:
                        # End of the current segments: go to the next ones
                        segs = tuple(map(
                            lambda s: self.heap.data[s.data['next']], segs))
                        iters = tuple(map(
                            lambda s: iter(s.data['items']), segs))
            return "\n".join(result)

        def _dotedge(self, name, value, label=None):
            """DOT edge from the port `name` of this object to the node at
            address `value`, with an optional label.

            The edge is written backward (with dir=back) when the object
            is marked for upside-down display.
            """
            style = []
            if label is not None:
                style.append("label=%s" % label)
            if self.is_dotrev:
                style.append("dir=back")
            style = (" [" + ", ".join(style) + "]") if style else ""
            if self.is_dotrev:
                return '"0x%x" -> "0x%x":%s%s;' \
                    % (value, self.addr + self.heap.base, name, style)
            else:
                return '"0x%x":%s -> "0x%x"%s;' \
                    % (self.addr + self.heap.base, name, value, style)

    return PmObject
class PmHeap(UserDict.UserDict):
    """A model of the heap.

    Once parse_heap() has been called, self.data maps the address of every
    object of the heap to the object.
    """
    # Name of the feature of each bit of the features field of the dump
    # header, lowest bit first
    FEATURES = ['USE_STRING_CACHE', 'HAVE_DEFAULTARGS', 'HAVE_CLOSURES',
                'HAVE_CLASSES']
    # struct type character of a pointer of each supported size in bytes
    _PTR_CHARS = {1: 'B', 2: 'H', 4: 'I', 8: 'Q'}

    def __init__(self, fp):
        """Initializes the heap based on the given dump file.

        The dump header, the raw heap and the roots are read from `fp`,
        which is then closed.
        Raises an Exception when the dump format is not supported.
        """
        UserDict.UserDict.__init__(self)
        self.is_parsed = False
        self._sense_fmt(fp)
        self.version, features, self.size, self.base = \
            unpack_fp(self.endianchr + "2HI" + self.ptrchr, fp)
        if self.version != 1:
            raise Exception('Dump version %d not supported' % self.version)
        # Object with one boolean attribute per known feature
        self.features = \
            type("pmFeatures",
                 (object,),
                 dict(zip(self.FEATURES, [False] * len(self.FEATURES))))()
        f = 0
        while features:
            setattr(self.features,
                    self.FEATURES[f],
                    bool(features & 1))
            f = f + 1
            features = features >> 1
        self.rawheap = StringIO.StringIO(fp.read(self.size))
        # The number of roots is stored in the byte order of the target,
        # like the rest of the dump.
        num_roots = unpack_fp(self.endianchr + "I", fp)[0]
        roots = {}
        (roots['None'],
         roots['False'],
         roots['True'],
         roots['Zero'],
         roots['One'],
         roots['NegOne'],
         roots['CodeStr'],
         roots['Builtins'],
         roots['NativeFrame'],
         roots['ThreadList']) = \
            unpack_fp(self.endianchr + (self.ptrchr * num_roots), fp)
        self.roots = roots
        self.PmObjectClass = PmObjectClass(self.version, self.features)
        fp.close()

    def _sense_fmt(self, fp):
        """Senses pmdump format (endianness, pointer size)
        depending on the first 8 bytes

        Raises an Exception when the magic string is unknown or when the
        pointer size is not 1, 2, 4 or 8.
        """
        magic = fp.read(6)
        if magic == "PMDUMP":
            self.endianess = "little"
            self.endianchr = '<'
        elif magic == "PMUDMP":
            self.endianess = "big"
            self.endianchr = '>'
        else:
            raise Exception("Not a PMDUMP format")
        self.ptrsize = unpack_fp(self.endianchr + "H", fp)[0]
        self.ptrchr = self._PTR_CHARS.get(self.ptrsize)
        if self.ptrchr is None:
            raise Exception('invalid pointer size')

    def parse_heap(self,):
        """Parses the heap into a dict of key=address, value=object items
        """
        self.rawheap.seek(0)
        while self.rawheap.tell() < self.size:
            addr = self.rawheap.tell() + self.base
            # Creating the object moves rawheap to the next object
            self.data[addr] = self.PmObjectClass(self)
        self.is_parsed = True

    def __getitem__(self, indx):
        """Returns the object at the given address
        or the string of bytes defined by the slice.

        Addresses are target addresses: the base of the heap is
        subtracted to get the offset in the raw heap.
        """
        # Return the object at the given address
        if isinstance(indx, (types.IntType, types.LongType)):
            if self.is_parsed:
                return self.data[indx]
            else:
                self.rawheap.seek(indx - self.base)
                return self.PmObjectClass(self)
        # Return the string of bytes defined by the slice
        elif isinstance(indx, types.SliceType):
            return self.rawheap.getvalue()[indx.start - self.base:
                                           indx.stop - self.base]
        else:
            # Raised explicitly so that the check is kept when python
            # runs with -O.
            raise AssertionError("Bad type to heap[%s]" % type(indx))

    def __str__(self):
        """The dump header, a summary and one line per object.
        """
        d = self.data
        obj = [o for o in d.values() if o.type != "fre"]
        free = [o for o in d.values() if o.type == "fre"]
        result = []
        result.append("dump : version=%d, ptr=%dbytes, %s-endian, features=%s"
                      % (self.version, self.ptrsize, self.endianess,
                         self.features))
        result.append("roots : "
                      + ", ".join(["%s=0x%x" % kv
                                   for kv in self.roots.iteritems()]))
        result.append("heap : size=%d, base=%x" % (self.size, self.base))
        result.append("summary : %d bytes in %d objects, %d free bytes" %
                      (sum([o.size for o in obj]),
                       len(obj),
                       sum([o.size for o in free])))
        for o in sorted(d.values(), key=lambda o: o.addr):
            result.append(str(o))
        result.append('')
        return "\n".join(result)

    def dotstring(self):
        """A DOT representation of the heap
        """
        d = self.data
        result = []
        result.append("digraph pmheapdump {")
        # The roots are kept on the same rank
        result.append('{ rank=same;')
        for r, m in self.roots.iteritems():
            result.append("%s;" % r)
        result.append('}')
        result.append("{ node [shape=record];")
        for o in sorted(d.values(), key=lambda o: o.addr):
            result.append(o.dotstring())
        result.append("}")
        for r, m in self.roots.iteritems():
            result.append('%s -> "0x%x";' % (r, m))
        for o in sorted(d.values(), key=lambda o: o.addr):
            result.append(o.dotedges())
        result.append("}")
        result.append('')
        # Objects already output by another object give an empty string
        return "\n".join(filter(len, result))
def main():
    """Command line entry point: dumps a heap in list or DOT format.

    Without argument, pmheapdump00.bin of the current directory is read
    and the result is written on the standard output.  The first argument
    names the dump file and the second one the output file.
    """
    from optparse import OptionParser
    parser = OptionParser(usage="usage: %prog [options] [dumpfile [output]]")
    parser.add_option("-f", "--format",
                      dest="format", default='list', choices=['list', 'dot'],
                      help="output format: list or dot [default: %default]")
    (options, args) = parser.parse_args()
    if len(args) > 2:
        # Prints the usage and the message on stderr, then exits with a
        # non-zero status.
        parser.error("too many arguments")
    if args:
        dumpfile = args[0]
    else:
        dumpfile = os.path.join(os.path.curdir, "pmheapdump00.bin")

    fp = open(dumpfile, 'rb')
    try:
        heap0 = PmHeap(fp)
    finally:
        # PmHeap closes the file on success only; closing twice is
        # harmless.
        fp.close()
    heap0.parse_heap()
    if options.format == 'list':
        text = str(heap0)
    elif options.format == 'dot':
        text = heap0.dotstring()

    # The output file is only created once the dump has been parsed, so
    # that a bad dump does not leave an empty file behind.
    if len(args) == 2:
        with open(args[1], 'w') as out:
            out.write(text)
    else:
        sys.stdout.write(text)


if __name__ == "__main__":
    main()