Source code for sylib.library.plugins.data.json.importers.plugin_base_json_importers

# This file is part of Sympathy for Data.
# Copyright (c) 2018, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data.  If not, see <http://www.gnu.org/licenses/>.
import os
import io
import json
from lxml import etree
from collections import defaultdict
from sympathy.api import importers
from sympathy.api import qt2 as qt_compat

QtWidgets = qt_compat.import_module('QtWidgets')


def _sniff_type(filename):
    """
    Guess if it is a JSON or XML file by looking
    at the file extension or the first line
    """
    ext = os.path.splitext(filename)[1].lower()
    if ext in [".json", ".xml"]:
        return ext[1:]

    with open(filename, "rb") as f:
        for line in f:
            line = line.lstrip()
            if line:
                if line.startswith(b"<"):
                    return "xml"
                else:
                    return "json"


def _add_nonempty_text(children, text):
    if text:
        text = text.strip()
        if text:
            children.append({'#text': text})


def _get_nsname(reverse_nsmap, obj):
    qname = etree.QName(obj)
    name = qname.localname
    namespace = qname.namespace
    if namespace and namespace in reverse_nsmap:
        ns_name = reverse_nsmap[namespace]
        if ns_name:
            name = f'{ns_name}:{name}'
    return name


def _get_attrname(nsname):
    res = '@xmlns'
    if nsname:
        res = f'@xmlns:{nsname}'
    return res


def lxml_element_to_json(element, nsmap):
    """
    Convert XML to JSON.

    - Text and tail are children with name #text.
    - Attributes are children prefixed by @.
    - Namespace declarations are treated as attributes.

    Elements without children are omitted, ones with
    1 child produces a dict, and ones with several a
    list.
    """
    children = []
    reverse_nsmap = {v: k for k, v in nsmap.items()}
    tag_key = _get_nsname(reverse_nsmap, element)
    local_nsmap = element.nsmap
    res = {tag_key: None}

    for k, v in local_nsmap.items():
        if k not in nsmap or nsmap[k] != local_nsmap[k]:
            children.append({_get_attrname(k): v})

    for k, v in element.attrib.items():
        attr_name = f'@{_get_nsname(reverse_nsmap, k)}'
        children.append({attr_name: v})

    _add_nonempty_text(children, element.text)

    for child in element:
        children.append(lxml_element_to_json(child, local_nsmap))
        _add_nonempty_text(children, child.tail)

    len_children = len(children)
    if len_children == 1 and '#text' in children[0]:
        res[tag_key] = list(children[0].values())[0]
    elif len_children >= 1:
        data = {}
        res[tag_key] = data
        counts = defaultdict(int)
        for child in children:
            for k, v in child.items():
                counts[k] += 1

        for k, count in counts.items():
            if count > 1:
                data[k] = []

        for child in children:
            for k, v in child.items():
                count = counts[k]
                if count > 1:
                    data[k].append(v)
                else:
                    data[k] = v
    return res


def lxml_file_to_json(f):
    etree_parse = etree.parse(f)
    root = etree_parse.getroot()
    return lxml_element_to_json(root, {})


def import_data(obj, filename, filetype):
    """
    Load a Json structure from a datasource or a filepath

    :param datasource: the datasource or the filepath to load the Json from
    :param filetype: can be either ``json`` or ``xml`` and determines what type
                     of file to load
    """
    filetype = filetype.lower()
    with io.open(filename, "rb") as f:
        if filetype == "json":
            _dict = json.load(f)
        elif filetype == "xml":
            _dict = lxml_file_to_json(f)
        else:
            assert False, 'Unknown filetype'
    obj.set(_dict)


[docs]class DataImportAuto(importers.AutoImporterMixin, importers.JsonDataImporterBase): IMPORTER_NAME = "Auto" _IMPORTER_BASE = importers.JsonDataImporterBase
[docs]class DataImportXml(importers.JsonDataImporterBase): IMPORTER_NAME = "XML" def valid_for_file(self): try: return _sniff_type(self._fq_infilename) == 'xml' except Exception: return False def parameter_view(self, parameters): if not self.valid_for_file(): return QtWidgets.QLabel( 'File does not exist or cannot be read.') return QtWidgets.QLabel() def import_data(self, out_datafile, parameters=None, progress=None): try: import_data(out_datafile, self._fq_infilename, filetype='xml') except Exception as e: raise self.import_failed(e)
[docs]class DataImportJson(importers.JsonDataImporterBase): IMPORTER_NAME = "JSON" def valid_for_file(self): try: return _sniff_type(self._fq_infilename) == 'json' except Exception: return False def parameter_view(self, parameters): if not self.valid_for_file(): return QtWidgets.QLabel( 'File does not exist or cannot be read.') return QtWidgets.QLabel() def import_data(self, out_datafile, parameters=None, progress=None): try: import_data(out_datafile, self._fq_infilename, filetype='json') except Exception as e: raise self.import_failed(e)