"""
Functions for reading FIB-SEM data from Harald Hess' proprietary format
Adapted from https://github.com/janelia-cosem/FIB-SEM-Aligner/blob/master/fibsem.py
Copyright (c) 2017, David Hoffman, Davis Bennett
"""
import os
import numpy as np
from typing import Iterable, Union, Sequence
from pathlib import Path
import dask.array as da
from xarray import DataArray
Pathlike = Union[str, Path]
# This value is used to ensure that the endianness of the data is correct
MAGIC_NUMBER = 3555587570
# This is the size of the header, in bytes. Everything beyond this number of bytes is data.
OFFSET = 1024
class FIBSEMHeader(object):
    """Structure to hold FIB-SEM file header info as attributes.

    Keyword arguments passed to the constructor become instance attributes
    directly (the instance ``__dict__`` is replaced wholesale).
    """

    def __init__(self, **kwargs):
        self.__dict__ = kwargs

    def update(self, **kwargs):
        """Update the internal attribute dictionary with new key/value pairs."""
        self.__dict__.update(kwargs)

    def to_native_types(self):
        """Convert numpy-typed attribute values to native Python types in place.

        Conversion rules: numpy integers -> ``int``, numpy floats ->
        ``float``, numpy byte strings -> utf-8 decoded ``str``, numpy
        arrays -> (nested) ``list``. All other values are left untouched.
        Useful before serializing the header (e.g. to JSON).
        """
        for k, v in self.__dict__.items():
            if isinstance(v, np.integer):
                self.__dict__[k] = int(v)
            elif isinstance(v, np.bytes_):
                # np.generic.tostring() was deprecated in NumPy 1.19 and
                # removed in NumPy 2.0; np.bytes_ subclasses bytes, so it
                # can be decoded directly.
                self.__dict__[k] = v.decode("utf-8")
            elif isinstance(v, np.ndarray):
                self.__dict__[k] = v.tolist()
            elif isinstance(v, np.floating):
                self.__dict__[k] = float(v)
            # non-numpy values need no conversion
class FIBSEMData(np.ndarray):
    """ndarray subclass that carries the FIB-SEM header alongside the data.

    The ``header`` attribute is attached at construction and propagated
    through views, slices, and other numpy operations.
    """

    def __new__(cls, input_array, header=None):
        # View the incoming data as this subclass, then attach the header.
        instance = np.asarray(input_array).view(cls)
        instance.header = header
        return instance

    def __array_finalize__(self, obj):
        # Invoked by numpy on explicit construction (obj is None), view
        # casting, and new-from-template; carry the header over from the
        # source object when one exists.
        if obj is not None:
            self.header = getattr(obj, "header", None)
class _DTypeDict(object):
"""Handle dtype dict manipulations"""
def __init__(self, names=None, formats=None, offsets=None):
# initialize internals to empty lists
self.names = []
self.formats = []
self.offsets = []
if names is not None:
self.update(names, formats, offsets)
def update(self, names, formats, offsets):
""""""
if isinstance(names, list):
if len(names) == len(formats) == len(offsets):
self.names.extend(names)
self.formats.extend(formats)
self.offsets.extend(offsets)
else:
raise RuntimeError("Lengths are not equal")
else:
self.names.append(names)
self.formats.append(formats)
self.offsets.append(offsets)
@property
def dict(self):
"""Return the dict representation"""
return dict(names=self.names, formats=self.formats, offsets=self.offsets)
@property
def dtype(self):
"""return the dtype"""
return np.dtype(self.dict)
def _read_header(path):
    # make empty header to fill
header_dtype = _DTypeDict()
header_dtype.update(
[
"FileMagicNum", # Read in magic number, should be 3555587570
"FileVersion", # Read in file version number
"FileType", # Read in file type, 1 is Zeiss Neon detectors
"SWdate", # Read in SW date
"TimeStep", # Read in AI sampling time (including oversampling) in seconds
"ChanNum", # Read in number of channels
"EightBit", # Read in 8-bit data switch
],
[">u4", ">u2", ">u2", ">S10", ">f8", ">u1", ">u1"],
[0, 4, 6, 8, 24, 32, 33],
)
# read initial header
with open(path, mode="rb") as fobj:
base_header = np.fromfile(fobj, dtype=header_dtype.dtype, count=1)
if len(base_header) == 0:
raise RuntimeError(
f"Base header is missing for {fobj.name}"
)
fibsem_header = FIBSEMHeader(
**dict(zip(base_header.dtype.names, base_header[0]))
)
# now fobj is at position 34, return to 0
fobj.seek(0, os.SEEK_SET)
if fibsem_header.FileMagicNum != MAGIC_NUMBER:
raise RuntimeError(
f"FileMagicNum should be {MAGIC_NUMBER} but is {fibsem_header.FileMagicNum}"
)
_scaling_offset = 36
if fibsem_header.FileVersion == 1:
header_dtype.update(
"Scaling", (">f8", (fibsem_header.ChanNum, 4)), _scaling_offset
)
elif fibsem_header.FileVersion in {2, 3, 4, 5, 6}:
header_dtype.update(
"Scaling", (">f4", (fibsem_header.ChanNum, 4)), _scaling_offset
)
else:
# Read in AI channel scaling factors, (col#: AI#), (row#: offset, gain, 2nd order, 3rd order)
header_dtype.update("Scaling", (">f4", (2, 4)), _scaling_offset)
if fibsem_header.FileVersion >= 9:
header_dtype.update(
["Restart", "StageMove", "FirstX", "FirstY"],
[">u1", ">u1", ">i4", ">i4"],
[68, 69, 70, 74],
)
header_dtype.update(
["XResolution", "YResolution"], # X Resolution # Y Resolution
[">u4", ">u4"],
[100, 104],
)
if fibsem_header.FileVersion in {1, 2, 3}:
header_dtype.update(
["Oversampling", "AIDelay"], # AI oversampling # Read AI delay (
[">u1", ">i2"],
[108, 109],
)
else:
header_dtype.update("Oversampling", ">u2", 108) # AI oversampling
header_dtype.update("ZeissScanSpeed", ">u1", 111) # Scan speed (Zeiss #)
if fibsem_header.FileVersion in {1, 2, 3}:
header_dtype.update(
[
"ScanRate", # Actual AO (scanning) rate
"FramelineRampdownRatio", # Frameline rampdown ratio
"Xmin", # X coil minimum voltage
"Xmax", # X coil maximum voltage
],
[">f8", ">f8", ">f8", ">f8"],
[112, 120, 128, 136],
)
# fibsem_header.Detmin = -10 # Detector minimum voltage
# fibsem_header.Detmax = 10 # Detector maximum voltage
else:
header_dtype.update(
[
"ScanRate", # Actual AO (scanning) rate
"FramelineRampdownRatio", # Frameline rampdown ratio
"Xmin", # X coil minimum voltage
"Xmax", # X coil maximum voltage
"Detmin", # Detector minimum voltage
"Detmax", # Detector maximum voltage
"DecimatingFactor", # Decimating factor
],
[">f4", ">f4", ">f4", ">f4", ">f4", ">f4", ">u2"],
[112, 116, 120, 124, 128, 132, 136],
)
header_dtype.update(
[
"AI1", # AI Ch1
"AI2", # AI Ch2
"AI3", # AI Ch3
"AI4", # AI Ch4
],
[">u1", ">u1", ">u1", ">u1"],
[151, 152, 153, 154],
)
if fibsem_header.FileVersion >= 9:
header_dtype.update(
["SampleID"],
["S25"],
[155],
)
header_dtype.update(
["Notes"],
[">S200"],
[180],
)
if fibsem_header.FileVersion in {1, 2}:
header_dtype.update(
[
"DetA", # Name of detector A
"DetB", # Name of detector B
"DetC", # Name of detector C
"DetD", # Name of detector D
"Mag", # Magnification