Customize Agent¶
This documentation gives an overview of how to create a new agent, including importing an agent from other libraries.
Create a New Agent Class¶
To begin with, we create a new class that inherits from RLAgent for RL-based agents, or from BaseAgent for other types of agents.
Here we take an RL-based agent as an example. Suppose we want to develop a new model, named NewModel, for traffic signal control tasks.
First, create a new file newmodel.py in the directory LibSignal/agent/ and write the following code into it.
from . import RLAgent
from common.registry import Registry

@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def __init__(self, world, rank):
        pass

    def reset(self):
        pass

    def __repr__(self):
        pass

    def get_ob(self):
        pass

    def get_reward(self):
        pass

    def get_phase(self):
        pass

    def get_queue(self):
        pass

    def get_delay(self):
        pass

    def get_action(self):
        pass

    def sample(self):
        pass

    def remember(self):
        pass

    def _build_model(self):
        pass

    def update_target_network(self):
        pass

    def train(self):
        pass

    def load_model(self):
        pass

    def save_model(self):
        pass

class NewModelNet():
    def __init__(self, **kwargs):
        pass

    def forward(self, x):
        pass
You can see that we use Registry to register the new model, define the NewModelNet class, and list the methods that must appear in the agent class.
In the following sections, we take the configuration of the DQN model as an example.
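Registry here is LibSignal's own class from common.registry; as a rough illustration of what a decorator-based registry does, here is a minimal self-contained sketch. The class MiniRegistry below is purely hypothetical and only mimics the register_model/mapping pattern; it is not LibSignal's implementation.

```python
# Minimal sketch of a decorator-based model registry (illustrative only;
# LibSignal's common.registry.Registry is more elaborate).
class MiniRegistry:
    mapping = {'model_mapping': {}}

    @classmethod
    def register_model(cls, name):
        def wrap(model_cls):
            # store the class under the given name so a trainer can
            # later look it up by the string that appears in the config
            cls.mapping['model_mapping'][name] = model_cls
            return model_cls
        return wrap

@MiniRegistry.register_model('NewModel')
class NewModelAgent:
    pass

# a trainer can now instantiate the agent from its registered name
agent_cls = MiniRegistry.mapping['model_mapping']['NewModel']
```

This is why the decorator line must sit directly above the class definition: registration happens as a side effect of importing the module.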
Implement __init__()¶
Then we implement the __init__() method, which defines the model structure according to the input parameters and the configuration information from Registry.
The input parameters of __init__() are world and this intersection's rank. The configuration information from Registry contains the various model parameters, the input parameters required to create generators, and so on.
You can define __init__() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def __init__(self, world, rank):
        # section 1: get configs
        self.world = world
        self.rank = rank
        self.sub_agents = len(self.world.intersections)
        self.inter_id = self.world.intersection_ids[self.rank]
        self.inter = self.world.id2intersection[self.inter_id]
        self.phase = Registry.mapping['model_mapping']['setting'].param['phase']
        self.one_hot = Registry.mapping['model_mapping']['setting'].param['one_hot']
        self.gamma = Registry.mapping['model_mapping']['setting'].param['gamma']
        self.grad_clip = Registry.mapping['model_mapping']['setting'].param['grad_clip']
        self.epsilon = Registry.mapping['model_mapping']['setting'].param['epsilon']
        self.epsilon_decay = Registry.mapping['model_mapping']['setting'].param['epsilon_decay']
        self.epsilon_min = Registry.mapping['model_mapping']['setting'].param['epsilon_min']
        self.learning_rate = Registry.mapping['model_mapping']['setting'].param['learning_rate']
        self.vehicle_max = Registry.mapping['model_mapping']['setting'].param['vehicle_max']
        self.batch_size = Registry.mapping['model_mapping']['setting'].param['batch_size']
        # section 2: create generators for each agent
        self.ob_generator = LaneVehicleGenerator(self.world, self.inter, ['lane_count'], in_only=True, average=None)
        self.phase_generator = IntersectionPhaseGenerator(world, self.inter, ["phase"],
                                                          targets=["cur_phase"], negative=False)
        self.reward_generator = LaneVehicleGenerator(self.world, self.inter, ["lane_waiting_count"],
                                                     in_only=True, average='all', negative=True)
        # section 3: set action space and ob_length
        self.action_space = gym.spaces.Discrete(len(self.inter.phases))
        if self.phase:
            if self.one_hot:
                self.ob_length = self.ob_generator.ob_length + len(self.inter.phases)
            else:
                self.ob_length = self.ob_generator.ob_length + 1
        else:
            self.ob_length = self.ob_generator.ob_length
        # section 4: create model, target model and others
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_network()
        self.criterion = nn.MSELoss(reduction='mean')
        self.optimizer = optim.RMSprop(self.model.parameters(),
                                       lr=self.learning_rate,
                                       alpha=0.9, centered=False, eps=1e-7)
You can see that in section 1 of the code, we take the necessary parameters from Registry and from the input arguments.
In section 2, we create the observation, reward and phase generators for the model. Note that different models pass different input parameters to the generators. Since the definitions of queue and delay are unified, the queue and delay generators are created directly in RLAgent.
In section 3, we set the action space and the observation length.
In section 4, we create the model, the target model and other components, including a criterion and an optimizer.
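The ob_length arithmetic in section 3 can be checked with a small standalone sketch: when the phase is used as a feature, its index is either appended as a single scalar (ob_length + 1) or expanded into a one-hot vector over the phases (ob_length + number of phases). The idx2onehot helper below is a hedged stand-in for the utils.idx2onehot used later in get_action(), not LibSignal's implementation.

```python
import numpy as np

def idx2onehot(idx, n):
    # stand-in for LibSignal's utils.idx2onehot: expand integer phase
    # indices of shape (batch,) into one-hot rows of shape (batch, n)
    onehot = np.zeros((len(idx), n), dtype=np.float32)
    onehot[np.arange(len(idx)), idx] = 1.0
    return onehot

lane_obs = np.random.rand(1, 12).astype(np.float32)  # e.g. 12 incoming lanes
phase = np.array([3])                                # current phase index
n_phases = 8

# one_hot=True: feature length is 12 + n_phases
feat_onehot = np.concatenate([lane_obs, idx2onehot(phase, n_phases)], axis=1)
# one_hot=False: feature length is 12 + 1
feat_scalar = np.concatenate([lane_obs, phase.reshape(1, 1)], axis=1)
```

This mirrors why section 3 adds either len(self.inter.phases) or 1 to self.ob_generator.ob_length.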
.. Note:: For Multi-Agent models that share information among agents, the code for creating generators is as follows. (Note that the built-in sorted() returns a new list rather than sorting in place, so we use list.sort() here to actually order the generators.)

# get observation generators for each agent
observation_generators = []
for inter in self.world.intersections:
    node_id = inter.id
    node_idx = self.world.id2idx[node_id]
    node_obj = self.world.id2intersection[node_id]
    tmp_generator = LaneVehicleGenerator(self.world, node_obj, ['lane_count'], in_only=True, average=None)
    observation_generators.append((node_idx, tmp_generator))
observation_generators.sort(key=lambda x: x[0])  # now the generators are ordered by their index in the graph
self.ob_generator = observation_generators

# get reward generators
rewarding_generators = []
for inter in self.world.intersections:
    node_id = inter.id
    node_idx = self.world.id2idx[node_id]
    node_obj = self.world.id2intersection[node_id]
    tmp_generator = LaneVehicleGenerator(self.world, node_obj, ["lane_waiting_count"],
                                         in_only=True, average='all', negative=True)
    rewarding_generators.append((node_idx, tmp_generator))
rewarding_generators.sort(key=lambda x: x[0])  # now the generators are ordered by their index in the graph
self.reward_generator = rewarding_generators

# get phase generators
phasing_generators = []
for inter in self.world.intersections:
    node_id = inter.id
    node_idx = self.world.id2idx[node_id]
    node_obj = self.world.id2intersection[node_id]
    tmp_generator = IntersectionPhaseGenerator(self.world, node_obj, ['phase'],
                                               targets=['cur_phase'], negative=False)
    phasing_generators.append((node_idx, tmp_generator))
phasing_generators.sort(key=lambda x: x[0])  # now the generators are ordered by their index in the graph
self.phase_generator = phasing_generators

# get queue generators
queues = []
for inter in self.world.intersections:
    node_id = inter.id
    node_idx = self.world.id2idx[node_id]
    node_obj = self.world.id2intersection[node_id]
    tmp_generator = LaneVehicleGenerator(self.world, node_obj, ["lane_waiting_count"],
                                         in_only=True, negative=False)
    queues.append((node_idx, tmp_generator))
queues.sort(key=lambda x: x[0])
self.queue = queues

# get delay generators
delays = []
for inter in self.world.intersections:
    node_id = inter.id
    node_idx = self.world.id2idx[node_id]
    node_obj = self.world.id2intersection[node_id]
    tmp_generator = LaneVehicleGenerator(self.world, node_obj, ["lane_delay"],
                                         in_only=True, average="all", negative=False)
    delays.append((node_idx, tmp_generator))
delays.sort(key=lambda x: x[0])
self.delay = delays
Implement reset()¶
Then we implement the reset() method, which resets stateful information, including the ob_generator, phase_generator, reward_generator, queue and delay generators.
For example, you can define reset() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def reset(self):
        inter_id = self.world.intersection_ids[self.rank]
        inter_obj = self.world.id2intersection[inter_id]
        self.ob_generator = LaneVehicleGenerator(self.world, inter_obj, ['lane_count'], in_only=True, average=None)
        self.phase_generator = IntersectionPhaseGenerator(self.world, inter_obj, ["phase"],
                                                          targets=["cur_phase"], negative=False)
        self.reward_generator = LaneVehicleGenerator(self.world, inter_obj, ["lane_waiting_count"],
                                                     in_only=True, average='all', negative=True)
        self.queue = LaneVehicleGenerator(self.world, inter_obj,
                                          ["lane_waiting_count"], in_only=True,
                                          negative=False)
        self.delay = LaneVehicleGenerator(self.world, inter_obj,
                                          ["lane_delay"], in_only=True, average="all",
                                          negative=False)
Note: The code above applies to Single-Agent models. For Multi-Agent generators, the user must reset each generator by iterating over the lists.
Implement _build_model()¶
Then we implement the _build_model() method, which creates the model (network). This method is called twice by __init__(), to create the model and the target model; note the leading underscore, matching the self._build_model() calls in __init__() and load_model().
For example, you can define _build_model() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def _build_model(self):
        model = NewModelNet(self.ob_length, self.action_space.n)
        return model
Implement NewModelNet¶
Then we implement the NewModelNet class, which is the core of the model. NewModelNet can inherit from the base nn.Module class or from classes already implemented in other libraries.
For example, you can define NewModelNet like this:
class NewModelNet(nn.Module):
    def __init__(self, input_dim, output_dim):
        # signature matches the NewModelNet(self.ob_length, self.action_space.n)
        # call in _build_model()
        super(NewModelNet, self).__init__()
        pass

    def _forward(self, x):
        pass

    def forward(self, x, train=True):
        # get_action() and train() call the model with a train flag
        pass
Implement __repr__()¶
Then we implement the __repr__() method, which returns the structure of self.model.
For example, you can define __repr__() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def __repr__(self):
        return self.model.__repr__()
Implement get_ob()¶
Then we implement the get_ob() method, which gets observations from the environment.
For Single-Agent, you can define get_ob() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def get_ob(self):
        x_obs = []
        x_obs.append(self.ob_generator.generate())
        x_obs = np.array(x_obs, dtype=np.float32)
        return x_obs
For Multi-Agent, you can define get_ob()
like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def get_ob(self):
        # sub_agents * lane_nums
        x_obs = []
        for i in range(len(self.ob_generator)):
            x_obs.append(self.ob_generator[i][1].generate())
        length = set([len(i) for i in x_obs])
        if len(length) == 1:
            x_obs = np.array(x_obs, dtype=np.float32)
        else:
            x_obs = [np.expand_dims(x, axis=0) for x in x_obs]
        return x_obs
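The length check above guards against intersections with different lane counts: only when every sub-agent observation has the same length can they be stacked into one array; otherwise each observation is kept as its own (1, lane_num) row in a list. A standalone numpy sketch of the same branching:

```python
import numpy as np

def stack_obs(x_obs):
    # mirror the branching in the Multi-Agent get_ob(): stack when all
    # observations share a length, otherwise keep a list of (1, lane_num) rows
    lengths = set(len(x) for x in x_obs)
    if len(lengths) == 1:
        return np.array(x_obs, dtype=np.float32)
    return [np.expand_dims(x, axis=0) for x in x_obs]

same = stack_obs([np.ones(12), np.ones(12)])    # stacked into one array
ragged = stack_obs([np.ones(12), np.ones(8)])   # list of per-agent rows
```

Downstream code that consumes the observations must therefore handle both the array and the list case.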
Implement get_reward()¶
Then we implement the get_reward() method, which gets rewards from the environment.
For Single-Agent, you can define get_reward() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def get_reward(self):
        rewards = []
        rewards.append(self.reward_generator.generate())
        rewards = np.squeeze(np.array(rewards))
        return rewards
For Multi-Agent, you can define get_reward()
like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def get_reward(self):
        rewards = []  # sub_agents
        for i in range(len(self.reward_generator)):
            rewards.append(self.reward_generator[i][1].generate())
        rewards = np.squeeze(np.array(rewards))
        return rewards
Implement get_phase()¶
Then we implement the get_phase() method, which gets the current phase of the intersection(s) from the environment.
For Single-Agent, you can define get_phase() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def get_phase(self):
        phase = []
        phase.append(self.phase_generator.generate())
        phase = (np.concatenate(phase)).astype(np.int8)
        return phase
For Multi-Agent, you can define get_phase()
like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def get_phase(self):
        phase = []  # sub_agents
        for i in range(len(self.phase_generator)):
            phase.append((self.phase_generator[i][1].generate()))
        phase = (np.concatenate(phase)).astype(np.int8)
        return phase
Implement get_queue()¶
Then we implement the get_queue() method, which gets the queue length of the intersection(s).
For Single-Agent, you can define get_queue() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def get_queue(self):
        queue = []
        queue.append(self.queue.generate())
        # sum over the lanes
        queue = np.sum(np.squeeze(np.array(queue)))
        return queue
For Multi-Agent, you can define get_queue()
like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def get_queue(self):
        queue = []
        for i in range(len(self.queue)):
            queue.append((self.queue[i][1].generate()))
        tmp_queue = np.squeeze(np.array(queue))
        queue = np.sum(tmp_queue, axis=1 if len(tmp_queue.shape) == 2 else 0)
        return queue
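The axis selection at the end handles two shapes: with several intersections the squeezed array is 2-D and each intersection's lanes are summed along axis 1, while a single intersection squeezes down to 1-D and is summed along axis 0. A small numpy illustration of that branching:

```python
import numpy as np

def sum_queue(per_lane):
    # mirror the Multi-Agent get_queue(): sum lane queues per intersection;
    # a squeezed 1-D array (single intersection) is summed over axis 0 instead
    tmp = np.squeeze(np.array(per_lane))
    return np.sum(tmp, axis=1 if len(tmp.shape) == 2 else 0)

multi = sum_queue([[1, 2, 3], [4, 5, 6]])  # two intersections, 3 lanes each
single = sum_queue([[1, 2, 3]])            # squeezes to 1-D, one total
```

Without the axis check, np.sum over a 1-D array with axis=1 would raise an error.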
Implement get_delay()¶
Then we implement the get_delay() method, which gets the delay of the intersection(s).
For Single-Agent, you can define get_delay() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def get_delay(self):
        delay = []
        delay.append(self.delay.generate())
        delay = np.sum(np.squeeze(np.array(delay)))
        return delay
For Multi-Agent, you can define get_delay()
like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def get_delay(self):
        delay = []
        for i in range(len(self.delay)):
            delay.append((self.delay[i][1].generate()))
        delay = np.squeeze(np.array(delay))
        return delay  # [intersections,]
Implement get_action()¶
Then we implement the get_action() method, which generates an action from the input features. Different models use different features; users only need to pass in the required parameters.
For DQN, the feature consists of the observation and the phase, so you can define get_action() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def get_action(self, ob, phase, test=False):
        if not test:
            if np.random.rand() <= self.epsilon:
                return self.sample()
        if self.phase:
            if self.one_hot:
                feature = np.concatenate([ob, utils.idx2onehot(phase, self.action_space.n)], axis=1)
            else:
                feature = np.concatenate([ob, phase], axis=1)
        else:
            feature = ob
        observation = torch.tensor(feature, dtype=torch.float32)
        actions = self.model(observation, train=False)
        actions = actions.clone().detach().numpy()
        return np.argmax(actions, axis=1)
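The first lines implement epsilon-greedy exploration: during training, a random action is sampled with probability epsilon, otherwise the greedy argmax of the Q-values is taken; at test time exploration is switched off. A minimal numpy sketch of the same policy, with a toy Q-value array standing in for the network output:

```python
import numpy as np

def epsilon_greedy(q_values, epsilon, n_actions, test=False):
    # with probability epsilon (train only), explore uniformly;
    # otherwise exploit the greedy action of each row of Q-values
    if not test and np.random.rand() <= epsilon:
        return np.random.randint(0, n_actions, size=q_values.shape[0])
    return np.argmax(q_values, axis=1)

q = np.array([[0.1, 0.9, 0.3],
              [0.8, 0.2, 0.5]])
greedy = epsilon_greedy(q, epsilon=0.1, n_actions=3, test=True)  # always greedy at test time
explore = epsilon_greedy(q, epsilon=1.0, n_actions=3)            # always random in training
```

The epsilon value itself is read from the config in __init__() and decayed in train(), so exploration fades over the course of training.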
Implement sample()¶
Then we implement the sample() method, which samples actions randomly.
For example, you can define sample() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def sample(self):
        return np.random.randint(0, self.action_space.n, self.sub_agents)
Implement remember()¶
Then we implement the remember() method, which puts the current step's information into the replay buffer so the agent can be trained on it later.
For example, you can define remember() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def remember(self, last_obs, last_phase, actions, actions_prob, rewards, obs, cur_phase, done, key):
        self.replay_buffer.append((key, (last_obs, last_phase, actions, rewards, obs, cur_phase)))
Implement update_target_network()¶
Then we implement the update_target_network() method, which copies the parameters of the model into the target network.
For example, you can define update_target_network() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def update_target_network(self):
        weights = self.model.state_dict()
        self.target_model.load_state_dict(weights)
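This is a hard update: load_state_dict() copies the online network's weights into the target network wholesale. Some algorithms instead use a soft (Polyak) update that blends the two sets of weights each step. A framework-free sketch of the difference, treating the parameters as plain numpy arrays rather than torch tensors:

```python
import numpy as np

def hard_update(model, target):
    # copy every parameter verbatim, as load_state_dict(state_dict) does
    for k in model:
        target[k] = model[k].copy()

def soft_update(model, target, tau=0.01):
    # Polyak averaging: the target slowly tracks the online network
    for k in model:
        target[k] = tau * model[k] + (1 - tau) * target[k]

model = {'w': np.array([1.0, 2.0])}
target = {'w': np.array([0.0, 0.0])}
hard_update(model, target)
```

Either way, keeping the bootstrap targets on a slowly-changing network is what stabilizes the regression in train().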
Implement train()¶
Then we implement the train() method, which trains the agent by optimizing the model on samples drawn from the replay buffer.
For example, you can define train() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def train(self):
        # take a random batch of samples from the replay buffer
        samples = random.sample(self.replay_buffer, self.batch_size)
        # convert the samples into the corresponding formats
        b_t, b_tp, rewards, actions = self._batchwise(samples)
        # put the next feature into the target model
        out = self.target_model(b_tp, train=False)
        target = rewards + self.gamma * torch.max(out, dim=1)[0]
        # put the current feature into the model
        target_f = self.model(b_t, train=False)
        for i, action in enumerate(actions):
            target_f[i][action] = target[i]
        loss = self.criterion(self.model(b_t, train=True), target_f)
        self.optimizer.zero_grad()
        loss.backward()
        clip_grad_norm_(self.model.parameters(), self.grad_clip)
        self.optimizer.step()
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        return loss.clone().detach().numpy()
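The two central lines build the standard DQN regression target, r + gamma * max_a Q_target(s', a), and then overwrite only the taken action's entry in the predicted Q-values so that the MSE loss constrains just that action. A numpy sketch of that target construction with toy numbers (no network involved):

```python
import numpy as np

gamma = 0.95
rewards = np.array([1.0, 0.5])   # batch of 2 transitions
actions = np.array([0, 2])       # actions actually taken

# Q-values of the *target* network for the next states s'
q_next = np.array([[0.2, 0.8, 0.1],
                   [0.4, 0.3, 0.9]])
# bootstrap target: r + gamma * max_a Q_target(s', a)
target = rewards + gamma * q_next.max(axis=1)

# Q-values of the online network for the current states s
target_f = np.array([[0.5, 0.1, 0.0],
                     [0.2, 0.2, 0.2]])
# overwrite only the taken action's entry, as the loop in train() does;
# the other entries keep the model's own predictions (zero loss there)
for i, a in enumerate(actions):
    target_f[i][a] = target[i]
```

Because the untouched entries equal the model's own predictions, the MSE loss only pushes on the Q-value of the action that was actually executed.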
Implement load_model()¶
Then we implement the load_model() method, which loads the model parameters saved at a given episode.
For example, you can define load_model() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def load_model(self, e):
        model_name = os.path.join(Registry.mapping['logger_mapping']['path'].path,
                                  'model', f'{e}_{self.rank}.pt')
        self.model = self._build_model()
        self.model.load_state_dict(torch.load(model_name))
        self.target_model = self._build_model()
        self.target_model.load_state_dict(torch.load(model_name))
Implement save_model()¶
Then we implement the save_model() method, which saves the model parameters of an episode.
For example, you can define save_model() like this:
@Registry.register_model('NewModel')
class NewModelAgent(RLAgent):
    def save_model(self, e):
        path = os.path.join(Registry.mapping['logger_mapping']['path'].path, 'model')
        if not os.path.exists(path):
            os.makedirs(path)
        model_name = os.path.join(path, f'{e}_{self.rank}.pt')
        torch.save(self.target_model.state_dict(), model_name)
.. Note:: If the customized model needs more complex behavior, you can override the corresponding interfaces mentioned above. If the model is imported from another library and some methods are already provided there, users can use "pass" to skip the implementation of the corresponding interface.
Import The Model¶
After adding the model, you need to modify the file LibSignal/agent/__init__.py.
Please add a line like this:
from .newmodel import NewModelAgent
Add Model Config¶
Then, you need to create the file LibSignal/configs/tsc/newmodel.yml, which sets the parameters of the model, trainer, world and logger.
For example, you can add configuration like the following:
includes:
  - configs/tsc/base.yml

model:
  name: newmodel
  train_model: True
  epsilon: 0.1
  one_hot: True
  phase: True

trainer:
  learning_start: 1000

world:
  signal_config: {
    hz1x1: {
      phase_pairs: [[2, 6], [0, 4], [3, 7], [1, 5], [6, 7], [2, 3], [4, 5], [0, 1]],
      valid_acts: null
    }
  }

logger:
  attention: True
Note: The filename of the config file and the name value must be consistent with the name under which the model was registered, like NewModel above.
Now that you have learned how to add a new model, try the following command to run it!
python run.py -a newmodel