#要说的话#
前面把中盛科技的控制器组件写完了。稍稍熟悉了一些HA,现在准备写窗帘控制组件,构想的东西会比较多,估计有些难度,过程会比较长,边写边记录吧!
#设备和场景环境#
使用的是Novo的电机,淘宝链接在【前言】文章中有,轨道应该是佳丽斯的,买电机的时候一起配的。电机提供的是RS485接口,所以需要增加一个RS485的服务器,我选用的是“亿佰特”的网口转RS485的模块,型号是NA111-A(使用220V电源供电,就避免了再加一个模块)统一放置在机柜那里,把原来的网络拿两路出来(蓝色和棕色组)作为485的线路,保留100M的网线功能。远端使用的是多功能的面板,带2+3的电源插口和8+4的网络口,网络口刚好可以分成百兆网和485接口。
#组件思路#
流程:选择Novo组件-->选择485设备类型-->发现485设备-->(可选:配置485设备)
-->通过485连接Novo设备-->配置Novo设备地址(需要按SET按钮)-->设置电机转向
-->设置开合范围-->完成配置
思路:
- 1、选择组件后,提供界面选择使用的485设备类型,使用下拉框给用户选择;
- 2、依据选择的485设备类型触发对应的自发现流程,并列出发现的485设备;
- 3、用户点击选择485设备,组件建立与485设备的链接,并显示通讯正常;
- 4、组件通过485设备向Novo电机发送查询命令,有返回则说明链路建立成功;
- 5、由于是并联了多个Novo电机,所以会返回多个查询回复,所以只能轮循查询,需要提供界面配置的Novo电机地址和通道;
- 6、点击485设备条目进行配置,配置界面自动生成Novo电机地址和通道,点击提交,然后到Novo电机上按下相应配置按钮,完成Novo电机的地址配置(电机正反转两次),该条目移至已经配置好的列表;
- 7、在已经配置好地址的列表里点击Novo电机,进入到电机的配置:可以配置电机转向、窗帘开合范围,检查开、合是否正确,都正确无误后完成配置(后续版本再做)。
-
#代码实现#
- 通过命令:
-
python3 -m script.scaffold integration
初始化组件代码。添加discover方法:
-
def ebyte_discover(ip_address: str | None = None) -> dict[str, EbyteConfig]:"""亿佰特-发现设备方法."""sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)sock.bind(("", BROADCAST_SRC_PORT))sock.settimeout(2)found_devices: dict[str, EbyteConfig] = {}if ip_address is None:addrs = enum_all_broadcast()else:addrs = [ip_address]for addr in addrs:try:sock.sendto(BROADCAST_MSG, (addr, BROADCAST_PORT))except Exception: # noqa: BLE001_LOGGER.warning(f"Can't access network {addr}") # noqa: G004while True:try:data, addr = sock.recvfrom(512)# 返回:fd 06 54 14 a7 dc 74 16 03 01# 返回:fd 06 54 14 a7 dc 72 83 03 01# 分析:fd 06 为帧头# 54 14 a7 dc 74 16 为MAC地址# 03 01 为帧尾mac = ebyte_get_mac(data)if len(mac[0]) > 6:found_devices[mac[0]] = EbyteConfig(mac[0], "Ebyte_RS485", mac[0], addr[0], 0)found_devices[mac[0]].set_mac_bytes(mac[1])except TimeoutError:breakexcept OSError as e:_LOGGER.error(f"Socket error: {e!r}") # noqa: G004if len(found_devices) > 0:# 查询相关信息for conf in found_devices.values():bts: bytearray = QUERY_CMD_FORMAT.copy()for i in range(len(conf.mac_bytes)):bts[i + QUERY_CMD_FORMAT_MAC_IDX] = conf.mac_bytes[i]# 获取名称、版本号和序列号bts[QUERY_CMD_FORMAT_CMD_IDX] = 0x05sock.sendto(bts, (conf.base_ip, BROADCAST_PORT))res = sock.recv(1024)mac = ebyte_get_mac(res)[0]if mac in found_devices:found_devices[mac].load_data(res)# 获取网络配置bts[QUERY_CMD_FORMAT_CMD_IDX] = 0x00sock.sendto(bts, (conf.base_ip, BROADCAST_PORT))res = sock.recv(1024)mac = ebyte_get_mac(res)[0]if mac in found_devices:found_devices[mac].load_data(res)return found_devices
这里绑定UDP的源端口,是转为亿佰特的设备返回的数据是另外一个固定端口。不绑定源端口的话,返回数据收不到。
-
修改config_flow.py中的函数,显示485设备列表:
-
async def async_step_user(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:"""Handle the initial step."""errors: dict[str, str] = {}ent_data: dict[str, Any] = {}if user_input is not None:try:# 获取用户选择的设备IDconf: EbyteConfig = self.get_ebyteconfig_by_id(user_input[CONF_USER_INPURT_ID])ent_data[CONF_HOST] = conf.base_ipent_data[CONF_PORT] = conf.base_portent_data[CONF_MAC] = conf.base_macexcept Exception: # noqa: BLE001_LOGGER.exception("Unexpected exception")errors["base"] = "unknown"dp: dict[str, str] = {}dp["MAC"] = conf.mac_bytes.hex(":")dp["IP"] = conf.base_ipdp["Port"] = conf.base_portret: ConfigFlowResult = self.async_create_entry(# 生成集成条目title=f"Ebyte Device[{conf.base_ip}]",description="This is a Ebyte RS485 communication device!",description_placeholders=dp,options=dp,data=ent_data,)self._ebytelink = static_ebyte_manager.get_link_by_mac(conf.base_mac, conf.base_ip, conf.base_port)# # 创建对应的RS485通讯链路设备# 错误:在此处不能创建设备,只能在__init__.py里创建else: # noqa: RET505self._confs = ebyte_discover()options: dict[str, str] = {}if len(self._confs) < 1:_LOGGER.error("No ebyte communications!")else:for conf in self._confs.values():options[conf.base_mac] = (f"{conf.base_name}[{conf.base_ip}:{conf.base_port}]")ops = []# 从系统中获取已经配置了的条目clist: list[ConfigEntry] = self.hass.config_entries.async_entries(DOMAIN)cd: dict[str, str] = {}for cf in clist:cd[cf.data[CONF_MAC]] = cf.data[CONF_HOST]for k, v in options.items():if k not in cd:ops.append(SelectOptionDict(value=k, label=v))if len(ops) > 0:ebyteschema = vol.Schema({vol.Required(CONF_USER_INPURT_ID): SelectSelector(SelectSelectorConfig(options=ops,mode=SelectSelectorMode.DROPDOWN,)),})else:ebyteschema = vol.Schema({"ERROR:": "No Ebyte RS485 device founded!"})return self.async_show_form(step_id="user", data_schema=ebyteschema, errors=errors)
这里使用SelectSelector提供选择项。
-
生成的效果如下:
要显示集成条目右边的配置按钮,需要在Flow类中添加指定的方法:
@staticmethod@callbackdef async_get_options_flow(config_entry: ConfigEntry) -> OptionsFlow:"""增加本函数后,会在集成条目位置增加“配置”按钮."""return NovoOptionsFlow(config_entry)
完成RS485设备(集成条目)的添加,后续就是在点击“配置”的时候,弹出界面给Novo电机写地址和通道号,代码放在NovoOptionsFlow中,如下:
async def async_step_init(self, user_input: dict[str, Any] | None = None) -> FlowResult:"""Manage the options."""errors: dict[str, str] = {}if user_input is not None:# 有输入信息if not user_input[CONFIG_FLOW_TYPE]:# 未知的步骤_LOGGER.error("Unkown flow type!")elif user_input[CONFIG_FLOW_TYPE] == OPTIONS_FLOW_TYPE_SET_ADDRESS:# 当前为输入电机地址步骤,需要配置电机idb = bytes.fromhex(user_input[CONFIG_ID])did: int = int.from_bytes(idb, "big")channel: int = user_input["channel"]edata: dict[str, Any] = {}res = await self._device.async_set_motor_addr(did, channel)if res:# 写入成功,则保存相关信息edata[CONF_MAC] = self._entry.data[CONF_MAC]edata[CONF_HOST] = self._entry.data[CONF_HOST]edata[CONF_PORT] = self._entry.data[CONF_PORT]edata[CONFIG_ID] = didedata[CONFIG_CHANNEL] = channelself._save_device_config(edata)return self.async_create_entry(title="", data=edata)# 自动生成电机IDida = np.random.random_integers(161, 254, size=(2))idab: bytearray = bytearray(2)idab[0] = ida[0]idab[1] = ida[1]ebyteschema = vol.Schema({vol.Required(CONFIG_FLOW_TYPE, default=OPTIONS_FLOW_TYPE_SET_ADDRESS): vol.In(ADD_WAY),vol.Required(schema=CONFIG_ID,description="Enetry RS485 address.",default=idab.hex(" "),): str,vol.Required(schema=CONFIG_CHANNEL, description="Entery Novo channel.", default=4): int,})return self.async_show_form(step_id="init", data_schema=ebyteschema, errors=errors)
显示效果:
在这里做了选择器,最初是想把设置地址和配置旋转方向一起的,所以留在这里了。后续再完善。
这里配置完成后,我不知道应该怎么保存配置并使用,看了美的的组件,就直接用他的代码了,就是保存json文件到本地,使用的时候读取就是了。对应的代码:
def _save_device_config(self, data: dict):os.makedirs(self.hass.config.path(f"{STORAGE_PATH}/{data[CONF_MAC]}"), exist_ok=True)record_file = self.hass.config.path(f"{STORAGE_PATH}/{data[CONF_MAC]}/{data[CONFIG_ID]}.json")save_json(record_file, data)
添加并配置Novo电机就完成了,后而就是怎么加载这些实体,加载的工作必须放在__init__.py文件中,不能放在其他地方,回到__init__.py文件,修改对应的代码:
# Update entry annotation
async def async_setup_entry(hass: HomeAssistant, entry: NovoConfigEntry) -> bool:"""Set up Novo from a config entry."""# 说明:此方法,每个集成条目都会调用一次# 1. Create API instance# 2. Validate the API connection (and authentication)# 3. Store an API object for your platforms to accessip: str = entry.data[CONF_HOST]port: int = entry.data[CONF_PORT]mac: str = entry.data[CONF_MAC]link: EbyteRS485Link = static_ebyte_manager.get_link_by_mac(mac, ip, port)if link is None:_LOGGER.error(f"Device[{mac},{ip}:{port}] lost!") # noqa: G004# 注册通讯设备:当前是亿佰特的RS485设备device_registry = dr.async_get(hass)if link.is_connected:device_registry.async_get_or_create(config_entry_id=entry.entry_id,configuration_url=f"http://{link.tcp_ip}:{link.tcp_port}/",identifiers={("mac", mac)},connections={("mac", mac), ("ip", link.tcp_ip), ("port", link.tcp_port)},manufacturer="Ebyte Tech",model=link.DOMAIN,name=link.base_name,serial_number=mac,sw_version=link.version,translation_key="Ebyte communication",)await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)novo: NovoDevice = static_novo_device_manager.get_novo_device(mac, link)# conf: NovoConfigd_id = 0c_id = 0# 从保存的配置文件中获取实体jsons = _load_device_config(hass, mac)if len(jsons) > 0:for js in jsons:# _LOGGER.error(f'get json:{js}')d_id = js[CONFIG_ID]c_id = js[CONFIG_CHANNEL]c_mac = js[CONF_MAC]if c_mac == mac:# 发送消息给实体类,创建实体async_dispatcher_send(hass,DATA_DISCOVER_COMPONENT.format(NovoCoverEntry.DOMAIN),novo,entry,d_id,c_id,)return True
因为考虑到需要支持多RS485设备,不同的电机是挂在不同的RS485设备上的,加载的时候就需要一一对应上,在这里的思路就是使用RS485设备的MAC地址作为文件夹区分,并通过文件遍历的方式获取到配置的json文件(每个json文件对应一台电机),遍历的代码:
def _load_device_config(hass: HomeAssistant, device_id):# 列表出文件夹下所有的文件files = []jsons = []pypath = hass.config.path(f"{STORAGE_PATH}/{device_id}/")if not os.path.isdir(pypath):_LOGGER.error(f"file path :{pypath} not exists!") # noqa: G004return jsonsfor filename in os.listdir(pypath):filepath = os.path.join(pypath, filename)if os.path.isfile(filepath):files.append(filename)record_file = hass.config.path(f"{STORAGE_PATH}/{device_id}/{filename}")jsons.append(load_json(record_file, default={}))return jsons
在__init__.py中怎么把配置信息传到实体类,在我的上一篇文章中已经说了,不清楚的可以看看。这里使用了if c_mac == mac:进行限制是否生成实体,是因为在多个RS485(集成条目)的情况下,相应的代码就是执行多次,有冲突后,就不能正确给实体配置上对应的链路。
最后就是实体类Cover了,完整代码如下:
"""窗帘实体类."""import logging
from typing import Anyfrom homeassistant.components.cover import (ATTR_POSITION,CoverEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallbackfrom .const import DATA_DISCOVER_COMPONENT
from .core.NovoDevice import NovoDevice_LOGGER = logging.getLogger("novocover")async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:"""配置窗帘实体."""@callbackdef async_discover(device: NovoDevice, entry: ConfigEntry, id: int, ch: int) -> None:"""发现电机回调及添加方法."""nentry = NovoCoverEntry(device, entry, id, ch)async_add_entities([nentry])hass.data[DATA_DISCOVER_COMPONENT.format(NovoCoverEntry.DOMAIN)] = (async_dispatcher_connect(hass,DATA_DISCOVER_COMPONENT.format(NovoCoverEntry.DOMAIN),async_discover,))class NovoCoverEntry(CoverEntity):"""Novo电机窗帘实体类."""DOMAIN = "cover"_attr_has_entity_name = True_id: int_ch: int_entry: ConfigEntry_device: NovoDevice_pos: int_closed: booldef __init__(self, device: NovoDevice, entry: ConfigEntry, id: int, ch: int) -> None:"""初始化Novo电机窗帘实体."""super().__init__()self._id = idself._entry = entryself._unique_id = f"{self.DOMAIN}.novo_curtain_{id}"self.entity_id = self._unique_idself._closed = Trueself._ch = chself._device = deviceself._pos = 0self._attr_unique_id = f"{self.DOMAIN}.novo_curtain_{id}"self._attr_name = f"NovoCurtain_{id}"@propertydef unique_id(self) -> str | None:"""设备标识符."""return self._attr_unique_id@propertydef is_closed(self) -> bool:"""窗帘是否已经关闭."""return self._closeddef open_cover(self, **kwargs: Any) -> None:"""打开窗帘."""self._device.async_open_cover_by_id(self._id, self._ch, 0)async def async_open_cover(self, **kwargs: Any) -> None:"""异步打开窗帘."""return await self._device.async_open_cover_by_id(self._id, self._ch, 0)def close_cover(self, **kwargs: Any) -> None:"""关闭窗帘."""self._device.async_close_cover_by_id(self._id, self._ch, 100)async def async_close_cover(self, **kwargs: Any) -> None:"""异步关闭窗帘."""return await self._device.async_close_cover_by_id(self._id, self._ch, 100)def set_cover_position(self, **kwargs: Any) -> None:"""设置窗帘位置."""position = int(kwargs.get(ATTR_POSITION))self._device.async_set_cover_position_by_id(self._id, self._ch, position)async def async_update(self) -> None:"""更新代码."""await self._device.async_query_position_by_id(self._id, self._ch)self._pos = self._device.get_position_by_id(self._id)self._attr_current_cover_position = self._posif self._pos >= 99:self._attr_is_closed = Falseself._closed = Falseelse:self._attr_is_closed = Trueself._closed = True
最后运行结果:
学到的知识点:
1、使用界面获取用户输入:async_show_form方法的使用;
2、集成条目的使用:async_create_entry方法;
3、集成条目配置:async_get_options_flow方法
存在的问题:
1、Novo电机会主动发送信息,需要在链路代码里增加循环读取socket信息的功能,而不是现在一发一收的模式;
2、Novo电机的Update方法是直接调用链路模块发送命令的,有可能会出现冲突的情况,需要封装链路模块,在模块中处理好冲突问题;
3、使用集成条目的配置功能添加Novo电机时,不会自动删除原有的配置文件(json);
4、使用配置功能添加Novo电机后,不会自动刷新实体列表,需要手动“重新加载”集成条目。
增加了自动化:早上7点05分,开灯、开窗帘。早上自动打开了。
Novo窗帘的组件基本功能完成,家里的零冷水泵到了,后续就是把零冷水泵添加到HA中。还有人体传感器、空气质量传感器、电动水阀都有了,都得花时间把这些东西加进去……,又是得花时间折腾……
另外:最近想把项目放到HA里面去,因为需要用到厂家的LOGO,目前在跟厂家沟通,获得授权后,把LOGO加上,就可以放到HA里面了。