Customize World¶
This document gives an overview of how to create a new world and describes the methods it needs. Since the World class wraps a simulator, please make sure the simulator has been installed successfully before creating your custom world.
Here we develop a new world, named NewSim, which wraps a new simulator that is also called NewSim.
Create a New World¶
Create a New World Class¶
First, create a new file world_newsim.py in the directory LinSignal/world/ and write the following code into it.
import newsim
from common.registry import Registry

@Registry.register_world('newsim')
class World(object):
    def __init__(self, newsim_config, **kwargs):
        # section 1: initialize parameters and read traffic network from file
        # initialize parameters
        self.eng = None
        self.intersections = None
        self.id2intersection = None
        self.all_roads = None
        self.all_lanes = None
        self.fns = []  # names of subscribed info functions; subscribe() appends to this list
        self.info = None
        # read traffic network file code

        # section 2: create intersections, roads, lanes, roadlinks, lanelinks, etc.
        # code

        # section 3: define info_functions
        self.info_functions = {
            "vehicles": self.get_vehicles,
            "lane_count": self.get_lane_vehicle_count,
            "lane_waiting_count": self.get_lane_waiting_count,
            "lane_vehicles": self.get_lane_vehicles,
            "time": self.get_current_time,
            "vehicle_distance": self.get_vehicle_distance,
            "pressure": self.get_pressure,
            "lane_waiting_time_count": self.get_lane_waiting_time_count,
            "lane_delay": self.get_lane_delay,
            "real_delay": self.get_real_delay,
            "vehicle_trajectory": self.get_vehicle_trajectory,
            "history_vehicles": self.get_history_vehicles,
            "phase": self.get_cur_phase,
            "throughput": self.get_cur_throughput,
            "average_travel_time": self.get_average_travel_time
        }

    def subscribe(self, fns):
        pass

    def _update_infos(self):
        pass

    def step(self, actions=None):
        pass

    def reset(self):
        pass
You can see that we first register the new world using Registry. Then we define __init__() and the methods that must be implemented.
Section 1 of __init__() reads the traffic network file and initializes parameters, including the engine, intersections, roads, lanes and others. Users can also add any custom parameters they need later.
Section 2 of __init__() creates intersections, roads and other required objects from the road network file read in Section 1. Since the format of the Atomic Files differs between simulators, each simulator needs its own building code.
Section 3 of __init__() defines info_functions, which is used to retrieve information from the environment and keep it up to date. In info_functions, information names and the methods that retrieve them are mapped as key-value pairs. Users can also expose custom information by adding a new mapping from an information name to a method. Note that every method that appears as a value in info_functions must be implemented, either by calling the simulator's interfaces or as a user-defined method.
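The key-value mapping described above can be sketched without any simulator. In the snippet below, `ToyWorld`, `get_queue_length` and the `"queue_length"` key are hypothetical names chosen for illustration, and the lane data is hard-coded rather than read from an engine.

```python
class ToyWorld:
    """Hypothetical stand-in for a World; it wraps no real simulator."""

    def __init__(self):
        # Hard-coded values standing in for what a simulator would report.
        self._waiting = {"lane_0": 3, "lane_1": 0, "lane_2": 5}
        self._time = 12.0

        # Information name -> bound method that retrieves it.
        self.info_functions = {
            "time": self.get_current_time,
            "lane_waiting_count": self.get_lane_waiting_count,
            # Custom entry: a user-defined method registered under a new key.
            "queue_length": self.get_queue_length,
        }

    def get_current_time(self):
        return self._time

    def get_lane_waiting_count(self):
        return dict(self._waiting)

    def get_queue_length(self):
        # User-defined information built on top of another info function.
        return sum(self.get_lane_waiting_count().values())


world = ToyWorld()
queue_length = world.info_functions["queue_length"]()
print(queue_length)  # 8
```

Looking a name up in the dictionary and calling the result is all that is needed to retrieve a piece of information, which is why every value must be a callable that actually exists.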
Next, we introduce the methods that must be implemented when creating a new World.
Implement subscribe()¶
The subscribe() method registers the information that users want to get. When a Generator is initialized, its __init__() method calls subscribe(), so that the Generator can later call its generate() method to obtain the corresponding information after each episode or each step. Note that the name of every subscribed item must be a key of info_functions.
You can define subscribe() like this:
@Registry.register_world('newsim')
class World(object):
    def subscribe(self, fns):
        # accept either a single name or a list of names
        if isinstance(fns, str):
            fns = [fns]
        for fn in fns:
            if fn in self.info_functions:
                # avoid subscribing to the same name twice
                if fn not in self.fns:
                    self.fns.append(fn)
            else:
                raise Exception("info function %s does not exist" % fn)
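To make the behaviour of subscribe() concrete, here is a small self-contained sketch. `ToyWorld` and its two lambda-based info functions are hypothetical; only the body of `subscribe()` mirrors the logic shown above.

```python
class ToyWorld:
    """Hypothetical World with two dummy info functions."""

    def __init__(self):
        self.fns = []
        self.info_functions = {
            "time": lambda: 0.0,
            "lane_count": lambda: {"lane_0": 2},
        }

    def subscribe(self, fns):
        if isinstance(fns, str):
            fns = [fns]
        for fn in fns:
            if fn not in self.info_functions:
                raise Exception("info function %s does not exist" % fn)
            if fn not in self.fns:
                self.fns.append(fn)


world = ToyWorld()
world.subscribe("time")                  # a single name
world.subscribe(["time", "lane_count"])  # a list; "time" is not added twice
subscribed = list(world.fns)
print(subscribed)  # ['time', 'lane_count']

# A name that is not a key of info_functions is rejected.
try:
    world.subscribe("speed")
    error = None
except Exception as exc:
    error = str(exc)
print(error)  # info function speed does not exist
```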
Implement _update_infos()¶
The _update_infos() method refreshes the world's global information after a reset and after each step.
You can define _update_infos() like this:
@Registry.register_world('newsim')
class World(object):
    def _update_infos(self):
        # recompute only the subscribed information
        self.info = {}
        for fn in self.fns:
            self.info[fn] = self.info_functions[fn]()
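One consequence of this design is that only subscribed information is ever computed. The following sketch illustrates that with a hypothetical `ToyWorld` whose info functions record when they are called; the `calls` list and the returned values are made up for the example.

```python
class ToyWorld:
    """Hypothetical World that logs which info functions were evaluated."""

    def __init__(self):
        self.calls = []      # names of the info functions that actually ran
        self.fns = ["time"]  # pretend "time" was subscribed earlier
        self.info = {}
        self.info_functions = {
            "time": self.get_current_time,
            "lane_count": self.get_lane_vehicle_count,
        }

    def get_current_time(self):
        self.calls.append("time")
        return 42.0

    def get_lane_vehicle_count(self):
        self.calls.append("lane_count")
        return {"lane_0": 2}

    def _update_infos(self):
        self.info = {}
        for fn in self.fns:
            self.info[fn] = self.info_functions[fn]()


world = ToyWorld()
world._update_infos()
print(world.info)   # {'time': 42.0}
print(world.calls)  # ['time']
```

Because "lane_count" was never subscribed, its method is not called, so expensive queries to the simulator are skipped unless something needs them.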
Implement step()¶
To let Agents interact with the World, we need to implement the step() method. It takes the actions generated by the Agents as input, applies them intersection by intersection, and then updates the information, including global information, measurements, trajectories, etc.
You can define step() like this:
@Registry.register_world('newsim')
class World(object):
    def step(self, actions=None):
        # section 1: apply one action to each intersection, then advance the simulator
        if actions is not None:
            for i, intersection in enumerate(self.intersections):
                intersection.step(actions[i])
        self.eng.next_step()

        # section 2: update information
        self._update_infos()

        # section 3: update other information
        # code
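The control flow of step() can be tried out with stand-in objects. In this sketch `ToyEngine`, `ToyIntersection` and `ToyWorld` are hypothetical classes; the engine only counts steps and each intersection simply stores the action it receives.

```python
class ToyEngine:
    """Hypothetical simulator that only counts simulation steps."""

    def __init__(self):
        self.time = 0

    def next_step(self):
        self.time += 1


class ToyIntersection:
    """Hypothetical intersection that stores the last action it was given."""

    def __init__(self, name):
        self.id = name
        self.current_phase = 0

    def step(self, action):
        self.current_phase = action


class ToyWorld:
    def __init__(self):
        self.eng = ToyEngine()
        self.intersections = [ToyIntersection("A"), ToyIntersection("B")]
        self.info = {}

    def _update_infos(self):
        self.info = {"time": self.eng.time}

    def step(self, actions=None):
        # One action per intersection, in the order of self.intersections.
        if actions is not None:
            for i, intersection in enumerate(self.intersections):
                intersection.step(actions[i])
        self.eng.next_step()
        self._update_infos()


world = ToyWorld()
world.step([2, 1])  # intersection A gets action 2, B gets action 1
world.step()        # no actions: the simulation still advances
phases = [inter.current_phase for inter in world.intersections]
print(phases)      # [2, 1]
print(world.info)  # {'time': 2}
```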
Implement reset()¶
Taking CityFlow as an example, you can define reset() like this:
@Registry.register_world('newsim')
class World(object):
    def reset(self):
        # section 1: reconnect engine, in CityFlow:
        self.eng.reset()

        # section 2: reset intersections' information
        for I in self.intersections:
            I.reset()

        # section 3: reset related information
        self._update_infos()

        # section 4: reset other information
        # code
Create a New Intersection¶
Create a New Intersection Class¶
First, write the following code in the file LinSignal/world/world_newsim.py:
class Intersection(object):
    def __init__(self, intersection, world):
        # section 1: initialize parameters
        self.id = intersection["id"]
        self.world = world  # the world this intersection belongs to (used by sort_roads)
        self.eng = world.eng
        self.roads = []
        self.outs = []
        self.directions = []
        self.out_roads = None
        self.in_roads = None

        # section 2: set available phases and other related parameters
        # code

    def _change_phase(self, phase, interval):
        pass

    def step(self, action, interval=1):
        pass

    def reset(self):
        pass

    def _get_direction(self, road, out=True):
        pass

    def sort_roads(self):
        pass
You can see that the __init__() method takes the intersection information and the world as input. The methods that are essential to the Intersection class are listed after it.
Section 1 of __init__() defines and initializes parameters, including the world this intersection belongs to, the roads and lanes, etc.
Section 2 of __init__() sets the available phases and other related parameters. The implementation of this part varies because simulators differ internally.
Next, we introduce the methods that must be implemented when creating a new Intersection.
Implement _change_phase()¶
An intersection needs a method for switching traffic phases. _change_phase() provides an interface to change the current phase by interacting with the simulator. Its phase parameter is the traffic light phase to switch to, and interval is the length of the simulation step that step() passes along.
Taking CityFlow as an example, you can define _change_phase() like this:
class Intersection(object):
    def _change_phase(self, phase, interval):
        # tell the simulator to switch this intersection to the given phase
        self.eng.set_tl_phase(self.id, phase)
        self._current_phase = phase
        # restart the phase timer, which step() compares with yellow_phase_time
        self.current_phase_time = interval
Implement step()¶
The step() method makes the intersection execute the given action for one interval, then updates the related information, including current_phase, current_phase_time, etc.
Taking CityFlow as an example, you can define step() like this:
class Intersection(object):
    def step(self, action, interval=1):
        # interval: simulated time covered by one call (the default of 1 is an assumption)
        # if current phase is yellow, then continue to finish the yellow phase
        if self._current_phase in self.yellow_phase_id:
            if self.current_phase_time >= self.yellow_phase_time:
                self._change_phase(self.phases[self.action_before_yellow], interval)
                self.current_phase = self.action_before_yellow
                self.action_executed = self.action_before_yellow
            else:
                self.current_phase_time += interval
        # if current phase is not yellow, then change the phase
        else:
            # if the requested phase is the same as the current phase, just extend its time
            if action == self.current_phase:
                self.current_phase_time += interval
            else:
                # if a yellow light follows each green light, switch to yellow first as a transition
                if self.yellow_phase_time > 0:
                    # in SUMO, the yellow (red) phase is placed right after each green phase
                    if self.if_sumo:
                        assert (self._current_phase + 1) % len(self.all_phases) in self.yellow_phase_id
                        self._change_phase((self._current_phase + 1) % len(self.all_phases), interval)
                    # in CityFlow, the yellow (red) phase is fixed at the first position in the phase list
                    else:
                        self._change_phase(self.yellow_phase_id[0], interval)
                    self.action_before_yellow = action
                else:
                    self._change_phase(self.phases[action], interval)
                    self.current_phase = action
                    self.action_executed = action
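The yellow-phase handling is easier to follow on a concrete trace. The sketch below is a simplified, CityFlow-style version of the logic with no SUMO branch. `ToyEngine` and `ToyIntersection` are hypothetical, the phase ids are invented, and the sketch assumes that `_change_phase()` restarts `current_phase_time` at `interval`.

```python
class ToyEngine:
    """Hypothetical engine that records every phase it is asked to set."""

    def __init__(self):
        self.phase_log = []

    def set_tl_phase(self, intersection_id, phase):
        self.phase_log.append(phase)


class ToyIntersection:
    def __init__(self, eng):
        self.id = "intersection_0"
        self.eng = eng
        self.phases = [1, 2, 3]       # engine ids of the green phases (assumed)
        self.yellow_phase_id = [0]    # engine id of the yellow phase (assumed)
        self.yellow_phase_time = 2
        self.current_phase = 0        # index into self.phases
        self._current_phase = self.phases[0]  # engine phase id, yellow included
        self.current_phase_time = 0
        self.action_before_yellow = None

    def _change_phase(self, phase, interval):
        self.eng.set_tl_phase(self.id, phase)
        self._current_phase = phase
        self.current_phase_time = interval  # assumption: timer restarts here

    def step(self, action, interval=1):
        if self._current_phase in self.yellow_phase_id:
            if self.current_phase_time >= self.yellow_phase_time:
                # Yellow is over: switch to the green phase requested earlier.
                self._change_phase(self.phases[self.action_before_yellow], interval)
                self.current_phase = self.action_before_yellow
            else:
                self.current_phase_time += interval
        elif action == self.current_phase:
            self.current_phase_time += interval
        elif self.yellow_phase_time > 0:
            # Insert yellow before the new green and remember the request.
            self._change_phase(self.yellow_phase_id[0], interval)
            self.action_before_yellow = action
        else:
            self._change_phase(self.phases[action], interval)
            self.current_phase = action


eng = ToyEngine()
inter = ToyIntersection(eng)
trace = []
for action in [0, 1, 1, 1, 1]:
    inter.step(action)
    trace.append(inter._current_phase)
print(trace)          # [1, 0, 0, 2, 2]
print(eng.phase_log)  # [0, 2]
```

The trace shows green phase 1 being kept, two steps of yellow (id 0) once action 1 is requested, and only then the switch to green phase 2.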
Implement reset()¶
The reset() method resets current_phase, action_before_yellow, action_executed, etc. By default, the first phase after resetting the environment is the one in the first position of the phase list.
Taking CityFlow as an example, you can define reset() like this:
class Intersection(object):
    def reset(self):
        # section 1: set current phase id to 0, and apply the current phase in the engine
        self.current_phase = 0
        if len(self.phases) == 0:
            self._current_phase = 0
        else:
            self._current_phase = self.phases[0]  # true phase id (including yellow)
        self.eng.set_tl_phase(self.id, self._current_phase)

        # section 2: reset other information
        self.current_phase_time = 0
        self.action_before_yellow = None
        self.action_executed = None
Section 1 of reset() sets the current phase to the one that occupies the first position in the phase list.
Section 2 of reset() resets related information, including the current phase time and others.
Implement _get_direction()¶
_get_direction() calculates the angle of a road so that the roads can be sorted later. The default order of roads is N-E-S-W. The calculation differs between simulators.
Taking CityFlow as an example, the code for getting the direction of a road is as follows:
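from math import atan2, pi

class Intersection(object):
    def _get_direction(self, road, out=True):
        # road is a sequence of (x, y) points; use the segment touching the intersection
        if out:
            x = road[1][0] - road[0][0]
            y = road[1][1] - road[0][1]
        else:
            x = road[-2][0] - road[-1][0]
            y = road[-2][1] - road[-1][1]
        # atan2(x, y) measures the angle clockwise from north (the +y axis)
        tmp = atan2(x, y)
        return tmp if tmp >= 0 else (tmp + 2 * pi)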
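As a quick check of the N-E-S-W convention, the same computation can be run as a standalone function on four straight roads leaving the origin. The road coordinates are invented for this example, and each road is assumed to be a list of (x, y) points as in the code above.

```python
from math import atan2, degrees, pi


def get_direction(road, out=True):
    """Angle of a road, measured clockwise from north, in [0, 2*pi)."""
    if out:
        x = road[1][0] - road[0][0]
        y = road[1][1] - road[0][1]
    else:
        x = road[-2][0] - road[-1][0]
        y = road[-2][1] - road[-1][1]
    tmp = atan2(x, y)
    return tmp if tmp >= 0 else (tmp + 2 * pi)


# Outgoing roads that start at the intersection (0, 0).
roads = {
    "north": [(0, 0), (0, 100)],
    "east": [(0, 0), (100, 0)],
    "south": [(0, 0), (0, -100)],
    "west": [(0, 0), (-100, 0)],
}
angles = {name: round(degrees(get_direction(points))) for name, points in roads.items()}
print(angles)  # {'north': 0, 'east': 90, 'south': 180, 'west': 270}

# An incoming road that ends at the intersection, arriving from the north.
incoming = round(degrees(get_direction([(0, 100), (0, 0)], out=False)))
print(incoming)  # 0
```

Passing x before y to atan2 is what makes north map to 0 and the angle grow clockwise, so sorting by this value yields the N-E-S-W order.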
Implement sort_roads()¶
sort_roads() sorts the roads, including in roads, out roads and others, into a specific order.
Taking CityFlow as an example, the code for sorting is as follows:
class Intersection(object):
    def sort_roads(self):
        # sort by direction first, then by in/out depending on the driving side
        order = sorted(
            range(len(self.roads)),
            key=lambda i: (self.directions[i], self.outs[i] if self.world.RIGHT else not self.outs[i]))
        self.roads = [self.roads[i] for i in order]
        self.directions = [self.directions[i] for i in order]
        self.outs = [self.outs[i] for i in order]
        self.out_roads = [self.roads[i] for i, x in enumerate(self.outs) if x]
        self.in_roads = [self.roads[i] for i, x in enumerate(self.outs) if not x]
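The effect of the sort key can be seen on a small hand-made example. The road names, angles and the module-level `RIGHT` flag below are hypothetical stand-ins for `self.roads`, `self.directions` and `self.world.RIGHT`.

```python
from math import pi

# Unsorted roads with their direction (clockwise from north) and in/out flag.
roads = ["west_out", "north_in", "east_out", "north_out", "south_in"]
directions = [3 * pi / 2, 0.0, pi / 2, 0.0, pi]
outs = [True, False, True, True, False]
RIGHT = True  # stand-in for self.world.RIGHT

# Same key as in sort_roads(): direction first, then the in/out flag.
order = sorted(
    range(len(roads)),
    key=lambda i: (directions[i], outs[i] if RIGHT else not outs[i]))

sorted_roads = [roads[i] for i in order]
sorted_outs = [outs[i] for i in order]
out_roads = [road for road, is_out in zip(sorted_roads, sorted_outs) if is_out]
in_roads = [road for road, is_out in zip(sorted_roads, sorted_outs) if not is_out]

print(sorted_roads)  # ['north_in', 'north_out', 'east_out', 'south_in', 'west_out']
print(out_roads)     # ['north_out', 'east_out', 'west_out']
print(in_roads)      # ['north_in', 'south_in']
```

Since False sorts before True, with RIGHT set the in road comes before the out road when both share a direction; with RIGHT unset the order of that pair is reversed.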
Now that you have learned how to add a new world, try the following command to use it:
python run.py -w newsim