chrome 浏览器里面有一个小彩蛋,当你没有网络时,打开任意的 url 都会出现一个恐龙小游戏(dino run),按空格键就可以跳跃。当然,直接打开 chrome://dino 也可以玩这个小游戏。近期,一名来自东北大学(美国)的研究生 ravi munde 利用强化学习,实现了对 dino run 的控制。
以下内容来自 ravi munde 博客,人工智能头条编译:
本文将从强化学习的基础开始,并详细介绍以下几个步骤:
在浏览器(javascript)和模型(python)之间构建双向接口
捕获和预处理图像
训练模型
评估
▌强化学习
对许多人来说,强化学习可能是一个新词,但其实小孩学步利用的就是强化学习(rl)的概念,这也是我们的大脑仍然工作的方式。奖励系统是任何 rl 算法的基础,就像小孩学步的阶段,积极的奖励将是来自父母的鼓掌或糖果,而负面奖励则是没有糖果。孩子在开始走路之前首先学会站起来。就人工智能而言,智能体(agent)的主要目标(在我们的案例中是 dino)是通过在环境中执行特定的操作序列来最大化某个数字奖励。rl 中最大的挑战是缺乏监督(标记数据)来指导智能体,它必须自己探索和学习。智能体从随机行动开始,观察每个行动带来的回报,并学习如何在面临类似环境状况时预测最佳行动。
图注:vanilla 强化学习框架
▌q-learning
我们使用 q-learning(rl 中的一种)来尝试逼近一个特殊函数,这个函数可以驱动任何环境状态序列的动作选择策略。q-learning 是 rl 的一种无模型实现,针对每个状态、采取的行动和得到的奖励来更新 q-table,它能让我们了解数据的结构。在我们的例子中,状态是游戏的截图、行动、不动、跳[0,1]。
我们通过回归方法来解决这个问题,并选择具有最高预测 q 值的动作。
图注:q-table 样本
▌设置
首先设置环境:
1、选择虚拟机
我们需要一个完整的桌面环境,在这里我们可以捕获和利用屏幕截图对模型进行训练。我选择了 paperspace ml-in-a-box(mliab)ubuntu 镜像。mliab 的优势在于它预装了anaconda 和许多其他 ml 库。
2、设置和安装 keras 以使用gpu
paperspace 的虚拟机已经预先安装了,如果没有的话,可以按照下面的方式:
pip install keraspip install tensorflow
另外,为了确保 gpu 可以被设置识别,执行下面的 python 代码,你应该看到可用的 gpu 设备:
from keras import backend as kk.tensorflow_backend._get_available_gpus()
3、安装 dependencies
selenium:
pip install selenium
opencv:
pip install opencv-python
下载 chromedrive:
http://chromedriver.chromium.org
▌游戏框架
打开 chrome://dino,按空格键就可以玩这个游戏了。如果需要修改游戏代码,就要 chromium 的开源库中提取游戏了。
由于这个游戏是用 javascript 写的,而我们的模型是用 python 写的,因此我们需要运用到一些接口工具。
selenium 是一个比较流行的浏览器自动化工具,用于向浏览器发送操作,并获取当前分数等不同的游戏参数。
在有了发送操作的接口之后,我们还需要一种捕获游戏画面的机制:
selenium 和 opencv 分别为屏幕捕获和图像预处理提供了最佳性能,可实现 6-7 fps 的帧率。
游戏模块
我们使用这个模块实现了 python 和 javascript 之间的接口,下面的代码可以让你知道模块的实现原理:
class game: def __init__(self): self._driver = webdriver.chrome(executable_path = chrome_driver_path) self._driver.set_window_position(x=-10,y=0) self._driver.get(game_url) def restart(self): self._driver.execute_script(runner.instance_.restart()) def press_up(self): self._driver.find_element_by_tag_name(body).send_keys(keys.arrow_up) def get_score(self): score_array = self._driver.execute_script(return runner.instance_.distancemeter.digits) score = ''.join(score_array). return int(score)
智能体模块
我们使用智能体模块来封装所有接口。我们使用此模块控制 dino,并获取智能体在环境中的状态。
class dinoagent: def __init__(self,game): #takes game as input for taking actions self._game = game; self.jump(); #to start the game, we need to jump once def is_crashed(self): return self._game.get_crashed() def jump(self): self._game.press_up()
游戏状态模块
为了将动作发送到模块并获得相应的结果状态,我们使用了 game-state 模块。它通过接收和执行操作来简化流程,决定奖励并返回经验元组。
class game_sate: def __init__(self,agent,game): self._agent = agent self._game = game def get_state(self,actions): score = self._game.get_score() reward = 0.1 #survival reward is_over = false #game over if actions[1] == 1: #else do nothing self._agent.jump() image = grab_screen(self._game._driver) if self._agent.is_crashed(): self._game.restart() reward = -1 is_over = true return image, reward, is_over #return the experience tuple
▌图像通道
图像捕捉
我们可以通过多种方式捕获游戏画面,例如使用 pil 和 mss python 库截取整个屏幕,并裁剪感兴趣区域(regionofinterest, roi)。然而,这个方法最大的缺点是对屏幕分辨率和窗口位置的敏感度问题。幸运的是,该游戏使用了 html canvas,我们可以使用 javascript 轻松获得 base64 格式的图像。现在,我们使用 selenium 来运行这个脚本。
#javascript code to get the image data from canvasvar canvas = document.getelementsbyclassname('runner-canvas')[0];var img_data = canvas.todataurl()return img_data
def grab_screen(_driver = none): image_b64 = _driver.execute_script(getbase64script) screen = np.array(image.open(bytesio(base64.b64decode(image_b64)))) image = process_img(screen)#processing image as required return image
图像处理
捕捉到的原始图像的分辨率为 600x150,具有 3 通道(rgb)。我们打算使用 4 个连续的屏幕截图作为模型的单个输入,这使得我们单个输入的尺寸为 600x150x3x4。输入太大,需要消耗大量的计算力,而且并不是所有的特征都是有用的,所以我们使用 opencv 库来调整、裁剪和处理图像。最终处理后的输入仅为 80x80 像素,而且是单通道(灰度,grey scale)。
def process_img(image): image = cv2.cvtcolor(image, cv2.color_bgr2gray) image = image[:300, :500] return image
图注:图像处理
模型架构
现在让我们看看模型架构。我们使用一系列的三个卷积层,然后将它们展平为密集层和输出层。针对 cpu 的模型不包括池化层,因为我已经删除了许多特征,添加池化层会导致本已稀疏的特征大量丢失。但有了 gpu 之后,我们的模型可以容纳更多的特征,而不用降低帧率。
最大池化图层显著改善了密集要素集的处理过程。
图注:模型架构
输出层由两个神经元组成,每个神经元代表每个动作的最大预测回报。然后我们选择具有最大回报( q值)的动作。
def buildmodel():
print(now we build the model)
model = sequential()
model.add(conv2d(32, (8, 8), padding='same',strides=(4, 4),input_shape=(img_cols,img_rows,img_channels))) #80*80*4
model.add(maxpooling2d(pool_size=(2,2)))
model.add(activation('relu'))
model.add(conv2d(64, (4, 4),strides=(2, 2), padding='same'))
model.add(maxpooling2d(pool_size=(2,2)))
model.add(activation('relu'))
model.add(conv2d(64, (3, 3),strides=(1, 1), padding='same'))
model.add(maxpooling2d(pool_size=(2,2)))
model.add(activation('relu'))
model.add(flatten())
model.add(dense(512))
model.add(activation('relu'))
model.add(dense(actions))
adam = adam(lr=learning_rate)
model.compile(loss='mse',optimizer=adam)
print(we finish building the model)
return model
▌训练
以静止开始,并获得初始状态(s_t)
观察步骤数量
预测并执行操作
在 replay memory 中存储经验
从 replay memory 中随机选择一个批次并在此基础上训练模型
游戏结束后重新开始
def trainnetwork(model,game_state):
# store the previous observations in replay memory
d = deque() #experience replay memory
# get the first state by doing nothing
do_nothing = np.zeros(actions)
do_nothing[0] =1 #0 => do nothing,
#1=> jump x_t, r_0, terminal = game_state.get_state(do_nothing) # get next step after performing the action
s_t = np.stack((x_t, x_t, x_t, x_t), axis=2).reshape(1,20,40,4) # stack 4 images to create placeholder input reshaped 1*20*40*4
observe = observation epsilon = initial_epsilon t = 0 while (true): #endless running
loss = 0
q_sa = 0
action_index = 0
r_t = 0 #reward at t
a_t = np.zeros([actions]) # action at t
q = model.predict(s_t)
#input a stack of 4 images, get the prediction
max_q = np.argmax(q)
# chosing index with maximum q value
action_index = max_q
a_t[action_index] = 1
# o=> do nothing, 1=> jump
#run the selected action and observed next state and reward
x_t1, r_t, terminal = game_state.get_state(a_t)
x_t1 = x_t1.reshape(1, x_t1.shape[0], x_t1.shape[1], 1) #1x20x40x1
s_t1 = np.append(x_t1, s_t[:, :, :, :3], axis=3) # append the new image to input stack and remove the first one
d.append((s_t, action_index, r_t, s_t1, terminal))# store the transition
#only train if done observing; sample a minibatch to train on
trainbatch(random.sample(d, batch)) if t > observe else 0
s_t = s_t1
t += 1
请注意,我们正在从 replay memory 中抽样 32 个随机经验重放,并使用分批训练的方法。这样做的原因是游戏结构中的动作分布不平衡以及避免过度拟合。
def trainbatch(minibatch): for i in range(0, len(minibatch)):
loss = 0
inputs = np.zeros((batch, s_t.shape[1], s_t.shape[2], s_t.shape[3])) #32, 20, 40, 4
targets = np.zeros((inputs.shape[0], actions))
#32, 2
state_t = minibatch[i][0] # 4d stack of images
action_t = minibatch[i][1] #this is action index
reward_t = minibatch[i][2] #reward at state_t due to action_t
state_t1 = minibatch[i][3] #next state
terminal = minibatch[i][4] #wheather the agent died or survided due the action
inputs[i:i + 1] = state_t
targets[i] = model.predict(state_t) # predicted q values
q_sa = model.predict(state_t1)
#predict q values for next step
if terminal:
targets[i, action_t] = reward_t # if terminated, only equals reward
else:
targets[i, action_t] = reward_t + gamma * np.max(q_sa)
loss += model.train_on_batch(inputs, targets)
结果
我们通过使用这种架构获得了良好的结果。下图显示了训练开始时的平均分数,训练结束时,每 10 场比赛的平均得分远远高于 1000 。
最高分数记录是 4000 +,远远超过了之前模型的的 250 分(也远远超过了大多数人所能做到的!) 。下图显示了训练期间比赛最高得分的进度。
dino 的速度与分数成正比,这使得在更高的速度下检测和决定一个动作更加困难。因此,整个游戏都是以恒定速度训练的。
在CPLD管理下实现高效多串口中断源
监测机器微小振动的图像动态,保障机器的正常运行!
iPhone X使用后总结出的6大槽点
如何计算处于活动状态的CAN收发器的功耗?
了解Goooood®狗帝融合CDN: 类型和部署选项
Ravi Munde利用强化学习,实现了对Dino Run的控制
2021年将是DDR5的DRAM竞争,手机存储容量升级驱动移动存储器需求
阿里云盘重新上架苹果 App Sotre:1T 存储空间 + 5G 极速下载
自动驾驶行业动荡:阿里达摩院撤出,图森未来面临退市
基于功率MOSFET的H桥PWM DC电机驱动器
无线中继器安装操作步骤
RFID感知层技术的发展得到了怎样的好机会
小米笔记本Air4G版:每月4GB流量的笔电,你需要吗?
光纤配线架在综合布线中充当什么角色
高清晰度数字电视传输系统设计与实现方案
i.MXRT上提升代码执行性能的十八般武艺
消息称印度授权苹果、联想等110家公司进口笔记本电脑和平板电脑
国产工业电脑的产品优势有哪些
萌新必看:嵌入式该怎么学
大陆为自动驾驶汽车提供了人机交互整体解决方案