# This file is part of Sympathy for Data.
# Copyright (c) 2020 Combine Control Systems
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import json
import ast
import enum
import contextlib
import datetime
import bson
import bson.json_util
from bson.binary import Binary
from bson.dbref import DBRef
from bson.decimal128 import Decimal128
from bson.int64 import Int64
from bson.max_key import MaxKey
from bson.min_key import MinKey
from bson.objectid import ObjectId
from bson.regex import Regex
from bson.timestamp import Timestamp
import pymongo.errors
import pymongo.operations
from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.api.exceptions import SyDataError, SyConfigurationError
from sympathy.utils import parameters as utils_parameters
from sylib_database import mongodb
_legacy_url_str = 'connection_url'
_default_resource = 'mongodb://localhost:27017'
@contextlib.contextmanager
def db_connected(node, mongodb):
with mongodb.connected(node) as db:
yield db
def literal_bson(raw, msg):
"""
    Evaluate ``raw`` as a Python literal expression extended with BSON
    constructor functions and return the result converted to BSON. On
    failure, a SyConfigurationError is raised with ``msg``.

    Here is an example that shows construction of different types:

{
"bytes": b"bytes",
"binary": Binary(b"bytes"),
"decimal": Decimal128("1.1"),
"string": "string",
"int": 1,
"objectid": ObjectId("62cff09ed08b15fac7f618c2"),
"int64": Int64(64),
"bool": False,
"list": [1, 2, 3],
"dict": {},
"dbref": DBRef("collection-name",
ObjectId("62cff09ed08b15fac7f618c2")),
"minkey": MinKey(),
"maxkey": MaxKey(),
"regex": Regex("^.$"),
"timestamp": Timestamp(100, 1),
"datetime": datetime(2019, 10, 11),
"isodate": ISODate("2019-10-11")
}
"""
try:
context = {
'Binary': Binary,
'DBRef': DBRef,
'Decimal128': Decimal128,
'ISODate': ISODate,
'Int64': Int64,
'MaxKey': MaxKey,
'MinKey': MinKey,
'ObjectId': ObjectId,
'Regex': Regex,
'Timestamp': Timestamp,
'datetime': datetime.datetime,
}
json_data = illiteral_eval(raw, context)
return to_bson(json_data)
except Exception as exc:
raise SyConfigurationError(msg) from exc
def set_collection_parameter(parameters):
parameters.set_string(
'collection',
label='Collection',
description='Collection name (required)',
value='')
def set_filter_parameter(parameters):
parameters.set_string(
'filter',
label='Filter',
description='Filter as json.',
value='{}',
editor=node.editors.code_editor())
def set_update_parameter(parameters):
parameters.set_string(
'update',
label='Update',
description='Update as json',
value='{}',
editor=node.editors.code_editor())
def set_replacement_parameter(parameters):
parameters.set_string(
'replacement',
label='Replacement',
description='Replacement as json.',
value='{}',
editor=node.editors.code_editor())
def set_upsert_parameter(parameters):
parameters.set_boolean(
'upsert',
label='Upsert',
description=(
'If True, perform an insert if no documents match the filter.'),
value=False)
def set_operation_parameter(parameters, desc, operation, operations):
    parameters.set_string(
        'operation', value=operation.value,
        description=desc,
        label='Operation',
        editor=node.editors.combo_editor(
            options=[w.value for w in operations]))
def set_connection_url_parameter(parameters):
parameters.set_string(
_legacy_url_str,
label='Connection URL',
description='Connection URL',
value='mongodb://localhost:27017/')
def set_database_parameter(parameters):
parameters.set_string(
'database',
label='Database',
description='Database name (required)',
value='')
def set_projection_parameter(parameters):
parameters.set_string(
'projection',
label='Projection',
description='Projection as json.',
value='',
editor=node.editors.code_editor())
def set_limit_parameter(parameters):
parameters.set_integer(
'limit',
label='Limit',
description=('Limit the maximum number of output documents.\n'
'(0=unlimited)'),
value=0)
def set_ordered_parameter(parameters):
parameters.set_boolean(
'ordered',
label='Ordered',
description=(
'If True, perform operations in provided order. '
'If False, perform operations in arbitrary order, possibly '
'in parallel.'),
value=True)
def port_field_exclusive_controller(name, field_names=None, state='disabled'):
if field_names is None:
field_names = [name]
return node.controller(
when=node.port(name, state='exists', value=True),
action=[node.field(field_name, state=state) for
field_name in field_names])
def opt_n(opt):
n = None
if opt == 'in':
n = (0, 1, 0)
elif opt == 'out':
n = (0, 1, 1)
return n
def json_port(desc, name, opt=None):
n = opt_n(opt)
return Port.Custom('json', desc, name=name, n=n)
_mongodb_port_name = 'mongodb'
def mongodb_port(opt=None):
n = opt_n(opt)
return mongodb.MongoDBPort('MongoDB', name=_mongodb_port_name, n=n)
def propagate_db(input, output):
if output:
output[0].set(input.get())
def input_or_param(name, inputs, parameters):
group = inputs.group(name)
value = None
if group:
value = to_bson(group[0].get())
else:
parameter = parameters[name]
bsvalue = parameter.value
if bsvalue:
value = literal_bson(
bsvalue,
f'{name.title()} must be a json compatible python literal!')
return value
def write_operation_port():
return json_port(
        ('Write operation. Output a write operation instead of performing '
         'the operation directly. The write operation can be used as an '
         'element in a bulk write input list.'),
'write_operation', opt='in')
def _bson_loads(data):
return bson.json_util.loads(
data,
json_options=bson.json_util.RELAXED_JSON_OPTIONS)
def _bson_dumps(data):
return bson.json_util.dumps(
data,
json_options=bson.json_util.RELAXED_JSON_OPTIONS)
def to_bson(data):
return _bson_loads(_bson_dumps(data))
def to_json(data):
return json.loads(_bson_dumps(data))
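

# Illustrative round-trip sketch (not executed here, values are placeholders
# taken from the literal_bson docstring): dumping and re-loading with
# bson.json_util converts between extended-JSON style dicts and native BSON
# types.
#
#   to_bson({'_id': {'$oid': '62cff09ed08b15fac7f618c2'}})
#   # -> {'_id': ObjectId('62cff09ed08b15fac7f618c2')}
#   to_json({'_id': ObjectId('62cff09ed08b15fac7f618c2')})
#   # -> {'_id': {'$oid': '62cff09ed08b15fac7f618c2'}}
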
def elide(string: str, limit: int) -> str:
"""
Return `string` or a elided version in case it is longer than
`limit` characters, append "..." in case the string was elided.
>>> elide("abcde", 4)
"a..."
"""
len_string = len(string)
if len_string > limit:
string = string[:limit - 3] + '...'
return string
def illiteral_eval_expression(expression: ast.Expression, context=None):
"""
    Non-literal evaluation which expands the behavior of literal evaluation
    with a context of names that may be evaluated. This is a limited eval
    which can be considered safe as long as the functions in the context are.

    Name expressions are supported if the name is in the context and evaluate
    to the corresponding value. Similarly, Call expressions are supported if
    the name is in the context and evaluate to the result of calling the named
    function on the evaluated arguments; keyword arguments are not supported.
"""
def inner(value):
res = None
if isinstance(value, ast.Constant):
res = value.value
elif isinstance(value, ast.Dict):
res = {
inner(k): inner(v)
for k, v in zip(value.keys, value.values)}
elif isinstance(value, ast.List):
res = [inner(v) for v in value.elts]
elif isinstance(value, ast.Tuple):
res = tuple(inner(v) for v in value.elts)
elif isinstance(value, ast.Call):
name = value.func.id
if name in context:
res = context[name](*[inner(v) for v in value.args])
else:
raise ValueError(
f'Type: {type(value).__name__} is '
f'only supported for function names, "{name}" '
f'is not in context')
elif isinstance(value, ast.Name):
name = value.id
if name in context:
res = context[name]
else:
raise ValueError(
f'Type: {type(value).__name__} is '
f'only supported for variable names, "{name}" is not in '
f'context')
else:
raise ValueError(
f'Unsupported type: {type(value).__name__}, '
f'"{elide(ast.unparse(value), 256)}"')
return res
if not isinstance(expression, ast.Expression):
raise ValueError('Use exactly one expression')
body = expression.body
context = context or {}
return inner(body)
def illiteral_eval(string: str, context: dict = None):
"""
Compiles the string and calls illiteral_eval_expression.
"""
expression = compile(
string, '<string>', 'eval', ast.PyCF_ONLY_AST)
return illiteral_eval_expression(expression, context)
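

# Minimal usage sketch of illiteral_eval (illustrative only, not part of the
# module API): only names present in the supplied context may be referenced
# or called; anything else raises ValueError.
#
#   illiteral_eval("{'n': Int64(1)}", {'Int64': Int64})
#   # -> {'n': Int64(1)}
#   illiteral_eval("open('x')", {'Int64': Int64})
#   # raises ValueError because "open" is not in the context
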
def ISODate(iso_datetime: str) -> datetime.datetime:
    """
    Parse an ISO 8601 datetime string, treating a trailing "Z" as UTC.

    >>> ISODate("2014-07-03T00:00:00Z")
    datetime.datetime(2014, 7, 3, 0, 0, tzinfo=datetime.timezone.utc)
    """
if iso_datetime:
last = iso_datetime[-1]
if last.lower() == 'z':
iso_datetime = f'{iso_datetime[:-1]}+00:00'
return datetime.datetime.fromisoformat(iso_datetime)
class WriteOperations(enum.Enum):
DeleteOne = 'Delete one'
DeleteMany = 'Delete many'
InsertOne = 'Insert one'
UpdateOne = 'Update one'
UpdateMany = 'Update many'
ReplaceOne = 'Replace one'
write_operations = {
WriteOperations.DeleteOne:
(mongodb.File.delete_one, pymongo.operations.DeleteOne),
WriteOperations.DeleteMany:
(mongodb.File.delete_many, pymongo.operations.DeleteMany),
WriteOperations.InsertOne:
(mongodb.File.insert_one, pymongo.operations.InsertOne),
WriteOperations.UpdateOne:
(mongodb.File.update_one, pymongo.operations.UpdateOne),
WriteOperations.UpdateMany:
(mongodb.File.update_many, pymongo.operations.UpdateMany),
WriteOperations.ReplaceOne:
(mongodb.File.replace_one, pymongo.operations.ReplaceOne),
}
write_operations_lookup = {
w.value: w for w in write_operations
}
def write_method(write_operation):
return write_operations[write_operation][0]
def store_write_operation(write_operation, kwargs):
return {write_operation.name: kwargs}
def update_one_kwargs(filter, update, upsert):
return {'filter': filter, 'update': update, 'upsert': upsert}
def update_many_kwargs(filter, update, upsert):
return {'filter': filter, 'update': update, 'upsert': upsert}
def replace_one_kwargs(filter, replacement, upsert):
return {'filter': filter, 'replacement': replacement, 'upsert': upsert}
def insert_one_kwargs(document):
return {'document': document}
def delete_one_kwargs(filter):
return {'filter': filter}
def delete_many_kwargs(filter):
return {'filter': filter}
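

# Illustrative sketch of the stored write-operation format consumed by the
# Bulk write node (document values here are placeholders):
#
#   store_write_operation(WriteOperations.InsertOne,
#                         insert_one_kwargs({'name': 'a'}))
#   # -> {'InsertOne': {'document': {'name': 'a'}}}
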
class MongoDBConnection(node.Node):
"""
MongoDB database connection.
Connection URL, database and collection can be specified either as part of
    the configuration or using an opt-in URL type datasource port. When using
    the URL type datasource port, create a _Datasource_ node, set the
    datasource type to URL, and add two environment variables named
    "database" and "collection" under the Environment section.

    The standard URL connection scheme has the form::

        mongodb://[username:password@]host1[:port1][,...hostN[:portN]]...
        [/[defaultauthdb][?options]]

See https://docs.mongodb.com/manual/reference/connection-string/ for more
details.
If the URL resource contains credential variables, these will be entered as
part of the URL. See the documentation for credentials in Sympathy for Data
for more info.

    The connection is tied to a specific collection, which can be changed
    using Set Collection.
"""
name = 'MongoDB'
description = 'MongoDB database collection'
nodeid = 'com.sympathyfordata.database.mongodb.connection'
author = 'Erik der Hagopian'
tags = Tags(Tag.Database.MongoDB)
icon = 'mongodbnode.svg'
inputs = Ports([
Port.Datasource(
            ('Connection URL. Alternative configuration using a URL type '
             'datasource with a URL environment'),
name='connection_url', n=(0, 1, 0))])
outputs = Ports([mongodb_port()])
parameters = node.parameters()
set_connection_url_parameter(parameters)
utils_parameters.set_url_connection(
parameters, resource=_default_resource)
set_database_parameter(parameters)
set_collection_parameter(parameters)
controllers = [
port_field_exclusive_controller(
'connection_url', [
utils_parameters.url_connection_name,
'database', 'collection']),
# Trick to always hide legacy field by hiding it when a port which
# always exists is present.
port_field_exclusive_controller(
_mongodb_port_name, [_legacy_url_str], state='hidden')]
def execute(self, ctx):
datasources = ctx.input.group('connection_url')
connection = ctx.parameters[utils_parameters.url_connection_name].value
database = ctx.parameters['database'].value
collection = ctx.parameters['collection'].value
end = '.'
if datasources:
datasource = datasources[0]
if datasource.decode_type() == datasource.modes.url:
data = datasources[0].decode()
database = data['env'].get('database')
collection = data['env'].get('collection')
connection = datasource.connection()
end = ' in the datasource input.'
else:
raise SyDataError('Only URL datasources are supported.')
if not connection:
raise SyDataError(f'Connection URL must be specified{end}')
if not database:
raise SyDataError(f'Database must be specified{end}')
if not collection:
raise SyDataError(f'Collection must be specified{end}')
ctx.output['mongodb'].set({
'connection': connection,
'database': database,
'collection': collection})
def update_parameters(self, old_parameters):
for new_key, old_key, setter in [
(utils_parameters.url_connection_name,
_legacy_url_str, utils_parameters.set_url_connection),
]:
if new_key not in old_parameters:
setter(old_parameters)
try:
# Set new parameter value from old parameter.
old_parameter = old_parameters[old_key]
new_parameter = old_parameters[new_key]
utils_parameters.set_connection_from_string(
old_parameter, new_parameter)
except KeyError:
pass
else:
try:
# Keep old parameter up to date.
old_parameter = old_parameters[old_key]
new_parameter = old_parameters[new_key]
utils_parameters.set_string_from_connection(
new_parameter, old_parameter)
except KeyError:
pass
class MongoDBCollection(node.Node):
"""
Change the configured collection of a MongoDB database.
"""
name = 'Set Collection'
description = 'Set MongoDB database collection.'
nodeid = 'com.sympathyfordata.database.mongodb.setcollection'
author = 'Erik der Hagopian'
tags = Tags(Tag.Database.MongoDB)
icon = 'setcollection_mongodb.svg'
inputs = Ports([mongodb_port()])
outputs = Ports([mongodb_port()])
parameters = node.parameters()
set_collection_parameter(parameters)
def execute(self, ctx):
in_db = ctx.input['mongodb']
out_db = ctx.output['mongodb']
collection = ctx.parameters['collection'].value
if not collection:
raise SyDataError('Collection must be specified.')
out_db.set(in_db.get())
out_db.collection = collection
def _json_convert(bson_dict):
    # bson.json_util._json_convert is private, so round-trip through
    # dumps/loads instead.
return json.loads(_bson_dumps(bson_dict))
class CreateMongoDBJSON(node.Node):
"""
Create MongoDB compatible Json by writing a literal Python expression
extended with constructor functions.

    The output can then be used as input for other MongoDB nodes, such as
    *InsertOne*, *Find*, etc.
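
    For example, the expression
    ``{"_id": ObjectId("62cff09ed08b15fac7f618c2"), "when": ISODate("2019-10-11")}``
    creates a document containing an ObjectId and a datetime (the values here
    are illustrative).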
For info on how to input data to MongoDB, see :ref:`mongodb_data_format`.
"""
name = 'Create MongoDB Json'
author = 'Erik der Hagopian'
icon = 'mongodbnode.svg'
tags = Tags(Tag.Database.MongoDB)
nodeid = 'com.sympathyfordata.database.mongodb.createjson'
inputs = Ports([])
outputs = Ports([
Port.Custom('json', 'Output', name='output', preview=True)])
parameters = node.parameters()
parameters.set_string(
'code',
description='MongoDB compatible Python expression.',
value='{}',
editor=node.editors.code_editor())
def execute(self, node_context):
bson_dict = literal_bson(
node_context.parameters['code'].value,
'Code must be MongoDB compatible.')
json_dict = _json_convert(bson_dict)
node_context.output[0].set(json_dict)
class FindMongoDB(node.Node):
"""
Find multiple documents from the database matching `filter`.
Selection of fields to return can be specified using `projection`. Filter
and Projection can be specified either as part of the configuration or
using opt-in json-ports.
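
    For example, the filter ``{"age": {"$gt": 21}}`` combined with the
    projection ``{"name": 1, "_id": 0}`` returns only the ``name`` field of
    documents whose ``age`` is greater than 21 (field names and values here
    are illustrative).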
See https://docs.mongodb.com/manual/reference/method/db.collection.find/
for more details.
For info on how to input data to MongoDB, see :ref:`mongodb_data_format`.
"""
name = 'Find'
description = 'Find multiple documents from the database'
nodeid = 'com.sympathyfordata.database.mongodb.findmany'
author = 'Erik der Hagopian'
tags = Tags(Tag.Database.MongoDB)
icon = 'find_mongodb.svg'
inputs = Ports([
mongodb_port(),
json_port(
'Filter as json', 'filter', opt='in'),
json_port(
'Projection as json', 'projection', opt='in'),
])
outputs = Ports([
mongodb_port(opt='out'),
Port.Custom('json', 'Documents', name='documents', preview=True)])
parameters = node.parameters()
set_filter_parameter(parameters)
set_projection_parameter(parameters)
set_limit_parameter(parameters)
controllers = [
port_field_exclusive_controller('filter'),
port_field_exclusive_controller('projection'),
]
def execute(self, ctx):
in_db = ctx.input['mongodb']
out_doc = ctx.output['documents']
limit = ctx.parameters['limit'].value
filter = input_or_param('filter', ctx.input, ctx.parameters)
projection = input_or_param('projection', ctx.input, ctx.parameters)
with db_connected(self, in_db):
res = in_db.find(filter=filter, projection=projection, limit=limit)
out_doc.set(to_json(res))
propagate_db(in_db, ctx.output.group('mongodb'))
# class MongoDBInsertList(node.Node):
# name = 'Insert list'
# description = 'Insert a list of documents into the database'
# nodeid = 'com.sympathyfordata.database.mongodb.insertmany'
# author = 'Erik der Hagopian'
# tags = Tags(Tag.Database.MongoDB)
# icon = 'mongodbnode.svg'
# inputs = Ports([
# mongodb_port(),
# Port.Json('Document list', name='documents')])
# outputs = Ports([
# Port.Json('Inserted objects', name='inserted')])
# parameters = node.parameters()
# def execute(self, ctx):
# in_db = ctx.input['mongodb']
# in_doc = ctx.input['documents']
# out_res = ctx.output['inserted']
# db = MongoDB(**in_db.get())
# doc = to_bson(in_doc.get())
# with db:
# doclist = isinstance(doc, list)
# if not doclist:
# raise SyDataError(
# 'Document list input must be a list of documents.')
# try:
# res = db.insert_many(doc)
# except pymongo.errors.BulkWriteError as bwe:
# for write_error in bwe.details.get('writeErrors', []):
# we_msg = write_error.get('errmsg')
# we_code = write_error.get('code')
# we_index = write_error.get('index')
# # Duplicate key.
# if we_code == 11000:
# if we_msg and we_index is not None:
# raise SyDataError(
# f'Inserting index {we_index}: {we_msg}')
# raise
# out_res.set(to_json([{'inserted_id': id_}
# for id_ in res.inserted_ids]))
class MongoDBInsertOne(node.Node):
"""
Insert a single document into the database.
For info on how to input data to MongoDB, see :ref:`mongodb_data_format`.
"""
name = 'Insert'
description = 'Insert a single document into the database'
nodeid = 'com.sympathyfordata.database.mongodb.insertone'
author = 'Erik der Hagopian'
tags = Tags(Tag.Database.MongoDB)
icon = 'insert_mongodb.svg'
inputs = Ports([
mongodb_port(),
Port.Json('Document', name='document')])
outputs = Ports([
mongodb_port(opt='out'),
json_port('Insert result', name='inserted', opt='out'),
write_operation_port()])
parameters = node.parameters()
def execute(self, ctx):
in_db = ctx.input['mongodb']
doc = to_bson(ctx.input['document'].get())
out_res = ctx.output.group('inserted')
write_operations = ctx.output.group('write_operation')
propagate_db(in_db, ctx.output.group('mongodb'))
operation = WriteOperations.InsertOne
kwargs = insert_one_kwargs(doc)
if write_operations:
write_operations[0].set(
store_write_operation(operation, kwargs))
return
with db_connected(self, in_db):
doclist = isinstance(doc, list)
if doclist:
raise SyDataError(
'Document input must be a single document.')
try:
res = write_method(operation)(in_db, **kwargs)
except pymongo.errors.DuplicateKeyError as dke:
write_error = dke.details
we_msg = write_error.get('errmsg')
we_code = write_error.get('code')
# Duplicate key.
if we_code == 11000:
if we_msg:
raise SyDataError(f"Inserting: {we_msg}") from dke
raise
if out_res:
out_res[0].set({'inserted_id': to_json(res.inserted_id)})
class MongoDBUpdate(node.Node):
"""
Update documents from the database matching `filter`.

    When `upsert` is active and no document matches `filter`, a new document
    is created based on the filter and the `Update`.
The modifications are specified using `Update` which can be either an
update operator expression or an aggregation pipeline. Refer to
https://www.mongodb.com/docs/v7.0/reference/operator/update/ for writing
operator expressions, and
https://www.mongodb.com/docs/v7.0/reference/operator/aggregation-pipeline/
for aggregation pipelines.
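
    For example, ``{"$set": {"status": "active"}}`` is an update operator
    expression, while ``[{"$set": {"status": "active"}}]`` (a list of stages)
    is an aggregation pipeline (field name and value here are illustrative).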
The node can be configured to perform an Update One or an Update Many
    `operation`. This determines whether the update is performed on only the
    first matching document or on all documents matching the filter.
Filter and Update can be specified either as part of the configuration
or using opt-in json-ports.
See
https://docs.mongodb.com/manual/reference/method/db.collection.updateOne/
and
https://docs.mongodb.com/manual/reference/method/db.collection.updateMany/
for more details.
For info on how to input data to MongoDB, see :ref:`mongodb_data_format`.
This node can output a write operation, see :ref:`mongodb_write_operation`.
"""
name = 'Update'
description = 'Update one or more documents in the database'
nodeid = 'com.sympathyfordata.database.mongodb.update'
author = 'Erik der Hagopian'
tags = Tags(Tag.Database.MongoDB)
icon = 'update_mongodb.svg'
inputs = Ports([
mongodb_port(),
json_port('Filter', 'filter', opt='in'),
json_port('Update', 'update', opt='in')])
outputs = Ports([
mongodb_port(opt='out'),
json_port('Update result', name='update_result', opt='out'),
write_operation_port()])
parameters = node.parameters()
set_filter_parameter(parameters)
set_update_parameter(parameters)
set_upsert_parameter(parameters)
set_operation_parameter(
parameters, 'Operation used for updating.',
WriteOperations.UpdateOne, [
WriteOperations.UpdateOne,
WriteOperations.UpdateMany])
controllers = [
port_field_exclusive_controller('filter'),
port_field_exclusive_controller('update'),
]
def execute(self, ctx):
in_db = ctx.input['mongodb']
update_result = ctx.output.group('update_result')
out_write_operations = ctx.output.group('write_operation')
upsert = ctx.parameters['upsert'].value
operation = WriteOperations(ctx.parameters['operation'].value)
filter = input_or_param('filter', ctx.input, ctx.parameters)
update = input_or_param('update', ctx.input, ctx.parameters)
propagate_db(in_db, ctx.output.group('mongodb'))
kwargs = {}
if operation == WriteOperations.UpdateOne:
kwargs = update_one_kwargs(
filter, update, upsert)
elif operation == WriteOperations.UpdateMany:
kwargs = update_many_kwargs(
filter, update, upsert)
if out_write_operations:
out_write_operations[0].set(
store_write_operation(operation, kwargs))
return
with db_connected(self, in_db):
res = write_method(operation)(in_db, **kwargs)
if update_result:
update_result[0].set(to_json({
'upserted_id': res.upserted_id,
'matched_count': res.matched_count,
'modified_count': res.modified_count}))
class MongoDBReplace(node.Node):
"""
    Replace a single document in the database matching `Filter`. If there are
    multiple matches, only the first match will be replaced.

    What to replace with is specified using `Replacement`, which should be a
    complete replacement document. When `upsert` is active and no document
    matches `Filter`, a new document is created from the `Replacement`.
Filter and Replacement can be specified either as part of the configuration
or using opt-in json-ports.
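
    For example, with filter ``{"name": "a"}`` and replacement
    ``{"name": "a", "value": 2}`` the first document matching the filter is
    replaced in full by the replacement document (field names and values here
    are illustrative).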
See
https://docs.mongodb.com/manual/reference/method/db.collection.replaceOne/
for more details.
For info on how to input data to MongoDB, see :ref:`mongodb_data_format`.
This node can output a write operation, see :ref:`mongodb_write_operation`.
"""
name = 'Replace'
description = 'Replace one document in the database.'
nodeid = 'com.sympathyfordata.database.mongodb.replaceone'
author = 'Erik der Hagopian'
tags = Tags(Tag.Database.MongoDB)
icon = 'replace_mongodb.svg'
inputs = Ports([
mongodb_port(),
json_port('Filter', 'filter', opt='in'),
json_port('Replacement', 'replacement', opt='in')])
outputs = Ports([
mongodb_port(opt='out'),
json_port('Replace result', name='replace_result', opt='out'),
write_operation_port()])
parameters = node.parameters()
set_filter_parameter(parameters)
set_replacement_parameter(parameters)
set_upsert_parameter(parameters)
controllers = [
port_field_exclusive_controller('filter'),
port_field_exclusive_controller('replacement'),
]
def execute(self, ctx):
in_db = ctx.input['mongodb']
replace_result = ctx.output.group('replace_result')
out_write_operations = ctx.output.group('write_operation')
upsert = ctx.parameters['upsert'].value
operation = WriteOperations.ReplaceOne
filter = input_or_param('filter', ctx.input, ctx.parameters)
replacement = input_or_param('replacement', ctx.input, ctx.parameters)
propagate_db(in_db, ctx.output.group('mongodb'))
kwargs = replace_one_kwargs(
filter, replacement, upsert)
if out_write_operations:
out_write_operations[0].set(
store_write_operation(operation, kwargs))
return
with db_connected(self, in_db):
res = write_method(operation)(in_db, **kwargs)
if replace_result:
replace_result[0].set(to_json({
'upserted_id': res.upserted_id,
'matched_count': res.matched_count,
'modified_count': res.modified_count}))
class MongoDBDelete(node.Node):
"""
Delete documents from the database matching `filter`.
The node can be configured to perform a Delete One or a Delete Many
    `operation`. This determines whether the delete is performed on only the
    first matching document or on all documents matching the filter.
Filter can be specified either as part of the configuration or using an
opt-in json-port.
See
https://docs.mongodb.com/manual/reference/method/db.collection.deleteOne/
and
https://docs.mongodb.com/manual/reference/method/db.collection.deleteMany/
for more details.
For info on how to input data to MongoDB, see :ref:`mongodb_data_format`.
This node can output a write operation, see :ref:`mongodb_write_operation`.
"""
name = 'Delete'
description = 'Delete one or more documents in the database'
nodeid = 'com.sympathyfordata.database.mongodb.delete'
author = 'Erik der Hagopian'
tags = Tags(Tag.Database.MongoDB)
icon = 'delete_mongodb.svg'
inputs = Ports([
mongodb_port(),
json_port('Filter', 'filter', opt='in')])
outputs = Ports([
mongodb_port(opt='out'),
json_port('Delete result', name='delete_result', opt='out'),
write_operation_port()])
parameters = node.parameters()
set_filter_parameter(parameters)
set_operation_parameter(
parameters, 'Operation used for deleting.',
WriteOperations.DeleteOne, [
WriteOperations.DeleteOne,
WriteOperations.DeleteMany])
controllers = [
port_field_exclusive_controller('filter'),
]
def execute(self, ctx):
in_db = ctx.input['mongodb']
delete_result = ctx.output.group('delete_result')
out_write_operations = ctx.output.group('write_operation')
operation = WriteOperations(ctx.parameters['operation'].value)
filter = input_or_param('filter', ctx.input, ctx.parameters)
propagate_db(in_db, ctx.output.group('mongodb'))
        # DeleteOne and DeleteMany take the same keyword arguments.
        kwargs = delete_one_kwargs(filter)
if out_write_operations:
out_write_operations[0].set(
store_write_operation(operation, kwargs))
return
with db_connected(self, in_db):
res = write_method(operation)(in_db, **kwargs)
if delete_result:
delete_result[0].set(to_json({
'deleted_count': res.deleted_count,
}))
class MongoDBBulkWrite(node.Node):
"""
Perform a batch of write operations in the database.
The batch operations are specified using the `Requests` input port and
can contain Insert One, Update One, Update Many, Replace One, Delete
One and Delete Many operations. The operations should be specified as a
json-list, with each operation as an item in the list.
By adding the opt-in `write_operation` port, several MongoDB nodes can
    output a json write operation instead of performing the operation
directly. When put into a json-list, these write operations can be used as
input for the Bulk write node. To produce `Requests` input for the Bulk
write node from `write_operation` output ports of other nodes, use Item to
List to convert the operations to a List, and use List to Json to convert
the List to a json-list.
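
    For example, a requests json-list with two operations might look like
    this (values are illustrative)::

        [{"InsertOne": {"document": {"name": "a"}}},
         {"DeleteMany": {"filter": {"name": "a"}}}]
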
See
https://docs.mongodb.com/manual/reference/method/db.collection.bulkWrite/
for more details.
For info on how to input data to MongoDB, see :ref:`mongodb_data_format`.
"""
name = 'Bulk write'
description = 'Perform a batch of write operations in the database.'
nodeid = 'com.sympathyfordata.database.mongodb.bulkwrite'
author = 'Erik der Hagopian'
tags = Tags(Tag.Database.MongoDB)
icon = 'bulk_mongodb.svg'
inputs = Ports([mongodb_port(), Port.Json(
'Requests (Write operations list)', name='requests')])
outputs = Ports([
mongodb_port(opt='out'),
Port.Json('Batch result', name='batch_result')])
parameters = node.parameters()
set_ordered_parameter(parameters)
def execute(self, ctx):
in_db = ctx.input['mongodb']
dict_requests = to_bson(ctx.input['requests'].get())
batch_result = ctx.output['batch_result']
ordered = ctx.parameters['ordered'].value
        object_requests = []
        for dict_request in dict_requests:
            # Each request is a single-key dict mapping a WriteOperations
            # name to its keyword arguments (the format produced by
            # store_write_operation).
            for k, v in dict_request.items():
                wo = getattr(WriteOperations, k)
                object_requests.append(write_operations[wo][1](**v))
                break
            else:
                raise AssertionError()
with db_connected(self, in_db):
res = in_db.bulk_write(object_requests, ordered=ordered)
batch_result.set(to_json({
'upserted_ids': list(res.upserted_ids.values()),
'deleted_count': res.deleted_count,
'inserted_count': res.inserted_count,
'matched_count': res.matched_count,
'modified_count': res.modified_count,
'upserted_count': res.upserted_count,
}))
propagate_db(in_db, ctx.output.group('mongodb'))