目前手里面有四套大数据集群的作业需要维护,分别属于不同的客户,我同岗位的兄弟离职后,所有的数据作业都落到我头上了,公司也不招人了。开发新的数据作业倒没有什么问题,就是客户叫我补数的时候,头比较大,当天的直接重跑就是了,尤其是要补很久以前的数据,还要去查看代码,找出作业之间的依赖。调度平台也不尽相同。如果能提前发现问题,当天问题当天解决,客户不找我补数,那不是有更多的时间摸鱼啦?
1.需要监控的组件盘点
针对Apache 原生部署的集群,重点监控对象 hdfs yarn ,挂的概率最大就是这两大组件,然后就是kafka、 zookeeper、Azkaban,这三个组件的监控比较简单,主要监听程序端口是否开启。针对cdh集群设置组件故障重启,利用cdh数据库监听集群组件状态。所有集群监听yarn上运行的作业。监控程序语言选择java,之前用python写过一些简单的监控程序,但是部署实在是很麻烦。
2.Hdfs 监控
主要是监控 namenode 、datanode 以及hdfs使用情况。 这里主要是用hadoop的api实现的。
public static String hdfsCheck() throws IOException{Configuration conf = new Configuration();conf.set("fs.defaultFS", "hdfs://xxxxx:9820"); // Namenode 地址和端口// 创建 HdfsAdmin 实例DistributedFileSystem dfs;try {dfs = (DistributedFileSystem) FileSystem.get(conf);dfs.getServerDefaults().getReplication();}catch (Exception e){conf.set("fs.defaultFS", "hdfs://XXXX:9820"); // HA Namenode 地址和端口dfs = (DistributedFileSystem) FileSystem.get(conf);}String warn = "";try {// 创建 HDFS 文件系统实例if (dfs.isInSafeMode()) {warn = "namenode is in safe mode";}long totalCapacity = dfs.getStatus().getCapacity(); // 总容量long usedCapacity = dfs.getStatus().getUsed(); // 已用容量long remainingCapacity = dfs.getStatus().getRemaining(); // 可用容量double usagePercentage = (double) usedCapacity / totalCapacity * 100;// 将字节转换为吉字节double totalCapacityGB = totalCapacity / (1024.0 * 1024 * 1024);double usedCapacityGB = usedCapacity / (1024.0 * 1024 * 1024);double remainingCapacityGB = remainingCapacity / (1024.0 * 1024 * 1024);// 输出容量信息log.info(String.format("Total Capacity: %.2f GB", totalCapacityGB));log.info(String.format("Used Capacity: %.2f GB", usedCapacityGB));log.info(String.format("Remaining Capacity: %.2f GB", remainingCapacityGB));log.info(String.format("Usage Percentage: %.2f%%", usagePercentage));// 检查使用率是否超过 90%if (usagePercentage > 90) {warn=warn+"Warning: HDFS usage exceeds 90%!";}// 检查使用率是否超过 90%if (usagePercentage > 90) {System.out.println("Warning: HDFS usage exceeds 90%!");}// 获取 Datanode 信息DatanodeInfo[] datanodes = dfs.getDataNodeStats();// 获取 Datanode 状态long currentTime = System.currentTimeMillis();int liveNodesCount = 0;// 假设心跳阈值为5分钟long heartBeatThreshold = 5 * 60 * 1000;for (DatanodeInfo datanode : datanodes) {long heartBeatTime = datanode.getLastUpdate();if (currentTime - heartBeatTime <= heartBeatThreshold) {liveNodesCount++;}else {warn=warn+"datanode: "+datanode.getHostName()+" is not alive\n";}}log.info("Number of live nodes: " + liveNodesCount);return warn;} catch (IOException e) {warn=warn+"hdfs is not available";return warn;}
}
3.yarn监控
主要是监控 resourcemanager、NodeManager, 这里主要是用yarn的api实现的。
public static boolean isPortOpen(String host, int port) {try (Socket socket = new Socket()) {socket.connect(new java.net.InetSocketAddress(host, port), 2000); // 设置超时时间为2000毫秒return true; // 连接成功,端口开放} catch (IOException e) {return false; // 连接失败,端口关闭}
}public static String yarnCheck() {// YARN ResourceManager 地址String resourceManagerHost = "XXXXX"; // ResourceManager 地址int resourceManagerPort = 8032; // ResourceManager 端口if(!isPortOpen(resourceManagerHost, resourceManagerPort)){ // 判断端口是否通return "yarn resourcemanager is not available";}String warn="";//如果集群有HA resourcemanager 需要像hdfs那样判断一下。// 创建 YARN 客户端Configuration conf = new Configuration();conf.set("yarn.resourcemanager.address", resourceManagerHost + ":" + resourceManagerPort);try (YarnClient yarnClient = YarnClient.createYarnClient()) {yarnClient.init(conf);yarnClient.start();List<NodeReport> nodeReports = yarnClient.getNodeReports();long currentTime = System.currentTimeMillis();int liveNodesCount = 0;long heartBeatThreshold = 5 * 60 * 1000; // 设置心跳间隔,大于这个间隔默认挂掉for (NodeReport nodeReport : nodeReports) {long heartBeatTime = nodeReport.getLastHealthReportTime();if (currentTime - heartBeatTime <= heartBeatThreshold) {liveNodesCount++;}else {warn= warn+"NodeManager: "+nodeReport.getNodeId()+" is not alive\n";}}log.info("Number of live yarn NodeManager: " + liveNodesCount);return warn;} catch (Exception e) {e.printStackTrace();}return warn;
}
4.yarnjob监控
监控指定指定时间的job是否运行成功,我们这边大部分作业都是spark job,提交的时候都有job名,不是hive sql提交的,所以一旦找到出错的job名,就能快定位故障,从而实现开始补数。
private static Long extractNumber(String s) { //用于 application id 排序String[] split = s.split("_");String numberPart = split[1]+split[2]; // 替换非数字字符return numberPart.isEmpty() ? 0 : Long.parseLong(numberPart); // 转换为整数
}public static String yarnJobCheck(String date) throws IOException, YarnException {// YARN ResourceManager 地址System.setProperty("HADOOP_USER_NAME", "xxxx"); // 有些集群hdfs会限制用户String resourceManagerHost = "XXXX"; // ResourceManager 地址int resourceManagerPort = 8032; // ResourceManager 端口if (!isPortOpen(resourceManagerHost, resourceManagerPort)) {return "yarn resourcemanager is not available";}String warn = "";// 创建 YARN 客户端Configuration conf = new Configuration();conf.set("yarn.resourcemanager.address", resourceManagerHost + ":" + resourceManagerPort);try (YarnClient yarnClient = YarnClient.createYarnClient()) {yarnClient.init(conf);yarnClient.start();conf.set("fs.defaultFS", "hdfs://XXX:8020"); // Namenode 地址和端口// 创建 HdfsAdmin 实例DistributedFileSystem dfs;try {dfs = (DistributedFileSystem) FileSystem.get(conf);dfs.getServerDefaults().getReplication();} catch (Exception e) {conf.set("fs.defaultFS", "hdfs://XXX:8020"); // HA Namenode 地址和端口dfs = (DistributedFileSystem) FileSystem.get(conf);}Path path = new Path("/yarn/application/XXX/logs/"); // hdfs 上存储日子的目录FileStatus[] fileStatusList = dfs.listStatus(path);ArrayList<String> appIdList = new ArrayList<>();// 遍历并打印文件名for (FileStatus fileStatus : fileStatusList) {if (fileStatus.isDirectory()) { // 确保是文件而不是目录java.util.Date finishTime = new Date(fileStatus.getModificationTime());LocalDate finishDate = finishTime.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();if ((finishDate.toString()).equals(date)) { // 获取指定日期的application idappIdList.add(fileStatus.getPath().getName());}}}// 根据字符串中的数字对appid进行排序Collections.sort(appIdList, new Comparator<String>() {@Overridepublic int compare(String s1, String s2) {Long num1 = extractNumber(s1);Long num2 = extractNumber(s2);return Long.compare(num1, num2);}});HashMap<String, String> appNameMap = new HashMap<>();// 各个名字的job只取最新的那一个for (String appId : appIdList) {ApplicationReport applicationReport = yarnClient.getApplicationReport(ApplicationId.fromString(appId));appNameMap.put(applicationReport.getName(),appId);}for(String appName:appNameMap.keySet()){ApplicationReport applicationReport = yarnClient.getApplicationReport(ApplicationId.fromString(appNameMap.get(appName)));String diagnostics = applicationReport.getDiagnostics();//筛选指定的故障的jobif (!diagnostics.isEmpty()&&(applicationReport.getName().contains("Interface")||applicationReport.getName().contains("Report"))){warn=warn+""+"Job finish date :"+stampToDate(Long.valueOf(applicationReport.getFinishTime()))+"\n"+"job name:"+applicationReport.getName()+"\n"+"job id:"+applicationReport.getApplicationId()+"\n";}if(applicationReport.getName().contains("Report")||applicationReport.getName().contains("Interface")){log.info(String.format("app finish time: %s", stampToDate(Long.valueOf(applicationReport.getFinishTime()))));log.info(String.format("Application ID: %s", ApplicationId.fromString(appNameMap.get(appName))));log.info(String.format("Application Name: %s", applicationReport.getName()));log.info(String.format("Application State: %s", applicationReport.getYarnApplicationState()));log.info(String.format("Final Status: %s", applicationReport.getFinalApplicationStatus()));log.info("================================================");}}}return warn;
}
5.部署jar包
我是部署到自己工作的电脑上,工作的电脑能访问各个集群的服务器,设置定时每天发几次邮件给我。知道现在集群的现状,如果有问题及时干预,客户不找我补数,日子美滋滋撒。
定时设置:
邮件样例:
最后,欢迎大家关注我的公众号:造轮子的坦克,可以通过公众号联系我。