Source code for ultipa.operate.edge_extra

from ultipa.operate.base_extra import BaseExtra

from ultipa.types import ULTIPA_REQUEST, ULTIPA, ULTIPA_RESPONSE
from ultipa.utils import UQLMAKER, CommandList
from ultipa.configuration.RequestConfig import RequestConfig

[docs] class EdgeExtra(BaseExtra): ''' Processing class that defines settings for edge related operations. '''
[docs] def searchEdge(self, request: ULTIPA_REQUEST.SearchEdge, requestConfig: RequestConfig = RequestConfig()) -> ULTIPA_RESPONSE.ResponseSearchEdge: ''' Query for edges. Args: request: An object of SearchEdge class requestConfig: An object of RequestConfig class Returns: ResponseSearchEdge ''' uqlMaker = UQLMAKER(command=CommandList.edges, commonParams=requestConfig) if request.id: uqlMaker.setCommandParams(request.id) elif request.filter: uqlMaker.setCommandParams(request.filter) uqlMaker.addParam('as', request.select.aliasName) uqlMaker.addParam("return", request.select) res = self.uqlSingle(uqlMaker) if res.status.code != ULTIPA.Code.SUCCESS: return res return res
[docs] def insertEdge(self, request: ULTIPA_REQUEST.InsertEdge, requestConfig: RequestConfig = RequestConfig()) -> ULTIPA_RESPONSE.ResponseInsert: ''' Insert edges. Args: request: An object of InsertEdge class requestConfig: An object of RequestConfig class Returns: ResponseInsert ''' uqlMaker = UQLMAKER(command=CommandList.insert, commonParams=requestConfig) if request.upsert: uqlMaker = UQLMAKER(command=CommandList.upsert, commonParams=requestConfig) if request.overwrite: uqlMaker.addParam('overwrite', "", required=False) if request.schemaName: uqlMaker.addParam('into', request.schemaName, required=False) uqlMaker.addParam('edges', request.edges) if request.isReturnID: uqlMaker.addParam('as', "edges") uqlMaker.addParam('return', "edges._uuid") res = self.uqlSingle(uqlMaker) if res.status.code != ULTIPA.Code.SUCCESS: return res if request.isReturnID: if len(res.aliases) > 0: res.data = res.items.get(res.aliases[0].alias).data return res
[docs] def updateEdge(self, request: ULTIPA_REQUEST.UpdateEdge, requestConfig: RequestConfig = RequestConfig()) -> ULTIPA_RESPONSE.UltipaResponse: ''' Update edges. Args: request: An object of UpdateEdge class requestConfig: An object of RequestConfig class Returns: UltipaResponse ''' uqlMaker = UQLMAKER(command=CommandList.updateEdges, commonParams=requestConfig) if request.id: uqlMaker.setCommandParams(request.id) elif request.filter: uqlMaker.setCommandParams(request.filter) uqlMaker.addParam("set", request.values) uqlMaker.addParam("silent", request.silent) res = self.UqlUpdateSimple(uqlMaker) return res
[docs] def deleteEdge(self, request: ULTIPA_REQUEST.DeleteEdge, requestConfig: RequestConfig = RequestConfig()) -> ULTIPA_RESPONSE.UltipaResponse: ''' Delete edges. Args: request: An object of DeleteEdge class requestConfig: An object of RequestConfig class Returns: UltipaResponse ''' uqlMaker = UQLMAKER(command=CommandList.deleteEdges, commonParams=requestConfig) if request.id: uqlMaker.setCommandParams(request.id) elif request.filter: uqlMaker.setCommandParams(request.filter) res = self.UqlUpdateSimple(uqlMaker) return res