hii everyone! im new to python and i need some help about the code: its about 2 parts, one is game and another one is for counting the successful rate using morte-carlo method
this one is game.py:
import gym
import numpy as np
import random
import json
ACTIONS = [(1, 0), (-1, 0), (0, 1), (0, -1)]
NACTIONSPACE = 4
NSTATES = 16
MAPSHAPE = (4, 4)
END_STATE = 'G'
START_STATE = 'S'
ICE_HOLE_STATE = 'H'
SAFE_STATE = 'F'
# !!!!!
GOLD_STATE = 'g'
def state_int2tuple(state:int)->tuple[int, int]:
return (state // 4, state % 4)
def state_tuple2int(state:tuple[int, int])->int:
return state[0] * 4 + state[1]
def create_env()->list:
env = gym.make("FrozenLake-v1")
env.reset()
state = env.render('ansi')
state = list(filter(lambda x : x == 'F' or x == 'S' or x == 'H' or x == 'G', state))
state = [state[i * 4: (i + 1) * 4] for i in range(4)]
# !!!!!!!
empty_cells = [(r, c) for r in range(4) for c in range(4) if state[r][c] == SAFE_STATE]
gold_pos = random.choice(empty_cells)
state[gold_pos[0]][gold_pos[1]] = 'g'
# !!!!!!!!
return state
def reset(map:list[list])->list[int, list[str]]:
for i in range(16):
pos = state_int2tuple(i)
if map[pos[0]][pos[1]] == 'S':
map[pos[0]][pos[1]] = 'F'
safe.append(i)
elif map[pos[0]][pos[1]] == 'F':
safe.append(i)
else:
continue
new_s = safe[random.randint(0, len(safe) - 1)]
new_pos = state_int2tuple(new_s)
map[new_pos[0]][new_pos[1]] = 'S'
return [new_s, map]
def action_mask(map:list[list], state:int)->list[int]:
row,col=state_int2tuple(state)
a=[]
for i,(row_,col_) in enumerate(ACTIONS):
new_row,new_col=row + row_,col + col_
if 0 <= new_row < MAPSHAPE[0] and 0 <= new_col < MAPSHAPE[1]:
a.append(i)
return a
def step(map:list[list], state:int, action:int)->tuple[int, int, bool, bool]:
row,col=state_int2tuple(state)
row_,col_=ACTIONS[int(action)]
new_row,new_col=row + row_,col + col_
if not (0 <= new_row < MAPSHAPE[0] and 0 <= new_col < MAPSHAPE[1]):
return state, 0, False, False
new_state=state_tuple2int((new_row,new_col))
place=map[new_row][new_col]
if place == ICE_HOLE_STATE:
return new_state, 0, True, False
elif place == END_STATE:
return new_state, 1, True, False
# !!!!!!!!!!
elif place == GOLD_STATE:
map[new_row][new_col] = SAFE_STATE
return new_state, 100, False, True
# !!!!!!!!
else:
return new_state, -1, False, False
def save_game(state: int, map: list[list]):
with open('savefile.json','w') as f:
json.dump({'state':state,'map':map},f)
def load_game():
try:
with open('savefile.json','r') as f:
saved=json.load(f)
return saved['state'],saved['map']
except FileNotFoundError:
return None
def player_map(map:list[list],state:int):
row,col=state_int2tuple(state)
map_=[row.copy() for row in map]
map_[row][col] = 'P'
for i in map_:
print(' '.join(i))
`if __name__ == '__main__':
map=create_env()
state,map=reset(map)
# !!!!!!!
reward_total = 0
while True:
print("\nmap:")
player_map(map,state)
print(f"position:{state_int2tuple(state)}")
# !!!!!!
print(f"reward: {reward_total}")
a = input("input:").strip().lower()
if a == 'quit':
print("bye!")
break
elif a == 'restart':
print("restarting...")
map=create_env()
state,map=reset(map)
# !!!!!!!!!!
reward_total = 0
print("success!")
continue
elif a == 'save':
save_game(state,map)
print("saved!")
continue
elif a == 'load':
file=load_game()
if file:
state,map=file
print("load!")
else:
print("None")
continue
action={'w':1,'s':0,'a':3,'d':2}
if a in action:
action_ = action[a]
if action_ in action_mask(map,state):
state,reward,end, gold = step(map,state,action_)
# !!!!!!!
reward_total += reward
# !!!!!!!!!!
if gold:
print("reward+100")
if end:
if reward == 1:
print("\ncongrats!")
else:
print("\noops!")
break
else:
print("out of map!")
else:
print("Unknown action")
And then anoher one is policy.py:
(based on morte carlo method to count the succuesful rate and iteration)
from tensorboardX import SummaryWriter
from tqdm import tqdm
import gym
import numpy as np
from random import random
from game import action_mask, NSTATES, NACTIONSPACE, reset, step, create_env
def sample_actions(map:list[list], state:int)->int:
a = action_mask(map,state)
return np.random.choice(a)
def compute_qpi_MC(pi:list[int], map:list[list], gamma:float, epsilon:float, num_episodes=1000):
Q = np.zeros((NSTATES, NACTIONSPACE), dtype=np.float32)
N = np.zeros((NSTATES, NACTIONSPACE), dtype=np.int64)
for i in range(num_episodes):
state, map = reset(map)
episode = []
while True:
# ====================================================
action = 0
if np.random.rand() < epsilon:
action = sample_actions(map,state)
else:
action = pi[state]
# ===================================================
next_state, reward, done, gold = step(map, state, action)
episode.append((state, action, reward))
if done == True:
episode.append((next_state, None, reward))
break
state = next_state
visited = set()
G = 0
for _, (state, action, reward) in enumerate(reversed(episode)):
# ==================================
G = gamma * G + reward
# =================================
if action == None:
continue
sa = (state, action)
if sa not in visited:
visited.add(sa)
state = int(state)
action = int(action)
# ============================
N[state][action] += 1
Q[state][action] = Q[state][action] + ((G - Q[state][action]) / N[state][action])
# ===============================================
return Q
def policy_iteration_MC(map:list[list], gamma:float, eps0:float=0.5, decay:float=0.1, num_episodes:int=100000,\
diff_p:bool=False, decay_f=None, writer=None, name='LGD!!')->np.array:
pi = np.zeros(NSTATES)
iteration = 1
while True:
if diff_p:
epsilon = decay_f(eps0, decay, iteration)
else:
epsilon = eps0/(1+decay*iteration)
Q = compute_qpi_MC(pi, map, gamma, epsilon, num_episodes)
new_pi = Q.argmax(axis=1)
if (pi != new_pi).sum() == 0:
print(iteration)
result = test_pi(map, new_pi)
writer.add_scalar(f'/MC3/{name}', result, global_step=iteration)
return new_pi
pi = new_pi
iteration = iteration + 1
if iteration % 1 == 0:
result = test_pi(map, new_pi)
writer.add_scalar(f'/MC3/{name}', result, global_step=iteration)
def test_pi(map:list[list], pi, num_episodes=1000):
count = 0
for e in range(num_episodes):
ob, map = reset(map)
while True:
a = pi[ob]
# !!!!!
ob, rew, done, gold = step(map, ob, a)
if done:
count += 1 if rew == 1 else 0
break
return count / num_episodes
if __name__ == '__main__':
map = create_env()
writer = SummaryWriter('./result')
pi = policy_iteration_MC(map, 0.95, decay=0.1, writer=writer, name=f'basic')
print(f'Successful Rate: {test_pi(map, pi)}')
print('finish')
for the game.py part it can work, but when i run the policy.py part, it seems like looping nonstop and no output T.T and then ive try to debug and found that the problem maybe occurs at the if place == GOLD_STATE part in STEP function in game.py, but i cannot figure out whats the problem is…can someone with more experience help me solve this?