import os
import sys
import gym
import random
import numpy as np
import pickle
from collections import deque
from keras.layers import Dense
from keras.optimizers import Adam
from keras.models import Sequential
from matplotlib import pyplot as plt

WEIGHTS_PATH = './trained_models/CartPole-v0/1/'
BUFFER_PATH = './buffers/CartPole-v0/1/'

class Agent:
    def __init__(self, algorithm, state_size, action_size):
        self.algorithm = algorithm
        self.render = False
        self.state_size = state_size
        self.action_size = action_size
        # Replay memory; maxlen matches the buffer size collected in fill_buffer
        # (the original maxlen of 2000 could never reach the 10000-sample target).
        self.memory = deque(maxlen=10000)
        if self.algorithm in ['DQN', 'DDQN', 'DQV']:
            # Value-based agents load a pre-trained Q-network.
            self.model = self.build_model()
        else:
            # Policy-based agents (e.g. A2C) load a pre-trained actor network.
            self.model = self.build_actor()
        self.model.load_weights(os.path.join(WEIGHTS_PATH, self.algorithm, 'trained_model.h5'))

    def build_actor(self):
        # Small policy network: softmax distribution over actions.
        actor = Sequential()
        actor.add(Dense(24, input_dim=self.state_size, activation='relu',
                        kernel_initializer='he_uniform'))
        actor.add(Dense(self.action_size, activation='softmax',
                        kernel_initializer='he_uniform'))
        return actor

    def build_model(self):
        # Small Q-network: one linear output per action.
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu',
                        kernel_initializer='he_uniform'))
        model.add(Dense(24, activation='relu',
                        kernel_initializer='he_uniform'))
        model.add(Dense(self.action_size, activation='linear',
                        kernel_initializer='he_uniform'))
        return model

    def get_action(self, state):
        if self.algorithm == 'A2C':
            # Sample an action from the actor's softmax policy.
            policy = self.model.predict(state, batch_size=1).flatten()
            return np.random.choice(self.action_size, 1, p=policy)[0]
        else:
            # Greedy action with respect to the predicted Q-values.
            q_value = self.model.predict(state)
            return np.argmax(q_value[0])

    def append_sample(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def save_buffer(self):
        # Pickle the collected transitions to disk for later (offline) use.
        if not os.path.exists(os.path.join(BUFFER_PATH, self.algorithm)):
            os.makedirs(os.path.join(BUFFER_PATH, self.algorithm))
        with open(os.path.join(BUFFER_PATH, self.algorithm, 'memory_buffer.p'), 'wb') as filehandler:
            pickle.dump(self.memory, filehandler)

def fill_buffer(algorithm):
    # Roll out a pre-trained agent on CartPole-v0 until max_len transitions
    # have been collected, then dump the buffer to disk.
    max_len = 10000
    game = 'CartPole-v0'
    env = gym.make(game)
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n
    agent = Agent(algorithm, state_size, action_size)
    while True:
        done = False
        score = 0
        state = env.reset()
        state = np.reshape(state, [1, state_size])
        while not done:
            action = agent.get_action(state)
            next_state, reward, done, info = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])
            agent.append_sample(state, action, reward, next_state, done)
            score += reward
            state = next_state
        # Stop once the buffer is full (>= so that the deque's maxlen can be reached).
        if len(agent.memory) >= max_len:
            agent.save_buffer()
            break


fill_buffer('DQN')
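
# A minimal sketch of how the saved buffer could be read back afterwards, e.g. for
# offline experiments; it assumes fill_buffer('DQN') above has already written
# memory_buffer.p under the 'DQN' sub-directory of BUFFER_PATH.
with open(os.path.join(BUFFER_PATH, 'DQN', 'memory_buffer.p'), 'rb') as filehandler:
    loaded_memory = pickle.load(filehandler)
# Each entry is the (state, action, reward, next_state, done) tuple stored by append_sample.
state, action, reward, next_state, done = loaded_memory[0]
print('loaded {} transitions'.format(len(loaded_memory)))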