Source code for sylib.library.plugins.data.adaf.importers.plugin_erg_importer

# This file is part of Sympathy for Data.
# Copyright (c) 2013, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data.  If not, see <http://www.gnu.org/licenses/>.
import re
import os.path
import struct
import array
import datetime

import numpy as np

from sympathy.api import importers
from sympathy.api.exceptions import SyDataError


TYPES = {
        'Double': 'd',
        'Float': 'f',
        'LongLong': 'q',
        'ULongLong': 'Q',
        'Long': 'l',
        'ULong': 'L',
        'Int': 'i',
        'UInt': 'I',
        'Short': 'h',
        'UShort': 'H',
        'Char': 'b',
        'UChar': 'B'
        }


def erg_info(filename):
    """
    Return a tuple of two dictionaries.
    The first containing a structure where the 'File' and 'Quantity' structures
    have been separated into two keys. The remaning keys are as found in the
    file.
    The second containing the keys directly from the file and their value.
    """
    items = []
    props = {}

    with open(filename) as f:
        for line in f:
            try:
                key, value = line.strip().split('=', 1)
            except ValueError:
                pass
            else:
                key = key.strip()
                value = value.strip()
                items.append((key, value))

                swf = key.startswith('File.')
                swq = key.startswith('Quantity.')

                if swf or swq:

                    if swf:
                        parts = key.split('.')
                    if swq:
                        parts = key.split('.')
                        parts = (parts[:1] + ['.'.join(parts[1:-1])] +
                                 parts[-1:])
                    if parts:
                        prev = props

                        for part in parts[:-1]:
                            prev = prev.setdefault(part, {})

                        prev[parts[-1]] = value
                else:
                    props[key] = value
        return (props, dict(items))


def erg_parse_header(f):
    header_format = '8sBBH4x'
    header = f.read(struct.calcsize(header_format))
    id_string, erg_version, endianness, record_length = struct.unpack(
        header_format, header)
    if id_string != b"CM-ERG\0\0":
        raise SyDataError('Not a valid type 2 ERG file.')
    if erg_version != 1:
        raise SyDataError(
            'Version {} of type 2 ERG file format is not supported.'.format(
                erg_version))
    if endianness != 0:
        raise SyDataError('Big endian support is not implemented.')
    return record_length


def erg_read_data(f, rec_len, props):
    column_names = []
    column_types = []
    record_format = ''

    i = 1
    while True:
        if str(i) not in props['File']['At']:
            break
        colname = props['File']['At'][str(i)]['Name']
        coltype = props['File']['At'][str(i)]['Type']
        bytes_match = re.match(r'(\d+) Bytes', coltype)
        if bytes_match:
            record_format += bytes_match.group(1) + 'x'
        else:
            column_names.append(colname)
            column_types.append(TYPES[coltype])
            record_format += TYPES[coltype]
        i += 1

    s = struct.Struct(record_format)
    if rec_len != s.size:
        raise SyDataError('Found {} records with total length {}, but record '
                          'length is reported to be {}.'.format(
                              len(column_names), s.size, rec_len))

    columns = [array.array(t) for t in column_types]
    while True:
        t = f.read(rec_len)
        if len(t) < rec_len:
            break
        record = s.unpack_from(t)
        for a, v in zip(columns, record):
            a.append(v)
    return columns, column_names


def get_factor_and_offset(name, props):
    try:
        k = props['Quantity'][name]['Factor']
    except KeyError:
        k = 1
    try:
        b = props['Quantity'][name]['Offset']
    except KeyError:
        b = 0
    return k, b


def get_unit(name, props):
    try:
        return props['Quantity'][name]['Unit']
    except KeyError:
        return ''


[docs]class DataImportERG(importers.ADAFDataImporterBase): IMPORTER_NAME = "CM-ERG" def valid_for_file(self): if self._fq_infilename is None or not os.path.isfile( self._fq_infilename): return False with open(self._fq_infilename, 'rb') as f: try: erg_parse_header(f) except Exception: return False else: return True def import_data(self, out_datafile, parameters=None, progress=None): erg_filename = self._fq_infilename info_filename = erg_filename + '.info' if not os.path.exists(info_filename): info_filename = os.path.splitext(erg_filename)[0] + '.info' if not os.path.exists(info_filename): raise SyDataError("Can't find infofile.") props, items = erg_info(info_filename) if props['File']['Format'] == 'FORTRAN_Binary_Data': raise SyDataError( 'Only type 2 erg files are supported. Not type 1.') elif props['File']['Format'] != 'erg': raise SyDataError('Invalid erg infofile.') if props['File']['ByteOrder'] != 'LittleEndian': raise SyDataError('Big endian support is not implemented.') time = datetime.datetime.fromtimestamp( int(props['File']['DateInSeconds'])) with open(erg_filename, 'rb') as f: rec_len = erg_parse_header(f) columns, column_names = erg_read_data(f, rec_len, props) out_datafile.meta.create_column('Datetime', np.array([time]), {}) system = out_datafile.sys.create('CM-ERG') raster = system.create('raster') unit = get_unit(column_names[0], props) raster.create_basis(np.array(columns[0]), dict(unit=unit)) for name, signal in zip(column_names[1:], columns[1:]): unit = get_unit(name, props) k, b = get_factor_and_offset(name, props) signal = np.array(signal) * k + b raster.create_signal(name, signal, dict(unit=unit))