"""
The enviornment.py module contains core data container classes and retrieval methods. The core classes
are the Environment, Demonstration, and Obsevation used throughout Cairo LfD's codebase.
"""
import json
import numpy as np
from collections import OrderedDict
from geometry_msgs.msg import Pose
[docs]def import_configuration(filepath):
"""
Wrapper function around json.load() to import a config.json file used to inform the Environment object.
"""
with open(filepath) as json_data:
return json.load(json_data, object_pairs_hook=OrderedDict)
[docs]class Environment(object):
"""
Environment container class that houses various objects (items, robots, constraints) relevant to conducting
LfD experimentation.
Attributes
----------
items : list
List of environment items/objects i.e. blocks, spatulasm, cups etc,.. of the AbstractItem class.
robot : SawyerRobot
AbstractItem extended class object representing the robot.
constraints : list
List of constrain class objects representing the available constraints for the Demonstration.
"""
def __init__(self, items, robot, constraints):
"""
Parameters
----------
items : list
List of environment items/objects i.e. blocks, spatulasm, cups etc,.. of the AbstractItem class.
robot : SawyerRobot
AbstractItem extended class object representing the robot.
constraints : list
List of constrain class objects representing the available constraints for the Demonstration.
"""
self.items = items
self.robot = robot
self.constraints = constraints
[docs] def get_robot_state(self):
"""
Retrieves the robot item's current state
Returns
-------
entries : dict
Dictionary representing robot's state. See SawyerRobot class.
"""
return self.robot.get_state()
[docs] def get_robot_info(self):
"""
Retrieves the robot item's configuration information.
Returns
-------
entries : dict
Dictionary representing robot's information. See SawyerRobot class.
"""
return self.robot.get_info()
[docs] def get_item_states(self):
"""
Retrieves the Environment's items states.
Returns
-------
entries : list
List of dictionaries for each of the Environment's items (exluding the robot item)
"""
item_states = []
if self.items is not None:
for item in self.items:
item_states.append(item.get_state())
return item_states
[docs] def get_item_info(self):
"""
Retrieves the Environment's items information.
Returns
-------
entries : list
List of dictionaries for each of the Environment's items (exluding the robot item)
"""
item_info = []
if self.items is not None:
for item in self.items:
item_info.append(item.get_info())
return item_info
[docs] def get_constraint_by_id(self, constraint_id):
"""
Retrieves a constraint from the Environment by it's id.
Parameters
----------
iconstraint_id : int
The id of the constraint to retrieve.
Returns
-------
: Constraint class
Constraint class for the given id.
"""
return [constraint for constraint in self.constraints if constraint.id == constraint_id][0]
[docs] def check_constraint_triggers(self):
"""
Checks all constraints for their trigger. A triggered constraint might be a button press on Sawyer's
cuff or a natural language dicated constraint.
Returns
-------
triggered_cosntraints: list
List of the id's of all the constraints currently triggered
"""
triggered_constraints = []
for constraint in self.constraints:
result = constraint.check_trigger()
if result != 0:
triggered_constraints.append(constraint.id)
return triggered_constraints
[docs]class Demonstration(object):
"""
Demonstration object to contain list of osbervations and various methods to perform on those observations.
Attributes
----------
observations : list
List of Observation objects representing raw observations from demonstration. (Requried)
aligned_observation : SawyerRobot
List of Observation objects representing DTW aligned observations.
labeled_observations : list
List of Observation objects representing keyframe labeled observations.
"""
def __init__(self, observations, aligned_observation=None, labeled_observations=None):
"""
Parameters
----------
observations : list
List of Observation objects representing raw observations from demonstration. (Requried)
aligned_observation : SawyerRobot
List of Observation objects representing DTW aligned observations.
labeled_observations : list
List of Observation objects representing keyframe labeled observations.
"""
self.observations = observations
self.aligned_observation = aligned_observation
self.labeled_observations = labeled_observations
[docs] def get_observation_by_index(self, idx):
"""
Returns an raw observation by its index.
Parameters
----------
idx : int
Index integer
Returns
-------
: Observation
The retrieved observation object.
"""
return self.observations[idx]
[docs] def get_applied_constraint_order(self):
"""
Returns the applied constraint order of a Demonstration's aligned observation list.
Returns
-------
constraint_order: list
List of list where each element is ordered by the sequence of the applied constraints and represents
the set of constraints applied.
"""
constraint_order = []
curr = []
for ob in self.aligned_observations:
if curr != ob.data["applied_constraints"]:
constraint_order.append(ob.data["applied_constraints"])
curr = ob.data["applied_constraints"]
return constraint_order
[docs]class Observation(object):
"""
Observation object to contain raw dictionary data for an observation and retrieval methods for that data.
Example
-------
.. code-block:: json
{
"applied_constraints": [],
"items": [
{
"id": 1,
"orientation": [
0.6874194527002508,
-0.06937214001305077,
0.7140914218104518,
0.1127627754891884
],
"position": [
0.8263962716618255,
0.3449914200419018,
-0.1353068521650853
},
{
"id": 2,
"orientation": [
0.7874194527002508,
-0.16937214001305077,
0.23140914218104518,
0.2327627754891884
],
"position": [
0.1034962716618255,
0.9469914200419018,
-0.54330685216508534
]
}
],
"keyframe_id": null, => labeled_demosntration only
"keyframe_type": null, => labeled_demosntration only
"robot": {
"gripper": 0.0,
"id": 1,
"joints": [
-0.1242646484375,
0.1710810546875,
3.044984375,
-1.006376953125,
-2.976296875,
-1.216076171875,
-1.41415234375
],
"orientation": [
0.6874194527002508,
-0.06937214001305077,
0.7140914218104518,
0.11276277548918841
],
"position": [
0.8263962716618255,
0.3449914200419018,
-0.13530685216508534
]
},
"time": 0.38476085662841797,
"triggered_constraints": []
}
Attributes
----------
data : dict
Dictionary of raw data collecting item, robot and constraint states.
"""
def __init__(self, observation_data):
"""
Parameters
----------
observation_data : dict
Dictionary of raw data collecting item, robot and constraint states.
"""
self.data = observation_data
[docs] @classmethod
def init_samples(cls, pose, orientation, joints):
"""
Parameters
----------
pose : array of pose data
orientation: array of orientation data
"""
observation_data = {"robot": {"orientation": orientation,
"position": pose,
"joints": joints}}
return cls(observation_data)
[docs] def get_timestamp(self):
"""
Get's osbervations timestamp
Returns
-------
: float
Integer timestamp in milliseconds representing time from epoch.
"""
return self.data["time"]
[docs] def get_robot_data(self):
"""
Get the observation's robot data.
Returns
-------
: dictionary
dictionary of robot data
"""
return self.data["robot"]
[docs] def get_pose_list(self):
"""
Get the pose data of the observation as a list of numerical values.
Returns
-------
: list
combined data from position and orientation
"""
robot = self.get_robot_data()
if isinstance(robot["orientation"], np.ndarray) and isinstance(robot["orientation"], np.ndarray):
return np.concatenate((robot["position"], robot["orientation"]))
else:
return robot["position"] + robot["orientation"]
[docs] def get_joint_list(self):
"""
Get the joint data of the observation as a list of numerical values.
Returns
-------
: list
list of numerical joint values for the given robot.
"""
robot = self.get_robot_data()
return robot["joints"]
[docs] def get_pose_msg(self):
"""
Get pose data and return a pose msg type
Returns:
--------
: geometry_msg Pose
a pose message of the observation
"""
pose_list = self.get_pose_list()
pose_msg = Pose()
pose_msg.position.x = pose_list[0]
pose_msg.position.y = pose_list[1]
pose_msg.position.z = pose_list[2]
pose_msg.orientation.x = pose_list[3]
pose_msg.orientation.y = pose_list[4]
pose_msg.orientation.z = pose_list[5]
pose_msg.orientation.w = pose_list[6]
return pose_msg
[docs] def get_keyframe_info(self):
"""
send back tuple of keyframe num and type
Returns
-------
: tuple
(keyframe_id, keyframe_type)
"""
return (self.data["keyframe_id"], self.data["keyframe_type"])
[docs] def get_item_data(self, item_id):
"""
Gets an items information based on its id
Parameters
----------
observation_data : dict
Dictionary of raw data collecting item, robot and constraint states.
Returns
-------
: float
Integer timestamp in milliseconds representing time from epoch.
"""
for item in self.data["items"]:
# return first occurance, should only be one
if item["id"] == item_id:
return item
[docs] def get_triggered_constraint_data(self):
"""
Get the triggered constraints of an object.
Returns
-------
: list
List of constraint id's.
"""
return self.data["triggered_constraints"]
[docs] def get_applied_constraint_data(self):
"""
Get the applied constraints of an object.
Returns
-------
: list
List of constraint id's.
"""
if "applied_constraints" in self.data.keys():
return self.data["applied_constraints"]
else:
return None