Source code for orchestrator.orchestrator_folder

from orchestrator.orchestrator_http import OrchestratorHTTP
from orchestrator.orchestrator_asset import Asset
from orchestrator.orchestrator_queue import Queue
from orchestrator.orchestrator_job import Job
from orchestrator.orchestrator_process_schedule import ProcessSchedule
from orchestrator.exceptions import OrchestratorMissingParam
from urllib.parse import urlencode
import requests


"""
Class to deal with calls and requests from a given Folder of an Orchestrator Cloud instance

"""


[docs]class Folder(OrchestratorHTTP): """Constructor :param client_id - the client id of your organization :type client_id - str :param refresh_token - a refresh token. :type refresh_token - str :param tenant_name - your account's logical name :type tenant_name - str :param session - an optional session object :type session - Session :param folder_id - the id of the folder (UiPath's organizations unit) :type folder_id - str :param folder_name - the name of the folder :type folder_name : str """ def __init__(self, client_id, refresh_token, tenant_name, session=None, folder_name=None, folder_id=None, access_token=None): super().__init__(client_id=client_id, refresh_token=refresh_token, tenant_name=tenant_name, folder_id=folder_id, session=session) if not tenant_name or not folder_id: raise OrchestratorMissingParam(value="tenant_name", message="Required parameter missing: tenant_name") self.id = folder_id self.access_token = access_token self.refresh_token = refresh_token self.tenant_name = tenant_name self.name = folder_name self.base_url = f"{self.cloud_url}/{self.tenant_name}/JTBOT/odata" if session: self.session = session else: self.session = requests.Session() def __str__(self): return f"Folder Id: {self.id} \nFolder Name: {self.name}"
[docs] def info(self): """Returns a information of a single folder based on its folder id """ endpoint = f"/Folders({self.id})" url = f"{self.base_url}{endpoint}" data = self._get(url) return data
[docs] def get_queues(self, options=None): """Parameters: :param options - dictionary of filtering odata options :type options - dict """ endpoint = "/QueueDefinitions" if options: query_params = urlencode(options) url = f"{self.base_url}{endpoint}?{query_params}" else: url = f"{self.base_url}{endpoint}" data = self._get(url) filt_data = data['value'] return [Queue(self.client_id, self.access_token, self.tenant_name, self.id, self.name, self.session, queue["Name"], queue["Id"], access_token=self.access_token) for queue in filt_data]
[docs] def get_queue_ids(self, options=None): """ Returns a list of dictionaries containing the queue name and the queue id :options dictionary for odata options """ queues = self.get_queues(options) ids = {} for queue in queues: ids.update({queue.id: queue.name}) return ids
[docs] def get_processing_records(self, options=None): """Returns a list of queue processing records for all the queues :param options - dictionary for odata options :type options - dict """ endpoint = "/QueueProcessingRecords" uipath_svc = "/UiPathODataSvc.RetrieveQueuesProcessingStatus" if options: query_params = urlencode(options) url = f"{self.base_url}{endpoint}?{query_params}" else: url = f"{self.base_url}{endpoint}{uipath_svc}" data = self._get(url) return data['value']
[docs] def get_queue_by_id(self, queue_id): queues = self.get_queue_ids() return Queue(self.client_id, self.refresh_token, self.tenant_name, self.id, self.name, self.session, queues[queue_id], queue_id=int(queue_id), access_token=self.access_token)
[docs] def get_assets(self, options=None): """ Returns list of assets :options dict of odata filter options """ endpoint = "/Assets" if options: query_params = urlencode(options) url = f"{self.base_url}{endpoint}?{query_params}" else: url = f"{self.base_url}{endpoint}" data = self._get(url) # pprint(data) # pprint(self.id) filt_data = data['value'] return [Asset(self.client_id, self.refresh_token, self.tenant_name, self.id, self.name, self.session, asset["Id"], asset["Name"], access_token=self.access_token) for asset in filt_data]
[docs] def get_asset_ids(self, options=None): """ Returns a dictionary of ky value pairs where keys are the assets names and the values are the ids :param options - dictionary of odata filter options """ assets = self.get_assets(options) ids = {} for asset in assets: ids.update({asset.id: asset.name}) return ids
[docs] def get_asset_by_id(self, asset_id): assets = self.get_asset_ids() return Asset(self.client_id, self.refresh_token, self.tenant_name, self.id, self.name, self.session, asset_id, assets[asset_id], access_token=self.access_token)
[docs] def create_asset(self, body=None): pass
[docs] def get_process_schedules(self, options=None): endpoint = "/ProcessSchedules" if options: query_params = urlencode(options) url = f"{self.base_url}{endpoint}?{query_params}" else: url = f"{self.base_url}{endpoint}" data = self._get(url) filt_data = data["value"] return [ProcessSchedule(self.client_id, self.refresh_token, self.tenant_name, self.folder_id, self.session, process["Id"], process["Name"], access_token=self.access_token) for process in filt_data]
[docs] def get_schedule_ids(self, options=None): """ Returns a list of dictionaries name -- schedule_id """ process_schedules = self.get_process_schedules(options=options) ids = {} for schedule in process_schedules: ids.update({schedule.id: schedule.name}) return ids
[docs] def get_sessions(self, options=None): """ Gets all the sessions for the current folder """ endpoint = "/Sessions" if options: query_params = urlencode(options) url = f"{self.base_url}{endpoint}?{query_params}" else: url = f"{self.base_url}{endpoint}" return self._get(url)["value"]
[docs] def get_machine_runtime_sessions(self): """ No se por que no va """ endpoint = "/Sessions" uipath_svc = "UiPath.Server.Configuration.OData.GetMachineSessionRuntimesByFolderId(folderId={self.id})" url = f"{self.base_url}{endpoint}{uipath_svc}" return self._get(url)
[docs] def get_jobs(self, top="100", options=None): """ Returns the jobs of a given folder @top : maximum number of results (100 default) @options: dictionary of odata filtering options """ endpoint = "/Jobs" default = {"$orderby": "StartTime desc", "$top": f"{top}"} enc_default = urlencode(default) if options: default.update(options) query_params = urlencode(default) url = f"{self.base_url}{endpoint}?{query_params}" else: url = f"{self.base_url}{endpoint}?{enc_default}" data = self._get(url)["value"] # print(len(data)) return [Job(self.client_id, self.refresh_token, self.tenant_name, self.id, self.name, self.session, job["Id"], job["Key"], job["ReleaseName"], access_token=self.access_token) for job in data]
[docs] def get_job_keys(self, top="100", options=None): """ """ jobs = self.get_jobs(top, options) ids = {} for job in jobs: ids.update({job.key: job.name}) return ids
[docs] def get_job_by_key(self, key): endpoint = "/Jobs" query_param = urlencode({"$filter": f"Key eq {key}"}) url = f"{self.base_url}{endpoint}?{query_param}" data = self._get(url)["value"][0] return Job(self.client_id, self.refresh_token, self.tenant_name, self.id, self.name, self.session, data["Id"], data["Key"], data["ReleaseName"], access_token=self.access_token)
[docs] def job_triggers(self, options=None): endpoint = "/JobTriggers" if options: query_params = urlencode(options) url = f"{self.base_url}{endpoint}?{query_params}" else: url = f"{self.base_url}{endpoint}" return self._get(url)["value"]