tnblog
首页
视频
资源
登录

JetBot 自主避障

1414人阅读 2024/5/23 11:33 总访问:3467183 评论:0 收藏:0 手机
分类: .net后台框架

JetBot 自主避障

数据采集


要实现自主避障首先我们需要采集数据,所以我们需要一个采集障碍物和非障碍物的图片。
工作目录在code文件夹下面,这个目录我们需要自己创建。

控制界面采集


创建一个dataset的文件夹,并定义一个getdata.py的文件,通过采集界面进行采集图片。

  1. import ipywidgets.widgets as widgets
  2. from IPython.display import display
  3. from jetbot import camera, image
  4. image = widgets.Image(format='jpeg',width=224,height=224)
  5. display(image)
  6. # 创建摄像头
  7. from jetbot import Camera
  8. camera = Camera.instance(width=224,height=224)
  9. # 关联摄像头和画布
  10. import traitlets
  11. from jetbot import bgr8_to_jpeg
  12. camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)
  13. # 添加释放资源按钮
  14. rb = widgets.Button(description='release carmera')
  15. def release_camera(x):
  16. camera_link.unlink()
  17. camera.stop()
  18. print("release camera ok")
  19. rb.on_click(release_camera)
  20. display(rb)
  21. # 创建存储目录
  22. import os
  23. blocked_dir = 'dataset/blocked'
  24. free_dir = 'dataset/free'
  25. try:
  26. os.makedirs(free_dir)
  27. os.makedirs(blocked_dir)
  28. except FileExistsError:
  29. print('Dir not create')
  30. # 创建采集界面
  31. btn_Layout = widgets.Layout(width='128px',height='64px')
  32. free_btn = widgets.Button(description='add free',button_style='success',layout=btn_Layout)
  33. block_btn = widgets.Button(description='add block',button_style='danger',layout=btn_Layout)
  34. free_count = widgets.IntText(value=len(os.listdir(free_dir)))
  35. block_count = widgets.IntText(value=len(os.listdir(blocked_dir)))
  36. display(widgets.HBox([free_count,free_btn]))
  37. display(widgets.HBox([block_count,block_btn]))
  38. # 生成随机ID号
  39. from uuid import uuid1
  40. # 保存文件
  41. def save_snapshot(directory):
  42. # 生产文件路径名
  43. image_path = os.path.join(directory,str(uuid1())+'.jpg')
  44. with open(image_path, 'wb') as f:
  45. f.write(image.value)
  46. # 保存无主档的图片
  47. def save_free():
  48. global free_dir, free_count
  49. save_snapshot(free_dir)
  50. free_count.value = len(os.listdir(free_dir))
  51. # 统计图片的数目
  52. def save_blocked():
  53. global blocked_dir, block_count
  54. save_snapshot(blocked_dir)
  55. block_count.value = len(os.listdir(blocked_dir))
  56. # 设置free按钮点击时回调函数为save_free
  57. free_btn.on_click(lambda x:save_free())
  58. # 设置block按钮点击时回调函数为save_blocked
  59. block_btn.on_click(lambda x:save_blocked())


在同一个目录下又创建一个ExecGetData.ipynb执行getdata.py文件。

  1. %run getdata.py


然后通过不同的角度去拍摄不同的图片,将图片通过点击add free和add block按钮分别添加图片到dataset/freedataset/blocked文件夹中。(add free收集没有障碍物的,add block 收集有障碍物的)

拍摄完成后点击release carmera进行释放。

在收集图片的时候最好都多收集些,至少几百张。

当你收集一段时间后就会发现,每次都需要切换角度太麻烦了。所以我改用了使用手柄进行采集。

手柄采集


我这里直接贴代码了,如果不知道如何对接手柄的可以参考我这篇文章(https://www.tnblog.net/hb/article/details/8376)。
贴部分代码,完整代码在jetbot/notebooks/teleoperation,只修改了最后几段的代码。

  1. import uuid
  2. #import subprocess
  3. #subprocess.call(['mkdir', '-p', 'snapshots'])
  4. snapshot_image = widgets.Image(format='jpeg', width=300, height=300)
  5. def save_free_snapshot(change):
  6. if change['new']:
  7. file_path = '../code/dataset/free/' + str(uuid.uuid1()) + '.jpg'
  8. with open(file_path, 'wb') as f:
  9. f.write(image.value)
  10. snapshot_image.value = image.value
  11. def save_block_snapshot(change):
  12. if change['new']:
  13. file_path = '../code/dataset/blocked/' + str(uuid.uuid1()) + '.jpg'
  14. with open(file_path, 'wb') as f:
  15. f.write(image.value)
  16. snapshot_image.value = image.value
  17. controller.buttons[4].observe(save_free_snapshot, names='value')
  18. controller.buttons[5].observe(save_block_snapshot, names='value')
  19. display(widgets.HBox([image, snapshot_image]))
  20. display(controller)


左边L第一个按钮就是拍照采集空白的图片(对应的按钮下标是4),右边R第一个按钮就是采集有障碍物的图片(对应的按钮下标是5)。
然后我们通过摇杆移动小车就可以不断的采集了。
下面两行代码进行查询文件夹的数量

  1. import os
  2. block_count = len(os.listdir('../code/dataset/blocked/'))
  3. block_count
  1. block_count = len(os.listdir('../code/dataset/free/'))
  2. block_count

AI训练模型


采集的图片后,我们通过alexnet模型进行训练,并在训练完成的时候保存最好的模型记录(best_model.pth)。
创建一个train.ipynb添加如下代码进行跑模型。
(由于在训练模型中遇到了一些代码错误,所以我拉到本地的jupyter-notebook跑了一下,当然dataset也拉到本地中了)

  1. import torch
  2. # 导入视觉库
  3. import torchvision
  4. # 导入数据集类
  5. import torchvision.datasets as datasets
  6. # 导入视觉库里的转换类
  7. import torchvision.transforms as transforms
  8. # 导入视觉库
  9. import torchvision.models as models
  10. # 导入torch库里的神经网络类里的functional函数
  11. import torch.nn.functional as F
  12. # 采用迁移学习:在已训练好的模型上(已识别大部分特征),再训练(用较少的数据训练,识别新的个性特征)
  1. # 数据预处理和增强
  2. transform = transforms.Compose([
  3. transforms.ColorJitter(0.1, 0.1, 0.1, 0.1),
  4. transforms.Resize((224, 224)),
  5. transforms.ToTensor(),
  6. transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
  7. ])
  8. # 自定义数据集加载器,跳过无法识别的图像并打印出错路径
  9. class CustomImageFolder(datasets.ImageFolder):
  10. def __getitem__(self, index):
  11. path, target = self.samples[index]
  12. try:
  13. sample = self.loader(path)
  14. #print(f'path {path}')
  15. if self.transform is not None:
  16. sample = self.transform(sample)
  17. return sample, target
  18. except UnidentifiedImageError:
  19. print(f"UnidentifiedImageError: Skipping file at path {path}")
  20. return None
  21. # 加载数据集
  22. dataset = CustomImageFolder('dataset', transform=transform)
  23. # 把数据集划分为训练集和测试集
  24. train_dataset, test_dataset = torch.utils.data.random_split(dataset,[len(dataset) - 50,50])
  25. # 设置训练集加载方式 预加载数据集,每批加载图片个数,混洗,不启动并行加载
  26. train_loader = torch.utils.data.DataLoader(
  27. train_dataset,
  28. batch_size=8,
  29. shuffle=True,
  30. num_workers=0
  31. )
  32. # 测试集
  33. test_loader = torch.utils.data.DataLoader(
  34. test_dataset,
  35. batch_size=8,
  36. shuffle=True,
  37. num_workers=0
  38. )
  1. # AI 训练 第二个
  2. # 导入已训练好的模型alexnet
  3. # 自动下载alexnet模型
  4. model = models.alexnet(pretrained=True)
  5. # 针对1000个类别标签的数据集进行训练,而这里只有两类(阻挡和不阻挡)
  6. model.classifier[6] = torch.nn.Linear(model.classifier[6].in_features,2)
  7. # 把模型传输到 GPU上执行
  8. device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
  9. model = model.to(device)
  1. # 导入torch库里的优化器类
  2. import torch.optim as optim
  3. # 设置训练的轮数
  4. NUM_EPOCHS = 30
  5. # 训练好后输出的模型名
  6. BEST_MODEL_PATH = 'bast_model.pth'
  7. base_accuracy = 0.0
  8. # 优化器采用随机梯度下降法(SGD)
  9. optimizer = optim.SGD(model.parameters(),lr=0.001,momentum=0.9)
  10. # 多轮训练
  11. for epoch in range(NUM_EPOCHS):
  12. model.train() # 训练模式
  13. for images, labels in iter(train_loader):
  14. images = images.to(device)
  15. #print(f'images.shape {images.shape}')
  16. labels = labels.to(device)
  17. optimizer.zero_grad()
  18. outputs = model(images)
  19. loss = F.cross_entropy(outputs, labels)
  20. # 采用交叉熵做损失函数
  21. loss.backward()
  22. optimizer.step()
  23. #print(f'train labels {labels} loss: {loss} preds {outputs.argmax(1)}')
  24. model.eval() # 评估模式
  25. test_error_count = 0.0
  26. for images, labels in iter(test_loader):
  27. images = images.to(device)
  28. labels = labels.to(device)
  29. outputs = model(images)
  30. #print(f'test images.shape {images.shape} outputs shape{outputs.shape}')
  31. preds = outputs.argmax(1)
  32. print(f'test labels {labels} preds: {preds} sum {torch.abs(labels - preds)}')
  33. test_error_count += float(torch.sum(torch.abs(labels - preds)))
  34. # 计算误差时,仅考虑实际有效的样本数量
  35. #batch_error_count = torch.sum(labels != preds)
  36. #test_error_count += batch_error_count.item()
  37. print(f'test_error_count {test_error_count}')
  38. test_accuracy = 1.0 - float(test_error_count) / float(len(test_dataset))
  39. # 输出测试的准确率
  40. print('%d: %f' % (epoch, test_accuracy))
  41. # 当本轮的精度比上一轮的高,才保存模型的参数(权重值)
  42. if test_accuracy > base_accuracy:
  43. torch.save(model.state_dict(),BEST_MODEL_PATH)
  44. best_accuracy = test_accuracy


训练好模型后我们就可以开始部署代码了。
对了,这里我是以一个杯子为障碍物。

AI部署


创建一个AIDeploy.ipynb文件,写我们的AI部署代码。

载入模型

  1. import torch
  2. import torchvision
  3. # 构建alexnet模型
  4. model = torchvision.models.alexnet(pretrained=False)
  5. # 修改最后一层的输出特征数设为2
  6. model.classifier[6] = torch.nn.Linear(model.classifier[6].in_features, 2)
  7. # 导入已训练好模型的权重值
  8. model.load_state_dict(torch.load('bast_model.pth'))
  9. # 把模型从cpu中,传输到gpu中
  10. device = torch.device('cuda')
  11. model = model.to(device)

预处理: 将图像从相机格式 转换为 神经网络输入格式

  1. import cv2
  2. import numpy as np
  3. mean = 255.0 * np.array([0.485, 0.456, 0.406])
  4. stdev = 255.0 * np.array([0.229, 0.224, 0.225])
  5. # 归一化
  6. normalize = torchvision.transforms.Normalize(mean, stdev)
  7. def preprocess(camera_value):
  8. global device, normalize
  9. x = camera_value
  10. # 把相机BGR 转换为 模型需要的 RGB格式
  11. x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
  12. # 从HWC布局转为 模型需要的 CHW 布局 (C:通道数 H:高度 W:宽度)
  13. x = x.transpose((2, 0, 1))
  14. x = torch.from_numpy(x).float()
  15. # 数据归一化处理,都变成0~1间的数据分布
  16. x = normalize(x)
  17. # 将数据从 CPU 内存传输到 GPU 内存
  18. x = x.to(device)
  19. # 添加批次维度
  20. x = x[None, ...]
  21. return x

监控界面

  1. import traitlets
  2. from IPython.display import display
  3. import ipywidgets.widgets as widgets
  4. from jetbot import Camera, bgr8_to_jpeg
  5. camera = Camera.instance(width=224, height=224)
  6. image = widgets.Image(format='jpeg', width=224, height=224)
  7. #//滑块控制 阻挡的概率
  8. blocked_slider = widgets.FloatSlider(description='blocked', min=0.0, max=1.0, orientation='vertical')
  9. #//滑块控制 小车速度
  10. speed_slider = widgets.FloatSlider(description='speed', min=0.0, max=0.5, value=0.0, step=0.01, orientation='horizontal')
  11. camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)
  12. display(widgets.VBox([widgets.HBox([image, blocked_slider]), speed_slider]))

绕过障碍物: 不断监听相机值,阻挡概率小于0.5则直行,否则原地左转

  1. from jetbot import Robot
  2. robot = Robot()
  3. import torch.nn.functional as F
  4. import time
  5. def update(change):
  6. global blocked_slider, robot
  7. x = change['new']
  8. x = preprocess(x)
  9. y = model(x)
  10. # 归一化,输出0~1范围,方便用于计算阻挡的概率
  11. y = F.softmax(y, dim=1)
  12. prob_blocked = float(y.flatten()[0])
  13. blocked_slider.value = prob_blocked
  14. # 阻挡概率小于50%,则直行
  15. if prob_blocked < 0.5:
  16. robot.forward(speed_slider.value)
  17. else: #//否则,原地左转
  18. robot.left(speed_slider.value)
  19. time.sleep(0.001)
  20. # 我们调用一词函数来进行初始化
  21. update({'new': camera.value})
  22. # 监听相机值的变化
  23. camera.observe(update, names='value')

退出:释放资源

  1. import time
  2. release_btn = widgets.Button(description='release resource')
  3. def release_resource(x):
  4. camera.unobserve(update, names='value') #//取消监听相机
  5. time.sleep(0.1) #//添加小的sleep 以确保帧图像传输完成
  6. robot.stop() #//停止小车
  7. camera_link.unlink() #//取消相机连接 -> 不再把视频流 传输到 浏览器
  8. camera.stop() #//停止相机
  9. print("release resource go")
  10. release_btn.on_click(release_resource)
  11. display(release_btn)

进行测试


我们拖动speed0.1左右,小车会自动前进。
识别到障碍物会向左拐,旁边blocked检测是否遇到障碍物。


这里是视频:https://www.bilibili.com/video/BV12y411a7ZT/?spm_id_from=333.999.0.0&vd_source=0a9564bfc7982839767a99fe6bf60155


欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739

评价

JetBot AI 烧写镜像学习笔记一

JetBot AI 烧写镜像学习笔记一[TOC] 购买JetBot【淘宝】https://m.tb.cn/h.g0v9FnntUA6422l?tk=UfJuWIAKI2n HU9196 「英...

JetBot 对接手柄

JetBot 对接手柄[TOC] 启动手柄首先将配对手柄的蓝牙插入到电脑中。 启动手柄,推到ON 检查手柄连接首先打开https://har...

JetBot uboot中运行程序

jetbot uboot中运行程序[TOC] 创建裸机程序编写一个led.s的程序 mov x3,#0x5A mov x4,#0x55 loop: b loop...

JetBot 硬件架构图与电路图

jetbot 硬件架构图与电路图[TOC] jetbot gpu有128个cuda核心,cpu有4核 ARM A57。这里我们使用的是4G内存。接着我们来看...

JetBot 芯片手册

jetbot 芯片手册[TOC] 什么是芯片手册控制芯片的详细说明。3.1CPU芯片手册Tegra_X1_TRM_DP07225001_v1.3p.pdf - 891 KB ...
这一世以无限游戏为使命!
排名
2
文章
634
粉丝
44
评论
93
docker中Sware集群与service
尘叶心繁 : 想学呀!我教你呀
一个bug让程序员走上法庭 索赔金额达400亿日元
叼着奶瓶逛酒吧 : 所以说做程序员也要懂点法律知识
.net core 塑形资源
剑轩 : 收藏收藏
映射AutoMapper
剑轩 : 好是好,这个对效率影响大不大哇,效率高不高
ASP.NET Core 服务注册生命周期
剑轩 : http://www.tnblog.net/aojiancc2/article/details/167
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2025TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
公网安备:50010702506256
欢迎加群交流技术