# Paste-site metadata (not part of the program):
# Untitled / unknown / plain_text / 8 months ago / 6.3 kB / 4 / Indexable
import functools
import os
import sys
import traceback

import ament_index_python
import rclpy
from rclpy.node import Node
from std_srvs.srv import Empty
import TstML
import TstML.Executor
from air_lab_interfaces.srv import ExecuteTst

from .undock_executor import UndockExecutor
from .dock_executor import DockExecutor
from .drive_to_executor import DriveToExecutor
# Create a registry with node executors
class our_tst_execute(Node):
    """ROS 2 node that loads and runs TST files on request.

    Services offered:

    * ``execute_tst`` (``ExecuteTst``): loads the TST file named in the
      request, runs it and blocks until the execution has finished.
    * ``abort``, ``stop``, ``pause``, ``resume`` (``Empty``): forwarded to the
      TST executor that is currently running, if any.

    Only one TST can run at a time; ``self.tst_executor`` holds the running
    executor and is ``None`` while the node is idle.
    """

    def __init__(self):
        """Load the TST models, register node executors and create services."""
        super().__init__('execute_tst')
        self.get_logger().info('Executor node is getting started!')

        self.tst_executor_registry = TstML.Executor.NodeExecutorRegistry()
        self.tst_registry = TstML.TSTNodeModelsRegistry()
        filepath = ament_index_python.get_package_prefix("air_tst") + "/share/air_tst/configs"
        self.tst_registry.loadDirectory(filepath)

        # Map every TST model name to the executor class that runs it.
        # NOTE(review): "drive_to_executor" is the only model name that matches
        # a Python module name rather than a short node type like the others;
        # confirm it against the model files in air_tst/configs.
        node_executors = (
            ("seq", TstML.Executor.DefaultNodeExecutor.Sequence),
            ("conc", TstML.Executor.DefaultNodeExecutor.Concurrent),
            ("undock", UndockExecutor),
            ("dock", DockExecutor),
            ("drive_to_executor", DriveToExecutor),
        )
        for model_name, node_executor in node_executors:
            self.tst_executor_registry.registerNodeExecutor(
                self.tst_registry.model(model_name),
                node_executor)

        # Executor of the TST currently running; None while idle.
        self.tst_executor = None

        # The control services live in their own reentrant group so they can
        # be served while execute_tst_callback is blocked waiting for the TST.
        self.group = rclpy.callback_groups.ReentrantCallbackGroup()
        self.create_service(Empty, 'abort', self.callback_abort, callback_group=self.group)
        self.create_service(Empty, 'stop', self.callback_stop, callback_group=self.group)
        self.create_service(Empty, 'pause', self.callback_pause, callback_group=self.group)
        self.create_service(Empty, 'resume', self.callback_resume, callback_group=self.group)
        self.srv = self.create_service(ExecuteTst, 'execute_tst', self.execute_tst_callback)
        self.get_logger().info('Executor node is up and running!')

    def display_exceptions(func):
        """Decorate ``func`` so that unhandled exceptions are printed.

        The wrapper forwards the return value of ``func`` unchanged. If
        ``func`` raises, the traceback is printed and ``None`` is returned
        instead of propagating the exception.
        """
        @functools.wraps(func)
        def wrapper(*args):
            try:
                # The result must be passed on: service callbacks have to
                # hand their response object back to rclpy.
                return func(*args)
            except Exception as ex:
                traceback.print_exception(*sys.exc_info())
                print(f"Unhandled exception was caught: '{ex}'")
                return None
        return wrapper

    @display_exceptions
    def execute_tst_callback(self, request, response):
        """Run the TST file named in ``request.tst_file`` and wait for it.

        Args:
            request: ``ExecuteTst`` request; ``tst_file`` is the file to load.
            response: ``ExecuteTst`` response to fill in.

        Returns:
            The response, with ``success`` set to ``True`` only when the
            execution finished, and ``error_message`` describing the problem
            otherwise.
        """
        filename = request.tst_file
        if self.tst_executor is not None:
            error_msg = f'TST Executor already and tst {filename=} could not be ran'
            self.get_logger().error(error_msg)
            response.success = False
            response.error_message = error_msg
            return response

        response.success = False
        try:
            tst_node = TstML.TSTNode.load(filename, self.tst_registry)
            self.get_logger().info(f'Starting TST: {filename=}')
            self.tst_executor = TstML.Executor.Executor(
                tst_node, self.tst_executor_registry)
            self.tst_executor.start()
            # Block until the execution has finished.
            status = self.tst_executor.waitForFinished()
        except Exception as ex:
            # Still answer the client instead of leaving the call without a
            # response.
            traceback.print_exc()
            error_msg = f'TST {filename=} raised an exception: {ex}'
            self.get_logger().error(error_msg)
            response.error_message = error_msg
            return response
        finally:
            # Always release the slot, otherwise a failed run would make every
            # later request be rejected as "already running".
            self.tst_executor = None

        self.get_logger().info(f'TST finished running with status {status.type()=}')

        if status.type() == TstML.Executor.ExecutionStatus.Type.Finished:
            response.success = True
        elif status.type() == TstML.Executor.ExecutionStatus.Type.Failed:
            error_msg = f'TST {filename=} execution failed'
            self.get_logger().error(error_msg)
            response.error_message = error_msg
        else:
            error_msg = f'Something undefined happend. {status.type()=}'
            self.get_logger().info(error_msg)
            response.error_message = error_msg
        return response

    def callback_abort(self, request, response):
        """Abort the running TST, if any, and return the empty response."""
        # Read the attribute once: the execute callback may reset it to None
        # concurrently.
        executor = self.tst_executor
        if executor is not None:
            self.get_logger().info('Abort has been requested.')
            executor.abort()
        else:
            self.get_logger().info("No tst_executor to abort!")
        return response

    def callback_stop(self, request, response):
        """Stop the running TST, if any, and return the empty response."""
        executor = self.tst_executor
        if executor is not None:
            self.get_logger().info('Stop has been requested.')
            executor.stop()
        else:
            self.get_logger().info('TST executor is undefined!')
        return response

    def callback_pause(self, request, response):
        """Pause the running TST, if any, and return the empty response."""
        executor = self.tst_executor
        if executor is not None:
            self.get_logger().info('pause has been requested.')
            executor.pause()
        else:
            self.get_logger().info('TST executor is undefined!')
        return response

    def callback_resume(self, request, response):
        """Resume the running TST, if any, and return the empty response."""
        executor = self.tst_executor
        if executor is not None:
            self.get_logger().info('Resume has been requested.')
            executor.resume()
        else:
            self.get_logger().info('TST executor is undefined!')
        return response
def main():
    """Create the TST executor node and spin it until interrupted.

    A multi-threaded executor is used so that the control services can be
    handled while the ``execute_tst`` callback is blocked waiting for a TST
    to finish.
    """
    rclpy.init()
    minimal_service = our_tst_execute()
    executor = rclpy.executors.MultiThreadedExecutor()
    executor.add_node(minimal_service)
    try:
        executor.spin()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the node; fall through to cleanup.
        pass
    finally:
        minimal_service.destroy_node()
        # The context may already have been shut down by a signal handler.
        if rclpy.ok():
            rclpy.shutdown()
# Run the node when this file is executed as a script.
if __name__ == '__main__':
    main()
# (paste-site footer: "Leave a Comment")