Customize World

This document explains how to create a new world and describes the methods it must provide. Because the World class wraps a simulator, make sure the simulator is installed successfully before creating your custom world.

In this example we develop a new world, registered as newsim, that wraps a new simulator called NewSim.

Create a New World

Create a New World Class

First, please create a new file world_newsim.py in the directory LinSignal/world/ and write the following code into the file.

import newsim
from common.registry import Registry

@Registry.register_world('newsim')
class World(object):
    def __init__(self, newsim_config, **kwargs):
        # section 1: initialize parameters and read traffic network from file
        # initialize parameters
        self.eng = None
        self.intersections = None
        self.id2intersection = None
        self.all_roads = None
        self.all_lanes = None
        self.fns = []  # names of subscribed info functions; a list, so subscribe() can append to it
        self.info = None
        # read traffic network file code

        # section 2: create intersections, roads, lanes, roadlinks, lanelinks, etc.
        # code

        # section 3: define info_functions
        self.info_functions = {
            "vehicles": self.get_vehicles,
            "lane_count": self.get_lane_vehicle_count,
            "lane_waiting_count": self.get_lane_waiting_count,
            "lane_vehicles": self.get_lane_vehicles,
            "time": self.get_current_time,
            "vehicle_distance": self.get_vehicle_distance,
            "pressure": self.get_pressure,
            "lane_waiting_time_count": self.get_lane_waiting_time_count,
            "lane_delay": self.get_lane_delay,
            "real_delay": self.get_real_delay,
            "vehicle_trajectory": self.get_vehicle_trajectory,
            "history_vehicles": self.get_history_vehicles,
            "phase": self.get_cur_phase,
            "throughput": self.get_cur_throughput,
            "averate_travel_time": self.get_average_travel_time
        }

    def subscribe(self, fns):
        pass

    def _update_infos(self):
        pass

    def step(self, actions=None):
        pass

    def reset(self):
        pass

You can see that we first register the new world using Registry. Then we define __init__() and some methods that must be implemented.

Section 1 of __init__() reads the traffic network file and initializes parameters, including the engine, intersections, roads, lanes and others. Users can also add any custom parameters they need later.

Section 2 of __init__() creates intersections, roads and the other required objects from the road network file read in Section 1. Because the format of the Atomic Files differs between simulators, each simulator needs its own construction code.

Section 3 of __init__() defines info_functions, which is used to retrieve and update information from the environment. In info_functions, information names and methods are mapped as key-value pairs. Users can expose custom information by adding a new key-value mapping between an information name and a method. Note that every method used as a value in info_functions must be implemented, either by calling the simulator's interfaces or as a user-defined method.

Next, we introduce the methods that must be implemented when creating a new World.
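The key-value mapping described above can be sketched with a small stand-in class. ToyWorld, total_waiting and the lane data are hypothetical names and values used only for illustration; they are not part of the library.

```python
class ToyWorld:
    """Hypothetical stand-in for World, used only to illustrate info_functions."""

    def __init__(self, lane_waiting):
        # lane id -> number of waiting vehicles (made-up data)
        self._lane_waiting = lane_waiting
        self.info_functions = {
            "lane_waiting_count": self.get_lane_waiting_count,
            # custom entry: the key is the information name,
            # the value is the method that computes it
            "total_waiting": self.get_total_waiting,
        }

    def get_lane_waiting_count(self):
        return dict(self._lane_waiting)

    def get_total_waiting(self):
        # user-defined information built on top of another info function
        return sum(self.get_lane_waiting_count().values())


world = ToyWorld({"lane_a": 3, "lane_b": 2})
print(world.info_functions["total_waiting"]())  # 5
```

In a real world class, get_lane_waiting_count() would query the simulator instead of reading a dictionary.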

Implement subscribe()

subscribe() registers the information that users want to retrieve. When a Generator is initialized, its __init__() method calls subscribe(), so that the Generator's generate() method can obtain the corresponding information after each step or each episode. Note that every subscribed name must be a key of info_functions.

You can define subscribe() like this:

@Registry.register_world('newsim')
class World(object):
    def subscribe(self, fns):
        # accept either a single name or a list of names
        if isinstance(fns, str):
            fns = [fns]
        for fn in fns:
            if fn in self.info_functions:
                # avoid subscribing to the same information twice
                if fn not in self.fns:
                    self.fns.append(fn)
            else:
                raise Exception("info function %s does not exist" % fn)

Implement _update_infos()

_update_infos() refreshes the world's global information after a reset and after each step by calling every subscribed info function.

You can define _update_infos() like this:

@Registry.register_world('newsim')
class World(object):
    def _update_infos(self):
        self.info = {}
        for fn in self.fns:
            self.info[fn] = self.info_functions[fn]()
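How subscribe() and _update_infos() work together can be sketched end to end with a toy world. FakeEngine and ToyWorld are hypothetical stand-ins and do not call any real simulator.

```python
class FakeEngine:
    """Hypothetical simulator handle that only tracks time."""

    def __init__(self):
        self.time = 0

    def next_step(self):
        self.time += 1


class ToyWorld:
    """Hypothetical stand-in for World with a single info function."""

    def __init__(self):
        self.eng = FakeEngine()
        self.fns = []
        self.info = {}
        self.info_functions = {"time": self.get_current_time}

    def get_current_time(self):
        return self.eng.time

    def subscribe(self, fns):
        if isinstance(fns, str):
            fns = [fns]
        for fn in fns:
            if fn not in self.info_functions:
                raise Exception("info function %s does not exist" % fn)
            if fn not in self.fns:
                self.fns.append(fn)

    def _update_infos(self):
        # call every subscribed info function and cache the result
        self.info = {fn: self.info_functions[fn]() for fn in self.fns}


world = ToyWorld()
world.subscribe("time")
world.subscribe(["time"])  # a repeated subscription is ignored
world.eng.next_step()
world._update_infos()
print(world.fns, world.info)  # ['time'] {'time': 1}
```

Only subscribed names are evaluated, so information that nobody asked for costs nothing per step.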

Implement step()

To let Agents interact with the World, we implement the step() method. It takes the actions generated by the Agents as input (one action per intersection), applies each action to its intersection, advances the simulator, and then updates the information, including global information, measurements and trajectories.

You can define step() like this:

@Registry.register_world('newsim')
class World(object):
    def step(self, actions=None):
        # section 1: take action to the intersections
        if actions is not None:
            for i, intersection in enumerate(self.intersections):
                # self.interval (simulated seconds per step) is assumed to be set in __init__()
                intersection.step(actions[i], self.interval)
        self.eng.next_step()

        # section 2: update information
        self._update_infos()

        # section 3: update other information
        # code

Implement reset()

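reset() restores the world to its initial state: it resets the engine and every intersection, then refreshes the subscribed information. Taking CityFlow as an example, you can define reset() like this: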

@Registry.register_world('newsim')
class World(object):
    def reset(self):
        # section 1: reconnect engine, in CityFlow:
        self.eng.reset()

        # section 2: reset intersections' information
        for intersection in self.intersections:
            intersection.reset()

        # section 3: reset related information
        self._update_infos()

        # section 4: reset other information
        # code
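The interplay of reset() and step() over one episode can be sketched as follows. All classes here are hypothetical stand-ins, and the interval attribute is an assumption made for this sketch.

```python
class FakeEngine:
    """Hypothetical simulator handle that only tracks time."""

    def __init__(self):
        self.time = 0

    def next_step(self):
        self.time += 1

    def reset(self):
        self.time = 0


class FakeIntersection:
    """Hypothetical intersection that only remembers its last action."""

    def __init__(self):
        self.last_action = None

    def step(self, action, interval):
        self.last_action = action

    def reset(self):
        self.last_action = None


class ToyWorld:
    def __init__(self, n_intersections, interval=1):
        self.eng = FakeEngine()
        self.interval = interval  # assumed: simulated seconds per step
        self.intersections = [FakeIntersection() for _ in range(n_intersections)]
        self.info = {}

    def _update_infos(self):
        self.info = {"time": self.eng.time}

    def step(self, actions=None):
        if actions is not None:
            for i, intersection in enumerate(self.intersections):
                intersection.step(actions[i], self.interval)
        self.eng.next_step()
        self._update_infos()

    def reset(self):
        self.eng.reset()
        for intersection in self.intersections:
            intersection.reset()
        self._update_infos()


world = ToyWorld(2)
world.reset()
for _ in range(3):
    world.step([1, 0])  # one action per intersection
print(world.info)  # {'time': 3}
```

Calling world.reset() again returns the time to 0 and clears each intersection's state, which is what a new episode expects.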

Create a New Intersection

Create a New Intersection Class

First, please write the following code in the file LinSignal/world/world_newsim.py:

class Intersection(object):
    def __init__(self, intersection, world):
        # section 1: initialize parameters
        self.id = intersection["id"]
        self.world = world  # the world this intersection belongs to (used by sort_roads())
        self.eng = world.eng
        self.roads = []
        self.outs = []
        self.directions = []
        self.out_roads = None
        self.in_roads = None

        # section 2: set available phases and other related parameters
        # code

    def _change_phase(self, phase, interval):
        pass

    def step(self, action, interval):
        pass

    def reset(self):
        pass

    def _get_direction(self, road, out=True):
        pass

    def sort_roads(self):
        pass
    

As you can see, __init__() takes the intersection information and the world as input. The methods that are essential to the Intersection class are listed after it.

Section 1 of __init__() defines and initializes parameters, including the world this intersection belongs to, the roads and lanes, etc.

Section 2 of __init__() sets the available phases and other related parameters. This part is implemented differently for each simulator because their internal implementations differ.

Next, we introduce the methods that must be implemented when creating a new Intersection.

Implement _change_phase()

Each intersection needs a method for switching traffic phases. _change_phase() provides that interface: it asks the simulator to switch the traffic light to a given phase. Its phase parameter is the phase to switch to, and interval is the duration of one step, as passed in by step().

Taking CityFlow as an example, you can define _change_phase() like this:

class Intersection(object):
    def _change_phase(self, phase, interval):
        self.eng.set_tl_phase(self.id, phase)
        self._current_phase = phase
        # restart the phase timer so that step() can time the yellow phase (assumed here)
        self.current_phase_time = interval

Implement step()

step() makes the intersection execute the given action for one interval, then updates the related information, including current_phase, current_phase_time, etc. Here action is the index of the requested phase in self.phases and interval is the duration of one step.

Take CityFlow as an example. You can define step() like this:

class Intersection(object):
    def step(self, action, interval):
        # if current phase is yellow, then continue to finish the yellow phase
        if self._current_phase in self.yellow_phase_id:
            if self.current_phase_time == self.yellow_phase_time:
                self._change_phase(self.phases[self.action_before_yellow], interval)
                self.current_phase = self.action_before_yellow
                self.action_executed = self.action_before_yellow
            else:
                self.current_phase_time += interval

        # if current phase is not yellow, then change the phase
        else:
            # if the requested phase is the same as the current phase, just extend its time
            if action == self.current_phase:
                self.current_phase_time += interval
            else:
                # if a yellow light follows each green light, switch to the yellow phase first
                if self.yellow_phase_time > 0:
                    # in SUMO, the yellow(red) phase is placed right after each green phase
                    if self.if_sumo:
                        assert (self._current_phase+1)%len(self.all_phases) in self.yellow_phase_id
                        self._change_phase((self._current_phase+1)%len(self.all_phases), interval)

                    # in CityFlow, the yellow(red) phase is fixed at the first position in the phase list
                    else:
                        self._change_phase(self.yellow_phase_id[0], interval)
                    self.action_before_yellow = action
                else:
                    self._change_phase(self.phases[action], interval)
                    self.current_phase = action
                    self.action_executed = action
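The yellow-phase logic of step() can be traced with a toy intersection. This is a minimal sketch under stated assumptions: FakeEngine and ToyIntersection are hypothetical, only the CityFlow-style branch is kept, the phase ids are made up, and _change_phase() is assumed to restart current_phase_time.

```python
class FakeEngine:
    """Hypothetical engine that records every phase it is asked to set."""

    def __init__(self):
        self.calls = []

    def set_tl_phase(self, intersection_id, phase):
        self.calls.append(phase)


class ToyIntersection:
    def __init__(self, eng):
        self.id = "toy_intersection"
        self.eng = eng
        self.yellow_phase_id = [0]   # true phase id of the yellow phase (made up)
        self.yellow_phase_time = 2   # length of the yellow phase
        self.phases = [1, 2]         # true phase ids of the green phases (made up)
        self.current_phase = 0       # index into self.phases
        self._current_phase = self.phases[0]
        self.current_phase_time = 0
        self.action_before_yellow = None
        self.action_executed = None

    def _change_phase(self, phase, interval):
        self.eng.set_tl_phase(self.id, phase)
        self._current_phase = phase
        self.current_phase_time = interval  # assumption: the timer restarts here

    def step(self, action, interval):
        if self._current_phase in self.yellow_phase_id:
            if self.current_phase_time == self.yellow_phase_time:
                # yellow is over: switch to the green phase requested earlier
                self._change_phase(self.phases[self.action_before_yellow], interval)
                self.current_phase = self.action_before_yellow
                self.action_executed = self.action_before_yellow
            else:
                self.current_phase_time += interval
        elif action == self.current_phase:
            self.current_phase_time += interval
        elif self.yellow_phase_time > 0:
            # insert the yellow phase before the newly requested green phase
            self._change_phase(self.yellow_phase_id[0], interval)
            self.action_before_yellow = action
        else:
            self._change_phase(self.phases[action], interval)
            self.current_phase = action
            self.action_executed = action


eng = FakeEngine()
inter = ToyIntersection(eng)
trace = []
for _ in range(4):
    inter.step(action=1, interval=1)  # keep requesting green phase index 1
    trace.append(inter._current_phase)
print(trace)      # [0, 0, 2, 2]
print(eng.calls)  # [0, 2]
```

The trace shows the intersection entering yellow (true phase 0), holding it until the timer reaches yellow_phase_time, and only then switching to the requested green phase (true phase 2).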

        

Implement reset()

reset() resets current_phase, action_before_yellow, action_executed, etc. By default, the first phase after resetting the environment is the one in the first position of the phase list.

Take CityFlow as an example. You can define reset() like this:

class Intersection(object):
    def reset(self):
        # section 1: set current phase id to 0, and take current phase into the engine
        self.current_phase = 0
        if len(self.phases) == 0:
            self._current_phase = 0
        else:
            self._current_phase = self.phases[0]  # true phase id (including yellow)
        self.eng.set_tl_phase(self.id, self._current_phase)

        # section 2: reset other information
        self.current_phase_time = 0
        self.action_before_yellow = None
        self.action_executed = None

Section 1 of reset() sets the current phase to the one in the first position of the phase list and applies it in the engine.

Section 2 of reset() resets related information, including the current phase time and others.

Implement _get_direction()

_get_direction() calculates the angle of a road, which is used later for sorting roads. The default order of roads is N-E-S-W. The calculation differs between simulators.

Taking CityFlow as an example, the code for getting the direction of a road is as follows:

from math import atan2, pi

class Intersection(object):
    def _get_direction(self, road, out=True):
        # road is a list of points; take the segment that touches this intersection
        if out:
            x = road[1][0] - road[0][0]
            y = road[1][1] - road[0][1]
        else:
            x = road[-2][0] - road[-1][0]
            y = road[-2][1] - road[-1][1]
        # atan2(x, y) measures the angle clockwise from north, which gives the N-E-S-W order
        tmp = atan2(x, y)
        return tmp if tmp >= 0 else (tmp + 2 * pi)
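To see why this yields the N-E-S-W order, the same computation can be run as a standalone function on four made-up roads leaving the origin. get_direction is a hypothetical free-function copy of the method, and each road is assumed to be a list of (x, y) points.

```python
from math import atan2, pi


def get_direction(road, out=True):
    """Standalone copy of the computation above; road is a list of (x, y) points."""
    if out:
        x = road[1][0] - road[0][0]
        y = road[1][1] - road[0][1]
    else:
        x = road[-2][0] - road[-1][0]
        y = road[-2][1] - road[-1][1]
    tmp = atan2(x, y)
    return tmp if tmp >= 0 else tmp + 2 * pi


origin = (0, 0)
ends = [("N", (0, 100)), ("E", (100, 0)), ("S", (0, -100)), ("W", (-100, 0))]
for name, end in ends:
    # print the angle as a multiple of pi
    print(name, round(get_direction([origin, end]) / pi, 2))
# N 0.0
# E 0.5
# S 1.0
# W 1.5
```

An incoming road that arrives from the north, such as [(0, 100), (0, 0)] with out=False, also gets angle 0, so the in and out roads on the same side of the intersection share a direction.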

Implement sort_roads()

sort_roads() sorts the roads, including in roads, out roads and others, in a specific order: first by direction, then by whether the road is an out road.

Taking CityFlow as an example, the code for sorting is as follows:

class Intersection(object):
    def sort_roads(self):
        order = sorted(range(len(self.roads)),
                       key=lambda i: (self.directions[i], self.outs[i] if self.world.RIGHT else not self.outs[i]))
        self.roads = [self.roads[i] for i in order]
        self.directions = [self.directions[i] for i in order]
        self.outs = [self.outs[i] for i in order]
        self.out_roads = [self.roads[i] for i, x in enumerate(self.outs) if x]
        self.in_roads = [self.roads[i] for i, x in enumerate(self.outs) if not x]
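The effect of the sort key can be checked on a small made-up example. sort_roads below is a hypothetical free-function version of the method, and right_hand plays the role of self.world.RIGHT.

```python
from math import pi


def sort_roads(roads, directions, outs, right_hand=True):
    """Free-function sketch of the method above, returning the sorted lists."""
    order = sorted(
        range(len(roads)),
        key=lambda i: (directions[i], outs[i] if right_hand else not outs[i]),
    )
    roads = [roads[i] for i in order]
    directions = [directions[i] for i in order]
    outs = [outs[i] for i in order]
    out_roads = [roads[i] for i, x in enumerate(outs) if x]
    in_roads = [roads[i] for i, x in enumerate(outs) if not x]
    return roads, out_roads, in_roads


# made-up road names with their angles (N = 0, E = pi/2, W = 3*pi/2)
names = ["W_out", "N_out", "E_in", "N_in"]
angles = [1.5 * pi, 0.0, 0.5 * pi, 0.0]
is_out = [True, True, False, False]

roads, out_roads, in_roads = sort_roads(names, angles, is_out)
print(roads)      # ['N_in', 'N_out', 'E_in', 'W_out']
print(out_roads)  # ['N_out', 'W_out']
print(in_roads)   # ['N_in', 'E_in']
```

Because False sorts before True, roads with the same direction are ordered in-road first when right_hand is true and out-road first otherwise.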

Now that you have learned how to add a new world, try the following command to use it!

python run.py -w newsim