我们通过源码发现,xxl-job主要是由客户端发起的注册和存活上报(心跳检测)的;
主要是执行器在启动时,会初始化一个线程,每隔30
秒请求调度中心接口,维护存活状态;
- 注册接口/上报接口:
/api/register
; - 调度中心存储执行器的信息,并维护其末次心跳上报时间
xxl_job_registry.update_time
我们主要看 JobRegistryHelper 执行器注册类
包含3个功能: 1. 进行执行器的注册和心跳检测; 2. 执行器的注销下线 3. 刷新执行器的信息,将实时的执行器地址列表更新到表中;xxl_job_group;
- 执行器的注册和心跳检测:接收执行器的
/api/register
请求,插入或者更新updateTime. xxl_job_registry表。
public ReturnT<String> registry(RegistryParam registryParam) {// valid// registryGroup=EXECUTOR, registryKey=appName, registryValue=http://127.0.0.1:5555if (!StringUtils.hasText(registryParam.getRegistryGroup()) || !StringUtils.hasText(registryParam.getRegistryKey()) || !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");}/ async execute// 异步执行注册和心跳检测的逻辑:xxl_job_registryregistryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());if (ret < 1) {XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// freshfreshGroupRegistryInfo(registryParam);}}});return ReturnT.SUCCESS;
}
- 执行器的注销下线:当执行器服务关机时,会调用对应的
/api/registerRemove
接口进行执行器下线;
public ReturnT<String> registryRemove(RegistryParam registryParam) {// valid 参数和注册参数一致if (!StringUtils.hasText(registryParam.getRegistryGroup()) || !StringUtils.hasText(registryParam.getRegistryKey()) || !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");}// 异步执行下线逻辑registryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {// 删除对应的 xxl_job_registry 记录int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());if (ret > 0) {// freshfreshGroupRegistryInfo(registryParam);}}});return ReturnT.SUCCESS;
}
- 定时刷新执行器情况,并更新到
xxl_job_group.address_list
字段
每30秒检测一次执行器注册情况, 将超过90秒的执行器删除,将更新时间在90秒之内的更新到执行字段中。
registryMonitorThread = new Thread(new Runnable() {@Overridepublic void run() {while (!toStop) {try {// 查询自动注册的执行器List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);if (groupList!=null && !groupList.isEmpty()) {// 移除超时下线节点 xxl_job_registry,超时时间为执行器注册(心跳)间隔的3倍;Dead_Timeout = 30 * 3;List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());if (ids!=null && ids.size()>0) {XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);}// 刷新在线的机器(执行器地址)HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());if (list != null) {for (XxlJobRegistry item: list) {if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {String appname = item.getRegistryKey();List<String> registryList = appAddressMap.get(appname);if (registryList == null) {registryList = new ArrayList<String>();}if (!registryList.contains(item.getRegistryValue())) {registryList.add(item.getRegistryValue());}appAddressMap.put(appname, registryList);}}}// fresh group addressfor (XxlJobGroup group: groupList) {List<String> registryList = appAddressMap.get(group.getAppname());String addressListStr = null;if (registryList!=null && !registryList.isEmpty()) {Collections.sort(registryList);StringBuilder addressListSB = new StringBuilder();for (String item:registryList) {addressListSB.append(item).append(",");}addressListStr = addressListSB.toString();addressListStr = addressListStr.substring(0, addressListStr.length()-1);}group.setAddressList(addressListStr);group.setUpdateTime(new Date());// 将执行器地址列表自动更新到表中的对应字段 xxl_job_groupXxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}try {// 每30秒检测一次执行器注册情况TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");}
});
registryMonitorThread.setDaemon(true);
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
registryMonitorThread.start();