Last Update: 04/09/2025
https://github.com/tetean/CSP-gym
This script trains reinforcement learning agents in a custom environment with several RL algorithms (PPO, SAC, TD3, A2C, and DDPG) from the stable_baselines3 library. It first sets up the environment (CSPEnvSiC) and then trains each agent for a fixed number of timesteps (5e5). For each algorithm, training uses a callback that saves the best-performing agent; after training, the total rewards over time are plotted for comparison. The results for each algorithm are saved in a timestamped directory.
import os
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from stable_baselines3 import PPO, SAC, TD3, A2C, DDPG
from stable_baselines3.common.monitor import Monitor
from callbacks.reward_callback import SaveOnBestTrainingRewardCallback
from stable_baselines3.common.results_plotter import plot_results
from stable_baselines3.common import results_plotter
from datetime import datetime
from environments import SiC
# CSPEnergyReward is the reward class used by the environment; it is assumed
# here to be importable from environments/SiC.py.
from environments.SiC import CSPEnergyReward

timesteps = int(5e5)

def train_model(env, algorithm, total_timesteps, callback):
    if algorithm == "PPO":
        model = PPO("MlpPolicy", env, verbose=1)
    elif algorithm == "SAC":
        model = SAC("MlpPolicy", env, verbose=1)
    elif algorithm == "TD3":
        model = TD3("MlpPolicy", env, verbose=1)
    elif algorithm == "A2C":
        model = A2C("MlpPolicy", env, verbose=1)
    elif algorithm == "DDPG":
        model = DDPG("MlpPolicy", env, verbose=1)
    else:
        raise ValueError("Invalid algorithm")
    model.learn(total_timesteps=total_timesteps, callback=callback)
    return model

algorithms = ['PPO', 'SAC', 'TD3', 'A2C', 'DDPG']
# Run three independent trials of every algorithm.
for _ in range(3):
    for algorithm in algorithms:
        current_time = datetime.now().strftime('%Y%m%d_%H%M%S')
        log_dir = f'results/SiC/{algorithm}/{current_time}'
        callback = SaveOnBestTrainingRewardCallback(check_freq=10000, log_dir=log_dir, verbose=1)
        os.makedirs(log_dir, exist_ok=True)
        env = SiC.CSPEnvSiC(num_atoms=4, reward_class=CSPEnergyReward)
        env = Monitor(env, log_dir)
        train_model(env, algorithm, timesteps, callback)
        plt.clf()
        plot_results([log_dir], timesteps, results_plotter.X_TIMESTEPS, "Total Rewards Comparison (SiC)")
        plt.savefig(os.path.join(log_dir, 'res_fig.png'))
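The callback writes the best checkpoint to best_model.zip inside each log directory, so a trained agent can be reloaded later with the matching algorithm class. A minimal sketch, assuming a PPO run and a hypothetical timestamped directory:

import os
from stable_baselines3 import PPO

# Hypothetical log directory from one PPO trial of the loop above.
log_dir = "results/SiC/PPO/20250409_120000"

# SaveOnBestTrainingRewardCallback saves the checkpoint as best_model.zip.
model = PPO.load(os.path.join(log_dir, "best_model.zip"))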
SaveOnBestTrainingRewardCallback
import os

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from stable_baselines3 import TD3
from stable_baselines3.common import results_plotter
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.results_plotter import load_results, ts2xy, plot_results
from stable_baselines3.common.noise import NormalActionNoise
from stable_baselines3.common.callbacks import BaseCallback

class SaveOnBestTrainingRewardCallback(BaseCallback):
    """
    Callback for saving a model (the check is done every ``check_freq`` steps)
    based on the training reward (in practice, we recommend using ``EvalCallback``).

    :param check_freq: Number of steps between two checks of the mean training reward.
    :param log_dir: Path to the folder where the model will be saved.
        It must contain the file created by the ``Monitor`` wrapper.
    :param verbose: Verbosity level: 0 for no output, 1 for info messages, 2 for debug messages
    """

    def __init__(self, check_freq: int, log_dir: str, verbose: int = 1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.log_dir = log_dir
        self.save_path = os.path.join(log_dir, "best_model")
        self.best_mean_reward = -np.inf

    def _init_callback(self) -> None:
        # Create folder if needed
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self) -> bool:
        if self.n_calls % self.check_freq == 0:
            # Retrieve training reward
            x, y = ts2xy(load_results(self.log_dir), "timesteps")
            if len(x) > 0:
                # Mean training reward over the last 100 episodes
                mean_reward = np.mean(y[-100:])
                if self.verbose >= 1:
                    print(f"Num timesteps: {self.num_timesteps}")
                    print(f"Best mean reward: {self.best_mean_reward:.2f} - Last mean reward per episode: {mean_reward:.2f}")

                # New best model, you could save the agent here
                if mean_reward > self.best_mean_reward:
                    self.best_mean_reward = mean_reward
                    # Example for saving best model
                    if self.verbose >= 1:
                        print(f"Saving new best model to {self.save_path}")
                    self.model.save(self.save_path)

        return True
Description
A callback that saves the best-performing model during training; performance is evaluated as the mean training reward over the last 100 episodes.
Parameters
check_freq (int): Number of steps between two checks of the mean training reward.
log_dir (str): Path to the folder where the model will be saved; it must contain the file created by the Monitor wrapper.
verbose (int): Verbosity level (0 for no output, 1 for info messages, 2 for debug messages).
Methods
_init_callback(): Creates the save folder if needed.
_on_step(): Every check_freq steps, computes the mean reward over the last 100 episodes and saves the model whenever this mean improves on the best seen so far.
Code Example
# Create log dir
log_dir = "tmp/"
os.makedirs(log_dir, exist_ok=True)
# Create and wrap the environment
env = gym.make("LunarLanderContinuous-v3")
env = Monitor(env, log_dir)
# Add some action noise for exploration
n_actions = env.action_space.shape[-1]
action_noise = NormalActionNoise(mean=np.zeros(n_actions), sigma=0.1 * np.ones(n_actions))
# Create the TD3 model; the action noise defined above encourages exploration
model = TD3("MlpPolicy", env, action_noise=action_noise, verbose=0)
# Create the callback: check every 1000 steps
callback = SaveOnBestTrainingRewardCallback(check_freq=1000, log_dir=log_dir)
# Train the agent
timesteps = 1e5
model.learn(total_timesteps=int(timesteps), callback=callback)
plot_results([log_dir], timesteps, results_plotter.X_TIMESTEPS, "TD3 LunarLander")
plt.show()
<aside> 💡
For more details, see the original file CSP-Gym/callbacks/reward_callback.py
</aside>
The base class for our callbacks is BaseCallback from stable_baselines3; see https://stable-baselines3.readthedocs.io/en/master/guide/callbacks.html#custom-callback for how to write custom callbacks.
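For orientation, a custom callback only needs to subclass BaseCallback and implement _on_step. The toy example below is not part of CSP-Gym; it simply prints the timestep count at a fixed interval:

from stable_baselines3.common.callbacks import BaseCallback

class PrintStepsCallback(BaseCallback):
    """Toy callback: print the number of timesteps every print_freq calls."""

    def __init__(self, print_freq: int = 1000, verbose: int = 0):
        super().__init__(verbose)
        self.print_freq = print_freq

    def _on_step(self) -> bool:
        # self.n_calls and self.num_timesteps are maintained by BaseCallback.
        if self.n_calls % self.print_freq == 0:
            print(f"num_timesteps = {self.num_timesteps}")
        # Returning False would stop training early; True continues it.
        return True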
Function: draw_multi(data)
import numpy as np
import matplotlib.pyplot as plt

from utils.csv_read import load_csv
# set_size and moving_average are helper functions assumed to be defined
# elsewhere in utils/plot_tool.py.

# Define a constant representing the maximum 32-bit integer value
__INT_MAX = (1 << 31) - 1

def draw_multi(data):
    """
    Draws a plot for multiple algorithms with confidence intervals using CSV data.

    Args:
        data (dict): Dictionary with algorithm names as keys and lists of CSV file paths as values.

    Returns:
        None
    """
    rewards = {}
    stk_top = __INT_MAX

    # Set figure size using a custom set_size function, designed for thesis-level plotting
    fig = plt.figure(figsize=set_size('thesis', 0.99, (1, 1), height_add=0))
    ax = fig.subplots(ncols=1)

    for algorithm, locations in data.items():
        # Load rewards data from CSV files
        rewards[algorithm] = [load_csv(locations[i])[1] for i in range(len(locations))]

        # Apply a moving average with a window size of 10 for smoothing
        for i in range(len(rewards[algorithm])):
            rewards[algorithm][i] = moving_average(rewards[algorithm][i], 10)
            # Track the shortest data length for consistent plotting
            if len(rewards[algorithm][i]) < stk_top:
                stk_top = len(rewards[algorithm][i])

        # Trim all reward data to the shortest length
        for i in range(len(rewards[algorithm])):
            rewards[algorithm][i] = rewards[algorithm][i][:stk_top]

        # Calculate mean and standard deviation across trials
        reward_mean = np.mean(np.array(rewards[algorithm]), axis=0)
        reward_std = np.std(np.array(rewards[algorithm]), axis=0)

        # Generate x-axis values
        x = list(range(stk_top))

        # Calculate the confidence interval (mean +/- 2 standard deviations, ~95%)
        ci = (reward_mean - 2 * reward_std, reward_mean + 2 * reward_std)

        # Plot mean with shaded confidence interval
        ax.plot(x, reward_mean, label=algorithm)
        ax.fill_between(x, ci[0], ci[1], alpha=0.2)

    # Configure plot aesthetics
    plt.legend()
    plt.title("")
    plt.xlabel("Episode Number")
    plt.ylabel("Average Reward")

    # Save the plot as a PDF
    plt.savefig('res_fig.pdf', format='pdf')
Description
Draws a multi-line plot with shaded confidence intervals using data from multiple algorithms. Each algorithm's results are averaged, and the standard deviation is used to calculate the confidence intervals. The plot is saved as a PDF.
Parameters
data (dict): A dictionary where keys are algorithm names (str) and values are lists of file paths (str) containing CSV data. Each CSV is expected to contain reward values.
Code Example
# Example data
data = {
    "Algorithm_A": ["path/to/file1.csv", "path/to/file2.csv"],
    "Algorithm_B": ["path/to/file3.csv"]
}
draw_multi(data)
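draw_multi relies on moving_average and set_size, which are assumed to live elsewhere in utils/plot_tool.py. A minimal moving_average consistent with how it is called here (window of 10, returning a slightly shorter smoothed series) might look like:

import numpy as np

def moving_average(values, window):
    """Smooth a 1-D reward series with a simple sliding-window mean."""
    weights = np.ones(window) / window
    # 'valid' keeps only positions where the window fully overlaps the data,
    # so the output has len(values) - window + 1 points.
    return np.convolve(np.asarray(values, dtype=float), weights, mode="valid")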
<aside> 💡
For more details, see the original file CSP-Gym/utils/plot_tool.py
</aside>
We recommend defining the reward function in terms of the potential energy: the minimization task seeks the smallest possible potential energy, while the reward must be maximized, so negating the potential energy aligns the two objectives. Note that the potential energy may also need to be scaled so that the resulting rewards stay within a numerically representable range.
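As an illustration of this convention (a minimal sketch; the class name and scale factor are hypothetical, not the CSP-Gym reward implementation):

class NegatedEnergyReward:
    """Illustrative reward: lower potential energy gives higher reward."""

    def __init__(self, scale: float = 1.0):
        # Scaling keeps very large energy magnitudes within a numerically
        # comfortable range for the RL algorithms.
        self.scale = scale

    def __call__(self, energy: float) -> float:
        # Minimizing the energy is equivalent to maximizing its negation.
        return -energy / self.scale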
You can directly integrate existing potential functions from other Python libraries or customize your own. We provide two examples.
Using a Self-Defined Function
You can modify the _compute_energy method like this:
class CSPEnvSiC(gym.Env):
    ...
    def _compute_energy(self, state):
        ...
        return energy
With this setup, you now have a custom energy function that can calculate the potential energy of a system based on the atomic positions, and the reward will depend on this energy calculation. You can adjust any parameters in the energy function to match the physical system you're modeling.
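As a concrete illustration, not the repository's actual energy model, _compute_energy could implement a simple Lennard-Jones pairwise sum over the atomic positions. The epsilon and sigma values and the state layout (one lattice parameter followed by flattened atomic coordinates) are assumptions here:

import gymnasium as gym
import numpy as np

class CSPEnvSiC(gym.Env):
    ...

    def _compute_energy(self, state):
        # Illustrative Lennard-Jones energy; epsilon and sigma are placeholder
        # values, not fitted SiC parameters. Assumes state[0] is the lattice
        # parameter and state[1:] holds the Cartesian coordinates of the atoms.
        epsilon, sigma = 1.0, 2.0
        positions = np.asarray(state[1:]).reshape(-1, 3)
        energy = 0.0
        for i in range(len(positions)):
            for j in range(i + 1, len(positions)):
                r = np.linalg.norm(positions[i] - positions[j])
                if r > 1e-8:  # guard against overlapping atoms
                    sr6 = (sigma / r) ** 6
                    energy += 4.0 * epsilon * (sr6 ** 2 - sr6)
        return energy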
Overview
CSPEnv is a custom Gymnasium environment designed for optimizing the atomic structure of an Argon (Ar) crystal unit cell. The environment simulates atomic arrangements and evaluates their potential energy using LAMMPS. For full implementation details, see the original file CSP-Gym/environments/Ar.py.
Environment Details
Observation space: shape (13,) (1 lattice parameter + 3D coordinates for 4 atoms).
Action space: shape (13,), with each action component ranging between -0.1 and 0.1.
Classes and Functions
compute_energy_custom_positions(positions, box_size=5.41)
Computes the potential energy of a given atomic configuration using LAMMPS.
Parameters:
positions: A (4, 3) array of atomic Cartesian coordinates.
box_size: Length of the cubic simulation box (default: 5.41 Å).
Returns:
energy: The potential energy of the system after energy minimization.
CSPEnv(gym.Env)
Custom reinforcement learning environment for optimizing Argon crystal structures.
Methods:
reset(seed=None, options=None) -> (np.array, dict): Resets the environment and returns the initial state.
step(action) -> (np.array, float, bool, bool, dict): Applies an action, updates the state, and computes the reward.
_compute_energy(state) -> float: Computes energy from the given lattice and atomic coordinates.
render(): Prints the current state.
close(): Cleans up resources.
CSPEnergyReward(BaseReward)
Reward class passed to the environment through reward_class; it converts the computed potential energy into the scalar reward returned by step().
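Putting the API together, a short rollout with random actions might look like this (a minimal sketch, assuming CSPEnv and CSPEnergyReward are importable from environments/Ar.py):

from environments.Ar import CSPEnv, CSPEnergyReward  # assumed module layout

env = CSPEnv(num_atoms=4, reward_class=CSPEnergyReward)
obs, info = env.reset(seed=0)

# Take a few random perturbation steps and inspect the reward (negated energy).
for _ in range(5):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    env.render()
    if terminated or truncated:
        obs, info = env.reset()

env.close()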
Training
The environment supports reinforcement learning using Stable-Baselines3
algorithms like PPO, SAC, TD3, A2C, and DDPG.
Example Training Script:
import os
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from stable_baselines3 import PPO, SAC, TD3, A2C, DDPG
from stable_baselines3.common.monitor import Monitor
from callbacks.reward_callback import SaveOnBestTrainingRewardCallback
from stable_baselines3.common.results_plotter import plot_results
from stable_baselines3.common import results_plotter
from datetime import datetime
# CSPEnv and CSPEnergyReward are assumed to be importable from environments/Ar.py.
from environments.Ar import CSPEnv, CSPEnergyReward

timesteps = int(5e5)

def train_model(env, algorithm, total_timesteps, callback):
    if algorithm == "PPO":
        model = PPO("MlpPolicy", env, verbose=1)
    elif algorithm == "SAC":
        model = SAC("MlpPolicy", env, verbose=1)
    elif algorithm == "TD3":
        model = TD3("MlpPolicy", env, verbose=1)
    elif algorithm == "A2C":
        model = A2C("MlpPolicy", env, verbose=1)
    elif algorithm == "DDPG":
        model = DDPG("MlpPolicy", env, verbose=1)
    else:
        raise ValueError("Invalid algorithm")
    model.learn(total_timesteps=total_timesteps, callback=callback)
    return model

algorithms = ['PPO', 'SAC', 'TD3', 'A2C', 'DDPG']
# Run three independent trials of every algorithm.
for _ in range(3):
    for algorithm in algorithms:
        current_time = datetime.now().strftime('%Y%m%d_%H%M%S')
        log_dir = f'results/Ar/{algorithm}/{current_time}'
        callback = SaveOnBestTrainingRewardCallback(check_freq=10000, log_dir=log_dir, verbose=1)
        os.makedirs(log_dir, exist_ok=True)
        env = CSPEnv(num_atoms=4, reward_class=CSPEnergyReward)
        env = Monitor(env, log_dir)
        train_model(env, algorithm, timesteps, callback)
        plt.clf()
        plot_results([log_dir], timesteps, results_plotter.X_TIMESTEPS, "Total Rewards Comparison (Ar)")
        plt.savefig(os.path.join(log_dir, 'res_fig.png'))
Overview
CSPEnvSiC is a custom Gymnasium environment designed for optimizing the structure of a Silicon Carbide (SiC) unit cell. The environment simulates atomic arrangements and evaluates their potential energy using LAMMPS. For more details, see the original file CSP-Gym/environments/SiC.py.
Environment Details
Classes and Functions
Example Training Script:
import os
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from stable_baselines3 import PPO, SAC, TD3, A2C, DDPG
from stable_baselines3.common.monitor import Monitor
from callbacks.reward_callback import SaveOnBestTrainingRewardCallback
from stable_baselines3.common.results_plotter import plot_results
from stable_baselines3.common import results_plotter
from datetime import datetime
# CSPEnvSiC and CSPEnergyReward are assumed to be importable from environments/SiC.py.
from environments.SiC import CSPEnvSiC, CSPEnergyReward

timesteps = int(5e5)

def train_model(env, algorithm, total_timesteps, callback):
    if algorithm == "PPO":
        model = PPO("MlpPolicy", env, verbose=1)
    elif algorithm == "SAC":
        model = SAC("MlpPolicy", env, verbose=1)
    elif algorithm == "TD3":
        model = TD3("MlpPolicy", env, verbose=1)
    elif algorithm == "A2C":
        model = A2C("MlpPolicy", env, verbose=1)
    elif algorithm == "DDPG":
        model = DDPG("MlpPolicy", env, verbose=1)
    else:
        raise ValueError("Invalid algorithm")
    model.learn(total_timesteps=total_timesteps, callback=callback)
    return model

algorithms = ['PPO', 'SAC', 'TD3', 'A2C', 'DDPG']
# Run three independent trials of every algorithm.
for _ in range(3):
    for algorithm in algorithms:
        current_time = datetime.now().strftime('%Y%m%d_%H%M%S')
        log_dir = f'results/SiC/{algorithm}/{current_time}'
        callback = SaveOnBestTrainingRewardCallback(check_freq=10000, log_dir=log_dir, verbose=1)
        os.makedirs(log_dir, exist_ok=True)
        env = CSPEnvSiC(num_atoms=4, reward_class=CSPEnergyReward)
        env = Monitor(env, log_dir)
        train_model(env, algorithm, timesteps, callback)
        plt.clf()
        plot_results([log_dir], timesteps, results_plotter.X_TIMESTEPS, "Total Rewards Comparison (SiC)")
        plt.savefig(os.path.join(log_dir, 'res_fig.png'))
To comprehensively evaluate the method's performance in crystal structure prediction, the framework uses the following evaluation metrics:
$\begin{cases} \text{EE} := \frac{1}{N} \left| E_\text{pred} - E_\text{real} \right|, \quad [\text{eV/atom}] \\ \text{SS} := \sqrt{\frac{1}{N}\sum_{i = 1}^{N} \left\lVert r_{i,\text{pred}} - r_{i,\text{true}} \right\rVert^2}, \quad [\text{Å}] \\ \text{CR} := \frac{1}{M}\sum_{j = 1}^{M} I\left( \text{dist}(S_j,S_\text{stable}) \leq \epsilon \right), \quad \in [0,1] \\ \text{CC} := \sum_{t = 1}^{T} \tau_t \cdot G_t, \quad [\text{GPU-hours}] \end{cases}$
1. Energy Error (EE)
Measures the absolute difference between the predicted structure's energy and the reference energy obtained from LAMMPS, normalized by the number of atoms N (eV/atom).
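In code, the per-atom energy error reduces to a one-liner (a minimal sketch of the formula above, with illustrative numbers):

def energy_error(e_pred: float, e_real: float, n_atoms: int) -> float:
    """EE = |E_pred - E_real| / N, reported in eV/atom."""
    return abs(e_pred - e_real) / n_atoms

# Illustrative values only: a 0.4 eV total-energy gap over a 4-atom cell
# corresponds to EE = 0.1 eV/atom.
print(energy_error(-18.9, -19.3, 4))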