We provide a few examples that show how to use AI2-THOR.
A simple example that moves the agent ahead by one step and returns the corresponding image and metadata.
import ai2thor.controller
# Create a controller and start it.
controller = ai2thor.controller.Controller()
controller.start()

# Scene names, grouped by room type:
#   Kitchens:     FloorPlan1   - FloorPlan30
#   Living rooms: FloorPlan201 - FloorPlan230
#   Bedrooms:     FloorPlan301 - FloorPlan330
#   Bathrooms:    FloorPlan401 - FloorPlan430
controller.reset('FloorPlan28')
controller.step({'action': 'Initialize', 'gridSize': 0.25})

# Take a single step forward; the returned event carries the results.
event = controller.step({'action': 'MoveAhead'})

# Numpy array of the rendered frame, channels in RGB order.
# NOTE(review): this was documented as shape (width, height, channels);
# NumPy image arrays are usually (height, width, channels) - confirm.
event.frame

# The same image as a Numpy array in BGR order, suitable for use with OpenCV.
event.cv2image()

# Current metadata dictionary, which includes the state of the scene.
event.metadata
To pick up an object, the agent must first navigate to an area where there are pickupable/visible objects. Normally, this is done through a series of MoveAhead, RotateLeft, and RotateRight commands. Here we teleport directly to a known location where a mug exists.
import ai2thor.controller
controller = ai2thor.controller.Controller()
controller.start()
controller.reset('FloorPlan28')
controller.step(dict(action='Initialize', gridSize=0.25))

# Teleport next to the mug instead of navigating there step by step,
# then tilt the camera down and turn around to face it.
controller.step(dict(action='Teleport', x=-2.5, y=0.900998235, z=-3.0))
controller.step(dict(action='LookDown'))
event = controller.step(dict(action='Rotate', rotation=180))

# In FloorPlan28, the agent should now be looking at a mug
for o in event.metadata['objects']:
    if o['visible'] and o['pickupable'] and o['objectType'] == 'Mug':
        event = controller.step(
            dict(action='PickupObject', objectId=o['objectId']),
            raise_for_failure=True)
        mug_object_id = o['objectId']
        break
else:
    # Without this, a missing mug would only surface much later as a
    # NameError on mug_object_id.
    raise RuntimeError('no visible, pickupable Mug found in the current view')

# the agent now has the Mug in its inventory
# to put it into the Microwave, we need to open the microwave first
event = controller.step(dict(action='LookUp'))
event = controller.step(dict(action='RotateLeft'))
for _ in range(4):
    event = controller.step(dict(action='MoveLeft'))
for _ in range(6):
    event = controller.step(dict(action='MoveAhead'))

# The objects are taken from the event returned by the last move.
for o in event.metadata['objects']:
    if o['visible'] and o['openable'] and o['objectType'] == 'Microwave':
        event = controller.step(
            dict(action='OpenObject', objectId=o['objectId']),
            raise_for_failure=True)
        receptacle_object_id = o['objectId']
        break
else:
    # Same reasoning as for the mug: fail here with a clear message.
    raise RuntimeError('no visible, openable Microwave found in the current view')

# place the held mug into the opened microwave
event = controller.step(dict(
    action='PutObject',
    receptacleObjectId=receptacle_object_id,
    objectId=mug_object_id), raise_for_failure=True)

# close the microwave
event = controller.step(dict(
    action='CloseObject',
    objectId=receptacle_object_id), raise_for_failure=True)
This example shows how to run AI2-THOR in the multi-agent setting.
import ai2thor.controller
controller = ai2thor.controller.Controller()
controller.start()

# agentCount specifies the number of agents in a scene
multi_agent_event = controller.step(dict(action='Initialize', gridSize=0.25, agentCount=2))

# print out agentIds (one sub-event per agent is exposed through .events)
for e in multi_agent_event.events:
    print(e.metadata['agentId'])

# move the second agent ahead, agents are 0-indexed
multi_agent_event = controller.step(dict(action='MoveAhead', agentId=1))
This example shows how to run multiple instances of an agent in a multi-threaded fashion.
import threading
import time
import ai2thor.controller
# number of worker threads to launch; each one executes run()
thread_count = 8
def run(episodes=100, moves_per_episode=10):
    """Drive one controller through repeated, timed episodes.

    Each episode resets the scene to 'FloorPlan1', initializes it with a
    grid size of 0.25, and then issues ``moves_per_episode`` pairs of
    MoveAhead / RotateRight actions. The initialization time, the total
    time for the moves, and the resulting actions-per-second figure are
    printed for every episode.

    Args:
        episodes: Number of reset/move cycles to run. The default of 100
            is an arbitrary number.
        moves_per_episode: Number of MoveAhead + RotateRight pairs issued
            per episode.
    """
    controller = ai2thor.controller.Controller()
    controller.start()

    for _ in range(episodes):
        t_start = time.time()
        controller.reset('FloorPlan1')
        controller.step({'action': 'Initialize', 'gridSize': 0.25})
        print('init time', time.time() - t_start)

        t_start_total = time.time()
        for _ in range(moves_per_episode):
            controller.step({'action': 'MoveAhead'})
            controller.step({'action': 'RotateRight'})
        total_time = time.time() - t_start_total

        # Two actions are issued per inner iteration, so the number of
        # steps timed is twice moves_per_episode (20 with the defaults).
        steps_taken = 2 * moves_per_episode
        print('total time', total_time, steps_taken / total_time, 'fps')
# one thread per worker, each executing run()
threads = [threading.Thread(target=run) for _ in range(thread_count)]
for t in threads:
    # daemon threads do not block interpreter exit, so Ctrl-C can end the script
    t.daemon = True
    t.start()
    # pause for a second before starting the next thread
    time.sleep(1)

for t in threads:
    # calling join() in a loop/timeout to allow for Python 2.7
    # to be interrupted with SIGINT
    # is_alive() is used because Thread.isAlive() was removed in Python 3.9
    while t.is_alive():
        t.join(1)

print('done')