from typing import Optional, Tuple
from numpy.typing import NDArray
from tinygrad.state import get_parameters
from tinygrad.tensor import Tensor
from tinygrad.nn import optim
from tinygrad.helpers import getenv
import numpy as np
import gym
DEVICE = "GPU" if getenv("GPU") else "CPU"
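# Actor network: a small MLP (two hidden layers) that maps a state to a deterministic action,
# squashed with tanh and rescaled to the environment's action bound.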
class Actor:
def __init__(self, num_actions: int, num_states: int, hidden_size: Tuple[int, int] = (400, 300)):
self.l1 = Tensor.glorot_uniform(num_states, hidden_size[0])
self.l2 = Tensor.glorot_uniform(hidden_size[0], hidden_size[1])
self.mu = Tensor.glorot_uniform(hidden_size[1], num_actions)
def forward(self, state: Tensor, upper_bound: float) -> Tensor:
out = state.dot(self.l1).relu()
out = out.dot(self.l2).relu()
out = out.dot(self.mu).tanh()
output = out * upper_bound
return output
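# Critic network: a small MLP that estimates Q(state, action); the action is
# concatenated with the state at the input layer.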
class Critic:
def __init__(self, num_inputs: int, hidden_size: Tuple[int, int] = (400, 300)):
self.l1 = Tensor.glorot_uniform(num_inputs, hidden_size[0])
self.l2 = Tensor.glorot_uniform(hidden_size[0], hidden_size[1])
self.q = Tensor.glorot_uniform(hidden_size[1], 1)
def forward(self, state: Tensor, action: Tensor) -> Tensor:
inputs = state.cat(action, dim=1)
out = inputs.dot(self.l1).relu()
out = out.dot(self.l2).relu()
q = out.dot(self.q)
return q
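# Replay buffer: fixed-capacity circular storage of (state, action, reward, next_state, done)
# transitions, sampled uniformly (with replacement) as training minibatches.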
class Buffer:
def __init__(self, num_actions: int, num_states: int, buffer_capacity: int = 100000, batch_size: int = 64):
self.buffer_capacity = buffer_capacity
self.batch_size = batch_size
self.buffer_counter = 0
self.state_buffer = np.zeros((self.buffer_capacity, num_states), np.float32)
self.action_buffer = np.zeros((self.buffer_capacity, num_actions), np.float32)
self.reward_buffer = np.zeros((self.buffer_capacity, 1), np.float32)
self.next_state_buffer = np.zeros((self.buffer_capacity, num_states), np.float32)
self.done_buffer = np.zeros((self.buffer_capacity, 1), np.float32)
def record(
self, observations: Tuple[Tensor, NDArray, float, NDArray, bool]
) -> None:
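    # Overwrite the oldest transition once the buffer is full (circular indexing).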
index = self.buffer_counter % self.buffer_capacity
self.state_buffer[index] = observations[0].detach().numpy()
self.action_buffer[index] = observations[1]
self.reward_buffer[index] = observations[2]
self.next_state_buffer[index] = observations[3]
self.done_buffer[index] = observations[4]
self.buffer_counter += 1
def sample(self) -> Tuple[Tensor, Tensor, Tensor, Tensor, Tensor]:
record_range = min(self.buffer_counter, self.buffer_capacity)
batch_indices = np.random.choice(record_range, self.batch_size)
state_batch = Tensor(self.state_buffer[batch_indices], device=DEVICE, requires_grad=False)
action_batch = Tensor(self.action_buffer[batch_indices], device=DEVICE, requires_grad=False)
reward_batch = Tensor(self.reward_buffer[batch_indices], device=DEVICE, requires_grad=False)
next_state_batch = Tensor(self.next_state_buffer[batch_indices], device=DEVICE, requires_grad=False)
done_batch = Tensor(self.done_buffer[batch_indices], device=DEVICE, requires_grad=False)
return state_batch, action_batch, reward_batch, next_state_batch, done_batch
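# Exploration noise: zero-mean Gaussian noise added to the actor's output during training,
# used here instead of the Ornstein-Uhlenbeck process from the original paper.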
class GaussianActionNoise:
def __init__(self, mean: NDArray, std_deviation: NDArray):
self.mean = mean
self.std_dev = std_deviation
def __call__(self) -> Tensor:
return Tensor(
np.random.default_rng()
.normal(self.mean, self.std_dev, size=self.mean.shape)
.astype(np.float32),
device=DEVICE,
requires_grad=False,
)
class DeepDeterministicPolicyGradient:
"""Deep Deterministic Policy Gradient (DDPG).
https://arxiv.org/pdf/1509.02971.pdf
Args:
env: The environment to learn from.
lr_actor: The learning rate of the actor.
lr_critic: The learning rate of the critic.
gamma: The discount factor.
buffer_capacity: The size of the replay buffer.
tau: The soft update coefficient.
hidden_size: The number of neurons in the hidden layers of the actor and critic networks.
batch_size: The minibatch size for each gradient update.
noise_stddev: The standard deviation of the exploration noise.
Note:
In contrast to the original paper, actions are already included in the first layer
    of the Critic and we use a Gaussian distribution instead of an Ornstein-Uhlenbeck
process for exploration noise.
"""
def __init__(
self,
env: gym.Env,
lr_actor: float = 0.001,
lr_critic: float = 0.002,
gamma: float = 0.99,
buffer_capacity: int = 100000,
tau: float = 0.005,
hidden_size: Tuple[int, int] = (400, 300),
batch_size: int = 64,
noise_stddev: float = 0.1,
):
self.num_states = env.observation_space.shape[0]
self.num_actions = env.action_space.shape[0]
self.max_action = env.action_space.high.item()
self.min_action = env.action_space.low.item()
self.gamma = gamma
self.tau = tau
self.memory = Buffer(
self.num_actions, self.num_states, buffer_capacity, batch_size
)
self.batch_size = batch_size
self.noise = GaussianActionNoise(
mean=np.zeros(self.num_actions),
std_deviation=noise_stddev * np.ones(self.num_actions),
)
self.actor = Actor(self.num_actions, self.num_states, hidden_size)
self.critic = Critic(self.num_actions + self.num_states, hidden_size)
self.target_actor = Actor(self.num_actions, self.num_states, hidden_size)
self.target_critic = Critic(self.num_actions + self.num_states, hidden_size)
actor_params = get_parameters(self.actor)
critic_params = get_parameters(self.critic)
target_actor_params = get_parameters(self.target_actor)
target_critic_params = get_parameters(self.target_critic)
if DEVICE == "GPU":
[x.gpu_() for x in actor_params + critic_params + target_actor_params + target_critic_params]
self.actor_optimizer = optim.Adam(actor_params, lr_actor)
self.critic_optimizer = optim.Adam(critic_params, lr_critic)
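    # tau=1.0 performs a hard copy of the online weights into the target networks.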
self.update_network_parameters(tau=1.0)
def update_network_parameters(self, tau: Optional[float] = None) -> None:
"""Updates the parameters of the target networks via 'soft updates'."""
if tau is None:
tau = self.tau
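    # Soft update: theta_target <- tau * theta + (1 - tau) * theta_target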
for param, target_param in zip(
get_parameters(self.actor), get_parameters(self.target_actor)
):
target_param.assign(param.detach() * tau + target_param * (1.0 - tau))
for param, target_param in zip(
get_parameters(self.critic), get_parameters(self.target_critic)
):
target_param.assign(param.detach() * tau + target_param * (1.0 - tau))
def choose_action(self, state: Tensor, evaluate: bool = False) -> NDArray:
mu = self.actor.forward(state, self.max_action)
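    # During training, Gaussian noise is added for exploration; the action is then clipped to the valid range.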
if not evaluate:
mu = mu.add(self.noise())
mu = mu.clip(self.min_action, self.max_action)
return mu.detach().numpy()
def learn(self) -> None:
"""Performs a learning step by sampling from replay buffer and updating networks."""
if self.memory.buffer_counter < self.batch_size:
return
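    # Sample a minibatch of transitions from the replay buffer.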
(
state_batch,
action_batch,
reward_batch,
next_state_batch,
done_batch,
) = self.memory.sample()
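    # Bellman target: y = r + gamma * Q_target(s', mu_target(s')) * (1 - done)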
target_actions = self.target_actor.forward(next_state_batch, self.max_action)
y = reward_batch + self.gamma * self.target_critic.forward(
next_state_batch, target_actions.detach()
) * (Tensor.ones(*done_batch.shape, device=DEVICE, requires_grad=False) - done_batch)
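    # Critic update: minimize the mean squared TD error between Q(s, a) and the target y.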
self.critic_optimizer.zero_grad()
critic_value = self.critic.forward(state_batch, action_batch)
critic_loss = y.detach().sub(critic_value).pow(2).mean()
critic_loss.backward()
self.critic_optimizer.step()
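    # Actor update: deterministic policy gradient, i.e. maximize Q(s, mu(s)) by minimizing -Q.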
self.actor_optimizer.zero_grad()
actions = self.actor.forward(state_batch, self.max_action)
critic_value = self.critic.forward(state_batch, actions)
actor_loss = -critic_value.mean()
actor_loss.backward()
self.actor_optimizer.step()
self.update_network_parameters()
if __name__ == "__main__":
env = gym.make("Pendulum-v1")
agent = DeepDeterministicPolicyGradient(env)
num_episodes = 150
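  # Training loop: act in the environment, store each transition, and update the networks after every step.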
for episode in range(1, num_episodes+1):
cumulative_reward = 0.0
    prev_state, info = env.reset()  # older gym versions return only the state, so drop info
done = False
while not done:
prev_state = Tensor(prev_state, device=DEVICE, requires_grad=False)
action = agent.choose_action(prev_state)
      state, reward, terminated, truncated, info = env.step(action)  # older gym versions return a single done flag instead of terminated/truncated
      done = terminated or truncated  # end the episode on termination or time-limit truncation (Pendulum-v1 only ever truncates)
cumulative_reward += reward
agent.memory.record((prev_state, action, reward, state, done))
agent.learn()
if done:
break
prev_state = state
print(
f"Episode {episode}/{num_episodes} - cumulative reward: {cumulative_reward}"
)
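  # The block below is a minimal evaluation sketch and not part of the original example:
  # it reruns the trained agent with evaluate=True so that no exploration noise is added.
  # It assumes the gym>=0.26 step API (terminated/truncated); the episode count of 5 is arbitrary.
  for _ in range(5):
    prev_state, info = env.reset()
    done, eval_reward = False, 0.0
    while not done:
      state_t = Tensor(prev_state, device=DEVICE, requires_grad=False)
      action = agent.choose_action(state_t, evaluate=True)
      prev_state, reward, terminated, truncated, info = env.step(action)
      done = terminated or truncated
      eval_reward += reward
    print(f"Evaluation episode - cumulative reward: {eval_reward}")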