# tinygrad/examples/deep_deterministic_policy_g...


from typing import Optional, Tuple
from numpy.typing import NDArray
from tinygrad.tensor import Tensor
from tinygrad.nn import optim
from tinygrad.helpers import getenv
import numpy as np
import gym
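
# select the tinygrad backend: set GPU=1 in the environment to run on the GPU, otherwise CPU is used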
DEVICE = "GPU" if getenv("GPU") else "CPU"
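

# Actor network: a deterministic policy with two ReLU hidden layers and a tanh output
# head, scaled by the environment's action bound in forward().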
class Actor:
  def __init__(self, num_actions: int, num_states: int, hidden_size: Tuple[int, int] = (400, 300)):
    self.l1 = Tensor.glorot_uniform(num_states, hidden_size[0])
    self.l2 = Tensor.glorot_uniform(hidden_size[0], hidden_size[1])
    self.mu = Tensor.glorot_uniform(hidden_size[1], num_actions)

  def forward(self, state: Tensor, upper_bound: float) -> Tensor:
    out = state.dot(self.l1).relu()
    out = out.dot(self.l2).relu()
    out = out.dot(self.mu).tanh()
    output = out * upper_bound
    return output
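

# Critic network: estimates Q(s, a); the action is concatenated with the state and fed
# through the first layer (see the note in the DDPG docstring below).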
class Critic:
  def __init__(self, num_inputs: int, hidden_size: Tuple[int, int] = (400, 300)):
    self.l1 = Tensor.glorot_uniform(num_inputs, hidden_size[0])
    self.l2 = Tensor.glorot_uniform(hidden_size[0], hidden_size[1])
    self.q = Tensor.glorot_uniform(hidden_size[1], 1)

  def forward(self, state: Tensor, action: Tensor) -> Tensor:
    inputs = state.cat(action, dim=1)
    out = inputs.dot(self.l1).relu()
    out = out.dot(self.l2).relu()
    q = out.dot(self.q)
    return q
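

# Replay buffer: fixed-size circular storage; once buffer_capacity is reached, the oldest
# transitions are overwritten (index = buffer_counter % buffer_capacity).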
class Buffer:
  def __init__(self, num_actions: int, num_states: int, buffer_capacity: int = 100000, batch_size: int = 64):
    self.buffer_capacity = buffer_capacity
    self.batch_size = batch_size

    self.buffer_counter = 0

    self.state_buffer = np.zeros((self.buffer_capacity, num_states))
    self.action_buffer = np.zeros((self.buffer_capacity, num_actions))
    self.reward_buffer = np.zeros((self.buffer_capacity, 1))
    self.next_state_buffer = np.zeros((self.buffer_capacity, num_states))
    self.done_buffer = np.zeros((self.buffer_capacity, 1))

  def record(
    self, observations: Tuple[Tensor, NDArray, float, NDArray, bool]
  ) -> None:
    index = self.buffer_counter % self.buffer_capacity

    self.state_buffer[index] = observations[0].detach().numpy()
    self.action_buffer[index] = observations[1]
    self.reward_buffer[index] = observations[2]
    self.next_state_buffer[index] = observations[3]
    self.done_buffer[index] = observations[4]

    self.buffer_counter += 1

  def sample(self) -> Tuple[Tensor, Tensor, Tensor, Tensor, Tensor]:
    record_range = min(self.buffer_counter, self.buffer_capacity)
    batch_indices = np.random.choice(record_range, self.batch_size)

    state_batch = Tensor(self.state_buffer[batch_indices], device=DEVICE, requires_grad=False)
    action_batch = Tensor(self.action_buffer[batch_indices], device=DEVICE, requires_grad=False)
    reward_batch = Tensor(self.reward_buffer[batch_indices], device=DEVICE, requires_grad=False)
    next_state_batch = Tensor(self.next_state_buffer[batch_indices], device=DEVICE, requires_grad=False)
    done_batch = Tensor(self.done_buffer[batch_indices], device=DEVICE, requires_grad=False)

    return state_batch, action_batch, reward_batch, next_state_batch, done_batch
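

# Exploration noise: samples from a Gaussian with the given mean and standard deviation
# (used here in place of the Ornstein-Uhlenbeck process from the original paper).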
class GaussianActionNoise:
  def __init__(self, mean: NDArray, std_deviation: NDArray):
    self.mean = mean
    self.std_dev = std_deviation

  def __call__(self) -> Tensor:
    return Tensor(
      np.random.default_rng()
      .normal(self.mean, self.std_dev, size=self.mean.shape)
      .astype(np.float32),
      device=DEVICE,
      requires_grad=False,
    )


class DeepDeterministicPolicyGradient:
  """Deep Deterministic Policy Gradient (DDPG).

  https://arxiv.org/pdf/1509.02971.pdf

  Args:
    env: The environment to learn from.
    lr_actor: The learning rate of the actor.
    lr_critic: The learning rate of the critic.
    gamma: The discount factor.
    buffer_capacity: The size of the replay buffer.
    tau: The soft update coefficient.
    hidden_size: The number of neurons in the hidden layers of the actor and critic networks.
    batch_size: The minibatch size for each gradient update.
    noise_stddev: The standard deviation of the exploration noise.

  Note:
    In contrast to the original paper, actions are already included in the first layer
    of the Critic and we use a Gaussian distribution instead of an Ornstein Uhlenbeck
    process for exploration noise.
  """

  def __init__(
    self,
    env: gym.Env,
    lr_actor: float = 0.001,
    lr_critic: float = 0.002,
    gamma: float = 0.99,
    buffer_capacity: int = 100000,
    tau: float = 0.005,
    hidden_size: Tuple[int, int] = (400, 300),
    batch_size: int = 64,
    noise_stddev: float = 0.1,
  ):
    self.num_states = env.observation_space.shape[0]
    self.num_actions = env.action_space.shape[0]
    self.max_action = env.action_space.high.item()
    self.min_action = env.action_space.low.item()
    self.gamma = gamma
    self.tau = tau
    self.memory = Buffer(
      self.num_actions, self.num_states, buffer_capacity, batch_size
    )
    self.batch_size = batch_size

    self.noise = GaussianActionNoise(
      mean=np.zeros(self.num_actions),
      std_deviation=noise_stddev * np.ones(self.num_actions),
    )

    self.actor = Actor(self.num_actions, self.num_states, hidden_size)
    self.critic = Critic(self.num_actions + self.num_states, hidden_size)
    self.target_actor = Actor(self.num_actions, self.num_states, hidden_size)
    self.target_critic = Critic(self.num_actions + self.num_states, hidden_size)

    actor_params = optim.get_parameters(self.actor)
    critic_params = optim.get_parameters(self.critic)
    target_actor_params = optim.get_parameters(self.target_actor)
    target_critic_params = optim.get_parameters(self.target_critic)

    if DEVICE == "GPU":
      [x.gpu_() for x in actor_params + critic_params + target_actor_params + target_critic_params]

    self.actor_optimizer = optim.Adam(actor_params, lr_actor)
    self.critic_optimizer = optim.Adam(critic_params, lr_critic)

    self.update_network_parameters(tau=1.0)
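
  # Soft update: target <- tau * online + (1 - tau) * target.
  # Calling it with tau=1.0 (as in __init__ above) copies the online weights exactly.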
  def update_network_parameters(self, tau: Optional[float] = None) -> None:
    """Updates the parameters of the target networks via 'soft updates'."""
    if tau is None:
      tau = self.tau

    for param, target_param in zip(
      optim.get_parameters(self.actor), optim.get_parameters(self.target_actor)
    ):
      target_param.assign(param.detach() * tau + target_param * (1.0 - tau))

    for param, target_param in zip(
      optim.get_parameters(self.critic), optim.get_parameters(self.target_critic)
    ):
      target_param.assign(param.detach() * tau + target_param * (1.0 - tau))
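
  # Action selection: deterministic actor output, plus Gaussian exploration noise during
  # training (evaluate=False), clipped to the environment's action bounds.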
  def choose_action(self, state: Tensor, evaluate: bool = False) -> NDArray:
    mu = self.actor.forward(state, self.max_action)

    if not evaluate:
      mu = mu.add(self.noise())

    mu = mu.clip(self.min_action, self.max_action)

    return mu.detach().numpy()

  def learn(self) -> None:
    """Performs a learning step by sampling from replay buffer and updating networks."""
    if self.memory.buffer_counter < self.batch_size:
      return

    (
      state_batch,
      action_batch,
      reward_batch,
      next_state_batch,
      done_batch,
    ) = self.memory.sample()
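
    # Bellman target for the critic: y = r + gamma * (1 - done) * Q_target(s', mu_target(s')).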
    target_actions = self.target_actor.forward(next_state_batch, self.max_action)
    y = reward_batch + self.gamma * self.target_critic.forward(
      next_state_batch, target_actions.detach()
    ) * (Tensor.ones(*done_batch.shape, device=DEVICE, requires_grad=False) - done_batch)

    self.critic_optimizer.zero_grad()
    critic_value = self.critic.forward(state_batch, action_batch)
    critic_loss = y.detach().sub(critic_value).pow(2).mean()
    critic_loss.backward()
    self.critic_optimizer.step()
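
    # Actor update: ascend Q(s, mu(s)) by minimising -Q, i.e. the deterministic policy gradient.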
    self.actor_optimizer.zero_grad()
    actions = self.actor.forward(state_batch, self.max_action)
    critic_value = self.critic.forward(state_batch, actions)
    actor_loss = -critic_value.mean()
    actor_loss.backward()
    self.actor_optimizer.step()

    self.update_network_parameters()


if __name__ == "__main__":
  env = gym.make("Pendulum-v1")
  agent = DeepDeterministicPolicyGradient(env)
  num_episodes = 150

  for episode in range(1, num_episodes + 1):
    cumulative_reward = 0.0
    prev_state, info = env.reset()  # for older gym versions only the state is returned, so remove info
    done = False

    while not done:
      prev_state = Tensor(prev_state, device=DEVICE, requires_grad=False)
      action = agent.choose_action(prev_state)

      # newer gym returns terminated and truncated separately; combine them so the episode
      # also ends on the time-limit truncation (older gym versions return a single done bool)
      state, reward, terminated, truncated, info = env.step(action)
      done = terminated or truncated

      cumulative_reward += reward

      agent.memory.record((prev_state, action, reward, state, done))
      agent.learn()

      prev_state = state

    print(
      f"Episode {episode}/{num_episodes} - cumulative reward: {cumulative_reward}"
    )
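
  # A minimal greedy-evaluation sketch (assumption: the same new-style gym reset/step API
  # as above); evaluate=True skips the exploration noise.
  eval_state, info = env.reset()
  eval_done = False
  eval_reward = 0.0
  while not eval_done:
    eval_action = agent.choose_action(Tensor(eval_state, device=DEVICE, requires_grad=False), evaluate=True)
    eval_state, reward, terminated, truncated, info = env.step(eval_action)
    eval_done = terminated or truncated
    eval_reward += reward
  print(f"Evaluation episode - cumulative reward: {eval_reward}")

  env.close()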