Overall approach:
Collect a dataset containing three types of samples: crossing over ("span"), sitting ("sit"), and other ("other": standing, sleeping, etc.). The three classes are kept roughly balanced in sample count.
First, OpenPose detects human body keypoints, producing 18 keypoints per person. The whole dataset is then run through this model once to obtain the keypoints and the class label for every sample. The detected keypoints are fed into an XGBoost model trained for 3-class classification, which produces the final result.
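To illustrate how the two models are coupled, below is a minimal sketch (the function name is my own, not from the project code) of turning the 18 detected keypoints into the 36-dimensional feature vector that the XGBoost classifier consumes; undetected keypoints are filled with -1, matching the convention used throughout the code later in this article.

import numpy as np

def keypoints_to_feature(joints):
    # joints: dict mapping COCO keypoint index 0..17 to an (x, y) pixel coordinate;
    # indices missing from the dict are keypoints OpenPose did not detect.
    feat = np.full((1, 36), -1.0)      # layout: (x0, y0, x1, y1, ..., x17, y17)
    for i in range(18):
        if i in joints:
            feat[0, 2 * i] = joints[i][0]
            feat[0, 2 * i + 1] = joints[i][1]
    return feat                        # fed to the XGBoost classifier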
Dataset preparation:
Crossing ("span") data, class 0
Sitting ("sit") data, class 1
Other data ("other"), class 2
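For reference, a small mapping consistent with the labels above and with the data folders referenced later in test_images() (the exact dict layout is just a sketch):

LABELS = {0: "span", 1: "sit", 2: "other"}                      # class id -> label name
DATA_DIRS = {"./data/span/": 0, "./data/sit/": 1, "./data/other/": 2}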
Building the mxpiOpenposeProto library:
protoc 3.14.0 and protobuf 3.19.0 are used here.
cd proto
bash build.sh
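A quick sanity check that the generated Python bindings can be imported (assuming build.sh places mxpiOpenposeProto_pb2.py in the proto directory, which is what the inference code below expects):

import sys
sys.path.append("./proto")                        # directory containing the generated *_pb2 module
import mxpiOpenposeProto_pb2 as mxpiOpenposeProto
print(mxpiOpenposeProto.MxpiPersonList)           # should print the generated message class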
Building the post-processing plugin:
cd plugins
bash build.sh
chmod 440 build/libmxpi_openposepostprocess.so
cp build/libmxpi_openposepostprocess.so ${SDK_INSTALL_PATH}/mxVision/lib/plugins/ # replace ${SDK_INSTALL_PATH} with your SDK installation path
Model conversion:
atc --model=./simplified_560_openpose_pytorch.onnx --framework=5 --output=openpose_pytorch_560 --soc_version=Ascend310P3 --input_shape="data:1,3,560,560" --input_format=NCHW --insert_op_conf=./insert_op.cfg
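Before running atc it can be worth confirming that the ONNX input really is named "data" with shape 1x3x560x560. A small optional check using the onnx package (a sketch, not part of the pipeline itself):

import onnx

model = onnx.load("./simplified_560_openpose_pytorch.onnx")
inp = model.graph.input[0]
dims = [d.dim_value for d in inp.type.tensor_type.shape.dim]
print(inp.name, dims)                             # expected: data [1, 3, 560, 560]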
Inference code:
import sys
import os
import enum
import time

import numpy as np
import cv2

from StreamManagerApi import StreamManagerApi, MxDataInput, StringVector

sys.path.append("../proto")
import mxpiOpenposeProto_pb2 as mxpiOpenposeProto
from xgb import XGB

# 19 COCO limb pairs; the last two (ear-shoulder links) are not rendered
COCO_PAIRS = [(1, 2), (1, 5), (2, 3), (3, 4), (5, 6), (6, 7), (1, 8), (8, 9), (9, 10), (1, 11),
              (11, 12), (12, 13), (1, 0), (0, 14), (14, 16), (0, 15), (15, 17), (2, 16), (5, 17)]
COCO_PAIRS_RENDER = COCO_PAIRS[:-2]
COCO_COLORS = [[255, 0, 0], [255, 85, 0], [255, 170, 0], [255, 255, 0], [170, 255, 0], [85, 255, 0], [0, 255, 0],
               [0, 255, 85], [0, 255, 170], [0, 255, 255], [0, 170, 255], [0, 85, 255], [0, 0, 255], [85, 0, 255],
               [170, 0, 255], [255, 0, 255], [255, 0, 170], [255, 0, 85]]


class OPENPOSE(object):
    def __init__(self):
        # init stream manager
        self.stream_manager_api = StreamManagerApi()
        ret = self.stream_manager_api.InitManager()
        if ret != 0:
            print("Failed to init Stream manager, ret=%s" % str(ret))
            exit()
        # create streams by pipeline config file
        with open("./pipeline/Openpose.pipeline", "rb") as f:
            pipeline_str = f.read()
        ret = self.stream_manager_api.CreateMultipleStreams(pipeline_str)
        if ret != 0:
            print("Failed to create Stream, ret=%s" % str(ret))
            exit()
        # span / sit classifier
        self.xgb = XGB()

    def preproc(self, img, img_size, swap=(2, 0, 1)):
        h, w = img.shape[:2]
        h_ratio = img_size[0] / img.shape[0]
        w_ratio = img_size[1] / img.shape[1]
        resized_img = cv2.resize(
            img,
            (img_size[1], img_size[0]),
            interpolation=cv2.INTER_AREA,
        ).astype(np.uint8)
        return resized_img, h_ratio, w_ratio

    def get_pose_bbox(self, person_list, h_ratio, w_ratio):
        # collect, for every detected person, a dict {keypoint index: (x, y)} in original-image coordinates
        joints, xcenter = [], []
        for person in person_list:
            skeletons = person.skeletonInfoVec
            x_coords, y_coords, centers = [], [], {}
            seen_idx = []
            for skele in skeletons:
                part_idx1 = skele.cocoSkeletonIndex1
                part_idx2 = skele.cocoSkeletonIndex2
                if part_idx1 not in seen_idx:
                    seen_idx.append(part_idx1)
                    center = (int(skele.x0 / w_ratio), int(skele.y0 / h_ratio))
                    centers[part_idx1] = center
                    x_coords.append(center[0])
                    y_coords.append(center[1])
                if part_idx2 not in seen_idx:
                    seen_idx.append(part_idx2)
                    center = (int(skele.x1 / w_ratio), int(skele.y1 / h_ratio))
                    centers[part_idx2] = center
                    x_coords.append(center[0])
                    y_coords.append(center[1])
            joints.append(centers)
        return joints

    def draw(self, npimg, results):
        for joint in results:
            minx, miny = -1, -1
            # draw keypoints
            for key, value in joint["joint"].items():
                center = value
                cv2.circle(npimg, center, 3, COCO_COLORS[key], thickness=3, lineType=8, shift=0)
                if minx == -1 and miny == -1:
                    minx, miny = center[0], center[1]
            # draw skeletons
            for pair_order, pair in enumerate(COCO_PAIRS_RENDER):
                if pair[0] not in joint["joint"].keys() or pair[1] not in joint["joint"].keys():
                    continue
                cv2.line(npimg, joint["joint"][pair[0]], joint["joint"][pair[1]],
                         COCO_COLORS[pair_order], 3, cv2.LINE_AA)
            # draw predicted label and probability
            label = joint["pred_label"] + " {:.3f}".format(joint["prob"])
            cv2.putText(npimg, label, (minx, miny), 0, 0.6, (255, 255, 255),
                        thickness=1, lineType=cv2.LINE_AA)
        return npimg

    def process(self, image):
        stream_name = b"classification+detection"
        in_plugin_id = 0
        h0, w0 = image.shape[:2]
        input_shape = (560, 560)
        pre_img, h_ratio, w_ratio = self.preproc(image, input_shape)
        pre_img = np.ascontiguousarray(pre_img)
        image_bytes = cv2.imencode('.jpg', pre_img)[1].tobytes()
        data_input = MxDataInput()
        data_input.data = image_bytes
        unique_id = self.stream_manager_api.SendData(stream_name, in_plugin_id, data_input)
        if unique_id < 0:
            print("Failed to send data to stream.")
            exit()
        keys = [b"mxpi_openposepostprocess0"]
        key_vec = StringVector()
        for key in keys:
            key_vec.push_back(key)
        infer_result = self.stream_manager_api.GetProtobuf(stream_name, in_plugin_id, key_vec)
        if infer_result.size() == 0:
            print("infer_result is null")
            exit()
        if infer_result[0].errorCode != 0:
            print("infer_result error. errorCode=%d" % (infer_result[0].errorCode))
            exit()
        result_personlist = mxpiOpenposeProto.MxpiPersonList()
        result_personlist.ParseFromString(infer_result[0].messageBuf)
        detect_person_list = result_personlist.personInfoVec
        joints = self.get_pose_bbox(detect_person_list, h_ratio, w_ratio)
        results = []
        for joint in joints:
            # flatten the 18 keypoints into a 1x36 vector, -1 for undetected joints
            joint_np = np.ones((1, 36)) * (-1)
            for i in range(18):
                if i in joint.keys():
                    joint_np[0, 2 * i] = joint[i][0]
                    joint_np[0, 2 * i + 1] = joint[i][1]
            pred, pred_prob, pred_label = self.xgb.pred(joint_np)
            results.append({"joint": joint, "pred": pred, "prob": pred_prob, "pred_label": pred_label})
        return results

    def __del__(self):
        # destroy streams
        self.stream_manager_api.DestroyAllStreams()


def test_image():
    openpose = OPENPOSE()
    file_name = "./images/1.jpg"
    image = cv2.imread(file_name, 1)
    results = openpose.process(image)
    print("#####", results)
    image_show = openpose.draw(image, results)
    cv2.imwrite(os.path.splitext(file_name)[0] + "_detect_result.jpg", image_show)


def test_images():
    # dump keypoints of a whole folder in the txt format used for XGBoost training
    openpose = OPENPOSE()
    data_dir = "./data/other/"
    #data_dir = "./data/sit/"
    #data_dir = "./data/span/"
    for name in os.listdir(data_dir):
        fullname = os.path.join(data_dir, name)
        image = cv2.imread(fullname, 1)
        joints = openpose.process(image)
        #print(name, joints)
        for joint in joints:
            out = "" + name + " 2"  # "2" is the class id of the "other" folder; use 0 for span, 1 for sit
            for i in range(18):
                if i in joint["joint"].keys():
                    out = out + " " + str(joint["joint"][i][0]) + " " + str(joint["joint"][i][1])
                else:
                    out = out + " -1 -1"
            print(out)
        #image_show = openpose.draw(image, joints)
        #cv2.imwrite(name, image_show)


def test_video():
    openpose = OPENPOSE()
    # Open the video file
    video_path = "./images/span.mp4"
    cap = cv2.VideoCapture(video_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc('X', 'V', 'I', 'D')  # codec of the saved video
    output = cv2.VideoWriter("output.mp4", fourcc, 20, (width, height))  # create the VideoWriter object
    # Loop through the video frames
    while cap.isOpened():
        # Read a frame from the video
        success, frame = cap.read()
        if success:
            # run pose detection + span/sit classification on the frame
            t1 = time.time()
            results = openpose.process(frame)
            t2 = time.time()
            annotated_frame = openpose.draw(frame, results)
            print("time", t2 - t1)
            output.write(annotated_frame)
            # Break the loop if 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
        else:
            # Break the loop if the end of the video is reached
            break
    # Release the video capture object and close the display window
    cap.release()
    cv2.destroyAllWindows()


if __name__ == '__main__':
    #test_image()
    test_images()
    #test_video()
Model detection results:
XGBoost dataset preparation:
Run the dataset through the OpenPose model once to obtain the keypoint-coordinate dataset, saved in a txt file. Each line has the format (image name, class, keypoint x/y coordinates); keypoints missing due to body occlusion are written as -1.
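Loading that txt file is then straightforward; a minimal sketch consistent with the format above (the image-name column cannot be parsed as a float, so only columns 1-37 are read, exactly as the training code below does):

import numpy as np

dataset = np.loadtxt("./data/all.txt", delimiter=" ", usecols=range(1, 38))
Y = dataset[:, 0]        # class label: 0 = span, 1 = sit, 2 = other
X = dataset[:, 1:]       # 36 keypoint coordinates, -1 where the joint was occluded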
XGBoost model training:
Training code:
import xgboost
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
#from sklearn.externals import joblib
import joblib
import numpy as np
from matplotlib import pyplot as plt
import os
from matrix import DrawConfusionMatrix
data_label = ["span", "sit", "other"]
drawconfusionmatrix = DrawConfusionMatrix(labels_name=data_label)


def train():
    # load the dataset (skip the image-name column, keep class + 36 coordinates)
    dataset = np.loadtxt('./data/all.txt', delimiter=" ", usecols=list(range(1, 38)))
    X = dataset[:, 1:]
    Y = dataset[:, 0]
    """
    # coordinate normalization (disabled)
    x = X[:, 0:36:2].copy()
    y = X[:, 1:36:2].copy()
    maxx = np.max(x, axis=1).reshape(-1, 1)
    x[x == -1] = 10000
    minx = np.min(x, axis=1).reshape(-1, 1)
    maxy = np.max(y, axis=1).reshape(-1, 1)
    y[y == -1] = 10000
    miny = np.min(y, axis=1).reshape(-1, 1)
    minxy = np.hstack([minx, miny])
    maxxy = np.hstack([maxx, maxy])
    minxy = np.tile(minxy, (1, 18))
    maxxy = np.tile(maxxy, (1, 18))
    X = (X - minxy) / (maxxy - minxy)
    """
    # split into train and test sets
    seed = 7
    test_size = 0.15
    X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=test_size, random_state=seed)
    evalset = [(X_train, y_train), (X_test, y_test)]
    model = XGBClassifier(max_depth=2, num_class=3, learning_rate=0.1, n_estimators=300,
                          silent=True, reg_alpha=0.01, objective='multi:softprob')
    model.fit(X_train, y_train, eval_metric=['mlogloss', 'merror'], eval_set=evalset, verbose=True)
    results = model.evals_result()

    # loss curves
    plt.figure()
    plt.plot(results["validation_0"]["mlogloss"], label="train", linestyle="solid", color='k')
    plt.plot(results["validation_1"]["mlogloss"], label="test", linestyle="dotted", color='k')
    plt.legend()
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.savefig("loss.png")

    # accuracy curves (new figure so they are not drawn on top of the loss curves)
    plt.figure()
    plt.plot(1.0 - np.array(results["validation_0"]["merror"]), label="train", linestyle="solid", color='k')
    plt.plot(1.0 - np.array(results["validation_1"]["merror"]), label="test", linestyle="dotted", color='k')
    plt.legend()
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.savefig("acc.png")

    # save the model
    joblib.dump(model, "xg.model")
    # load the model
    model = joblib.load("xg.model")
    # predict on the test set
    y_pred = model.predict(X_test)
    predictions = [round(value) for value in y_pred]
    y_pred_prob = model.predict_proba(X_test)
    # confusion matrix
    drawconfusionmatrix.update(np.array(predictions, np.int32), y_test.astype(np.int32))
    drawconfusionmatrix.drawMatrix()
    # evaluate predictions
    accuracy = accuracy_score(y_test, predictions)
    print("Accuracy: %.2f%%" % (accuracy * 100.0))


def test():
    xgb = XGB()
    data = "283 183 271 225 244 224 226 285 272 309 297 226 296 282 300 304 246 323 333 345 296 435 281 318 350 348 294 435 275 175 289 176 263 182 297 183"
    data = data.split(" ")
    x_test = np.array(data, np.float32).reshape(-1, 36)
    pred, pred_prob, pred_label = xgb.pred(x_test)
    print(pred, pred_prob, pred_label)


class XGB():
    def __init__(self):
        self.model = joblib.load("./models/xg.model")
        self.data_label = {0: "span", 1: "sit", 2: "other"}

    def pred(self, X_test):
        # pick the class with the highest predicted probability
        #pred = self.model.predict(X_test)[0]
        pred_prob = self.model.predict_proba(X_test)[0]
        pred = int(np.argmax(pred_prob))
        pred_prob = pred_prob[pred]
        pred_label = self.data_label[pred]
        return pred, pred_prob, pred_label


if __name__ == "__main__":
    train()
    #test()
Training results:
End-to-end test:
Overall impressions:
(1) The approach works to a certain extent. The limited accuracy is mainly due to too little training data; in addition, the keypoint model is an off-the-shelf open-source model that has not been fine-tuned on our own data. All of these factors affect the final result.
(2) Fundamentally, crossing over and sitting down are temporal actions, and a sequence-based approach should perform markedly better. However, temporal models are usually large, complex, and slow, which makes them impractical for real-time video inference in production.
References:
https://github.com/Daniil-Osokin/lightweight-human-pose-estimation.pytorch
https://gitee.com/ascend/mindxsdk-referenceapps/tree/master/contrib/OpenposeKeypointDetection
Quick installation of the Ascend environment — Ascend open-source documentation 1.0