# Source code for plugin_erg_importer

# Copyright (c) 2013, System Engineering Software Society
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the System Engineering Software Society nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.
# IN NO EVENT SHALL SYSTEM ENGINEERING SOFTWARE SOCIETY BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import re
import os.path
import struct
import array
import datetime

import numpy as np

from sympathy.api import importers
from sympathy.api.exceptions import SyDataError


# Maps the column type names found in an ERG info file to the single-letter
# type codes that are handed to both ``struct`` and ``array`` when the binary
# records are decoded.
TYPES = {
    # Floating point.
    'Double': 'd',
    'Float': 'f',
    # Signed / unsigned integers, widest first.
    'LongLong': 'q',
    'ULongLong': 'Q',
    'Long': 'l',
    'ULong': 'L',
    'Int': 'i',
    'UInt': 'I',
    'Short': 'h',
    'UShort': 'H',
    'Char': 'b',
    'UChar': 'B',
}


def erg_info(filename):
    """
    Parse an ERG info file made of ``key = value`` lines.

    Lines without a ``=`` are ignored. Keys and values are stripped of
    surrounding whitespace and values are kept as text.

    Return a tuple ``(props, items)`` of two dictionaries:

    - ``props`` is a structured view. Keys starting with ``File.`` are split
      on every dot into nested dictionaries. Keys starting with ``Quantity.``
      are split into exactly three levels: ``'Quantity'``, the (possibly
      dotted) quantity name and the final attribute. All remaining keys are
      stored unchanged at the top level.
    - ``items`` maps every key exactly as found in the file to its value.
    """
    flat = {}
    props = {}

    with open(filename) as f:
        for line in f:
            key, separator, value = line.strip().partition('=')
            if not separator:
                # Not a 'key = value' line.
                continue

            key = key.strip()
            value = value.strip()
            flat[key] = value

            if key.startswith('File.'):
                path = key.split('.')
            elif key.startswith('Quantity.'):
                # Quantity names may contain dots themselves, so only the
                # first and the last component act as separators.
                pieces = key.split('.')
                path = [pieces[0], '.'.join(pieces[1:-1]), pieces[-1]]
            else:
                props[key] = value
                continue

            # Walk (and create on demand) the nested dictionaries.
            node = props
            for component in path[:-1]:
                node = node.setdefault(component, {})
            node[path[-1]] = value

    return (props, flat)


def erg_parse_header(f):
    """
    Read and validate the 16 byte header of a type 2 ERG file.

    The header consists of an 8 byte id string, a one byte format version,
    a one byte endianness flag, a two byte record length and four bytes that
    are skipped.

    :param f: File object opened in binary mode, positioned at the start of
        the file. It is left positioned directly after the header.
    :return: The record length in bytes as stated by the header.
    :raises SyDataError: If the header is truncated, the id string is not
        ``CM-ERG``, the version is not 1 or the endianness flag is not 0.
    """
    # '<' selects little-endian byte order without alignment padding, so the
    # layout does not depend on the machine running the import. Big-endian
    # files are rejected below before the record length is used.
    header_struct = struct.Struct('<8sBBH4x')
    header = f.read(header_struct.size)
    if len(header) < header_struct.size:
        # Too short to be unpacked at all.
        raise SyDataError('Not a valid type 2 ERG file.')
    id_string, erg_version, endianness, record_length = header_struct.unpack(
        header)
    # struct yields a byte string, so the magic must be a bytes literal;
    # a text literal never compares equal to it on Python 3.
    if id_string != b"CM-ERG\0\0":
        raise SyDataError('Not a valid type 2 ERG file.')
    if erg_version != 1:
        raise SyDataError('Version {} of type 2 ERG file format is not supported.'.format(erg_version))
    if endianness != 0:
        raise SyDataError('Big endian support is not implemented.')
    return record_length


def erg_read_data(f, rec_len, props):
    """
    Read all complete records from an ERG file into per-column arrays.

    The record layout is taken from ``props['File']['At']``, whose entries
    are numbered ``'1'``, ``'2'``, ... and each carry a ``'Name'`` and a
    ``'Type'``. A type of the form ``'<n> Bytes'`` denotes ``n`` bytes that
    are skipped; every other type must be a key of ``TYPES``.

    :param f: File object opened in binary mode, positioned at the first
        record.
    :param rec_len: Record length in bytes as stated by the file header.
    :param props: Structured info file properties as returned by erg_info.
    :return: Tuple ``(columns, column_names)`` where ``columns`` is a list of
        ``array.array`` objects, one per non-skipped column, and
        ``column_names`` the corresponding list of names.
    :raises SyDataError: If a column has an unknown type or if the size of
        the described layout differs from ``rec_len``.
    """
    column_names = []
    column_types = []
    # '<' selects little-endian byte order with standard sizes and without
    # alignment padding. Without it struct would use the native sizes and
    # insert padding, making the layout depend on the platform.
    record_format = '<'
    attributes = props['File']['At']

    i = 1
    while str(i) in attributes:
        colname = attributes[str(i)]['Name']
        coltype = attributes[str(i)]['Type']
        bytes_match = re.match(r'(\d+) Bytes', coltype)
        if bytes_match:
            # Skipped bytes: part of the record but not a data column.
            record_format += bytes_match.group(1) + 'x'
        else:
            try:
                type_code = TYPES[coltype]
            except KeyError:
                raise SyDataError(
                    "Unsupported type '{}' for column '{}'.".format(
                        coltype, colname))
            column_names.append(colname)
            column_types.append(type_code)
            record_format += type_code
        i += 1

    s = struct.Struct(record_format)
    if rec_len != s.size:
        raise SyDataError('Found {} records with total length {}, but record '
                          'length is reported to be {}.'.format(
                              len(column_names), s.size, rec_len))

    columns = [array.array(t) for t in column_types]
    if rec_len == 0:
        # Reading zero bytes never comes up short, so the loop below would
        # never terminate. There is nothing to read anyway.
        return columns, column_names

    while True:
        t = f.read(rec_len)
        if len(t) < rec_len:
            # End of file; a trailing partial record is dropped.
            break
        record = s.unpack_from(t)
        for a, v in zip(columns, record):
            a.append(v)
    return columns, column_names


def get_factor_and_offset(name, props):
    """
    Return the linear scaling ``(factor, offset)`` of a quantity.

    The values are read from ``props['Quantity'][name]['Factor']`` and
    ``props['Quantity'][name]['Offset']``. The info file parser keeps every
    value as text, so values that are present are converted to ``float`` to
    make them usable in arithmetic. Missing values fall back to the neutral
    integers 1 and 0, which leave the type of the scaled data unchanged.

    :param name: Name of the quantity.
    :param props: Structured info file properties as returned by erg_info.
    :return: Tuple ``(factor, offset)``.
    :raises ValueError: If a value is present but is not a valid number.
    """
    try:
        k = float(props['Quantity'][name]['Factor'])
    except KeyError:
        k = 1
    try:
        b = float(props['Quantity'][name]['Offset'])
    except KeyError:
        b = 0
    return k, b


def get_unit(name, props):
    """
    Return the unit stored under ``props['Quantity'][name]['Unit']``.

    An empty string is returned when any of the three keys is missing.
    """
    try:
        quantity = props['Quantity'][name]
        unit = quantity['Unit']
    except KeyError:
        unit = ''
    return unit


class DataImportERG(importers.ADAFDataImporterBase):
    """Importer for type 2 ERG files (``CM-ERG``) and their info files."""

    IMPORTER_NAME = "CM-ERG"

    def valid_for_file(self):
        """
        Return True if the input file exists and has a valid ERG header.

        Any error raised while parsing the header is treated as "not valid".
        """
        if self._fq_infilename is None or not os.path.isfile(
                self._fq_infilename):
            return False
        with open(self._fq_infilename, 'rb') as f:
            try:
                erg_parse_header(f)
            except Exception:
                # Unlike a bare except this lets KeyboardInterrupt and
                # SystemExit through.
                return False
            else:
                return True

    def import_data(self, out_datafile, parameters=None, progress=None):
        """
        Import the ERG file into ``out_datafile``.

        The info file is looked up next to the ERG file, first as
        ``<erg filename>.info`` and then with the extension of the ERG file
        replaced by ``.info``. The file date is stored in the meta column
        'Datetime'. The first data column becomes the basis of the raster
        'raster' in the system 'CM-ERG' and each remaining column becomes a
        signal, scaled by its factor and offset.

        ``parameters`` and ``progress`` are accepted but not used.

        :raises SyDataError: If the info file is missing or invalid, the
            file is not a little endian type 2 ERG file or it contains no
            data columns.
        """
        erg_filename = self._fq_infilename
        info_filename = erg_filename + '.info'
        if not os.path.exists(info_filename):
            info_filename = os.path.splitext(erg_filename)[0] + '.info'
        if not os.path.exists(info_filename):
            raise SyDataError("Can't find infofile.")

        props, items = erg_info(info_filename)
        if props['File']['Format'] == 'FORTRAN_Binary_Data':
            raise SyDataError(
                'Only type 2 erg files are supported. Not type 1.')
        elif props['File']['Format'] != 'erg':
            raise SyDataError('Invalid erg infofile.')
        if props['File']['ByteOrder'] != 'LittleEndian':
            raise SyDataError('Big endian support is not implemented.')
        time = datetime.datetime.fromtimestamp(
            int(props['File']['DateInSeconds']))

        with open(erg_filename, 'rb') as f:
            rec_len = erg_parse_header(f)
            columns, column_names = erg_read_data(f, rec_len, props)

        if not columns:
            # Without at least one column there is nothing to use as basis.
            raise SyDataError('No data columns found in erg file.')

        out_datafile.meta.create_column('Datetime', np.array([time]), {})
        system = out_datafile.sys.create('CM-ERG')
        raster = system.create('raster')

        # The first column is used as the basis of the raster.
        unit = get_unit(column_names[0], props)
        raster.create_basis(np.array(columns[0]), dict(unit=unit))

        for name, signal in zip(column_names[1:], columns[1:]):
            unit = get_unit(name, props)
            k, b = get_factor_and_offset(name, props)
            signal = np.array(signal) * k + b
            raster.create_signal(name, signal, dict(unit=unit))