Kafka-Connect源码分析

一、上下文

《Kafka-Connect自带示例》中我们尝试了零配置启动producer和consumer去生产和消费数据,那么它内部是如何实现的呢?下面我们从源码来揭开它神秘的面纱。

二、入口类有哪些?

从启动脚本(connect-standalone.sh)中我们可以获取到启动类为ConnectStandalone

# ......省略......exec $(dirname $0)/kafka-run-class.sh $EXTRA_ARGS org.apache.kafka.connect.cli.ConnectStandalone "$@"

此外我们在参数中还给了source和sink的配置文件,从这source文件中可以获取到入口类为:FileStreamSource,从sink文件中可以获取到入口类为:FileStreamSink

FileStreamSource对应源码中的FileStreamSourceConnector

FileStreamSink对应源码中的FileStreamSinkConnector

connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-test-data/source.txt
topic=connect-test

connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-test-data/sink.txt
topics=connect-test

三、ConnectStandalone

它将Kafka Connect作为独立进程运行。在此模式下,work(连接器和任务)不会被分配。相反,所有正常的Connect机器都在一个过程中工作。适合一些临时、小型或实验性工作。

在此模式下,连接器和任务配置存储在内存中,不是持久的。但是,连接器偏移数据是持久的,因为它使用文件存储(可通过offset.storage.file.filename配置)

public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {public static void main(String[] args) {ConnectStandalone connectStandalone = new ConnectStandalone(args);//调用父类的run()connectStandalone.run();}}//这里用到了AbstractConnectCli,它里面有Kafka Connect的通用初始化逻辑。
public abstract class AbstractConnectCli<T extends WorkerConfig> {//验证第一个CLI参数、进程工作器属性,并启动Connectpublic void run() {if (args.length < 1 || Arrays.asList(args).contains("--help")) {log.info("Usage: {}", usage());Exit.exit(1);}try {//connect-standalone.properties 配置文件String workerPropsFile = args[0];//从connect-standalone.properties 中将配置的参数都转成 mapMap<String, String> workerProps = !workerPropsFile.isEmpty() ?Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();//这里面存放了 connect-standalone.properties 后的其他参数//比如connect-file-source.properties、connect-file-sink.propertiesString[] extraArgs = Arrays.copyOfRange(args, 1, args.length);//启动ConnectConnect connect = startConnect(workerProps, extraArgs);// 关机将由Ctrl-C或通过HTTP关机请求触发connect.awaitStop();} catch (Throwable t) {log.error("Stopping due to error", t);Exit.exit(2);}}}

总结下来做了两件事情

1、初始化connect-standalone.properties配置信息

2、启动Connect

那什么是Connect,且它有做了什么呢?下面我们来看下

1、启动Connect

public abstract class AbstractConnectCli<T extends WorkerConfig> {public Connect startConnect(Map<String, String> workerProps, String... extraArgs) {//Kafka Connect工作进程初始化log.info("Kafka Connect worker initializing ...");long initStart = time.hiResClockMs();WorkerInfo initInfo = new WorkerInfo();initInfo.logAll();//正在扫描插件类。这可能需要一点时间log.info("Scanning for plugin classes. This might take a moment ...");Plugins plugins = new Plugins(workerProps);plugins.compareAndSwapWithDelegatingLoader();T config = createConfig(workerProps);log.debug("Kafka cluster ID: {}", config.kafkaClusterId());RestClient restClient = new RestClient(config);ConnectRestServer restServer = new ConnectRestServer(config.rebalanceTimeout(), restClient, config.originals());restServer.initializeServer();URI advertisedUrl = restServer.advertisedUrl();String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();//connector.client.config.override.policy   默认值 All//ConnectorClientConfigOverridePolicy 实现的类名或别名。// 定义连接器可以覆盖哪些客户端配置。默认实现为“All”,这意味着连接器配置可以覆盖所有客户端属性。// 框架中的其他可能策略包括“无”禁止连接器覆盖客户端属性,“主体”允许连接器仅覆盖客户端主体ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),config, ConnectorClientConfigOverridePolicy.class);Herder herder = createHerder(config, workerId, plugins, connectorClientConfigOverridePolicy, restServer, restClient);final Connect connect = new Connect(herder, restServer);//Kafka Connect工作进程初始化已完成log.info("Kafka Connect worker initialization took {}ms", time.hiResClockMs() - initStart);try {connect.start();} catch (Exception e) {log.error("Failed to start Connect", e);connect.stop();Exit.exit(3);}//这里面会依次处理 connect-file-source.properties、connect-file-sink.properties 等参数processExtraArgs(herder, connect, extraArgs);return connect;}}

可以理解connect-standalone.properties是用于启动connect的,connect-file-source.properties是用于启动source任务的的,connect-file-sink.properties是用于启动sink任务的,source、sink的任务是在processExtraArgs(herder, connect, extraArgs)中完成的,这里用到了三个参数,

1、Herder:牧民,可用于在Connect群集上执行操作的实例,它里面有Worker

2、Connect:这个类将Kafka Connect进程的所有组件(牧民、工人、存储、命令接口)联系在一起,管理它们的生命周期

3、extraArgs:用于配置source、sink任务的参数

2、启动source、sink任务

这里我们对着参数(connect-file-source.properties、connect-file-sink.properties)来看更为形象

    protected void processExtraArgs(Herder herder, Connect connect, String[] extraArgs) {try {//一个配置一个配置去解析//按照官方的例子会依次解析connect-file-source.properties、connect-file-sink.propertiesfor (final String connectorConfigFile : extraArgs) {CreateConnectorRequest createConnectorRequest = parseConnectorConfigurationFile(connectorConfigFile);FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {if (error != null)log.error("Failed to create connector for {}", connectorConfigFile);elselog.info("Created connector {}", info.result().name());});//依次把每个配置文件的解析结果放入 herder 牧民中herder.putConnectorConfig(createConnectorRequest.name(), createConnectorRequest.config(),createConnectorRequest.initialTargetState(),false, cb);//Future的get方法‌是一个阻塞方法,用于获取任务的运行结果。当调用get方法时,如果任务尚未完成,线程会阻塞,直到任务完成。cb.get();}} catch (Throwable t) {//.....}}

下面我们分别用connect-file-source.properties、connect-file-sink.properties带入看看牧民(herder)是如何将任务进行执行的。一份文件就是一个connector,这里我们先分析StandaloneHerder

public class StandaloneHerder extends AbstractHerder {private final ScheduledExecutorService requestExecutorService;StandaloneHerder(Worker worker,String workerId,String kafkaClusterId,StatusBackingStore statusBackingStore,MemoryConfigBackingStore configBackingStore,ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,Time time) {super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy, time);this.configState = ClusterConfigState.EMPTY;//创建一个单线程执行器,可以安排命令在给定延迟后运行,或定期执行。this.requestExecutorService = Executors.newSingleThreadScheduledExecutor();configBackingStore.setUpdateListener(new ConfigUpdateListener());}public void putConnectorConfig(final String connName, final Map<String, String> config, final TargetState targetState,final boolean allowReplace, final Callback<Created<ConnectorInfo>> callback) {try {validateConnectorConfig(config, (error, configInfos) -> {if (error != null) {callback.onCompletion(error, null);return;}//向执行器提交 source 或 sink 的 ConnectorConfigrequestExecutorService.submit(() -> putConnectorConfig(connName, config, targetState, allowReplace, callback, configInfos));});} catch (Throwable t) {callback.onCompletion(t, null);}}private synchronized void putConnectorConfig(String connName,final Map<String, String> config,TargetState targetState,boolean allowReplace,final Callback<Created<ConnectorInfo>> callback,ConfigInfos configInfos) {try {//........//将此连接器配置(以及可选的目标状态)写入持久存储,并等待其被确认,//然后通过在Kafka日志中添加消费者来读回。如果将worker配置为使用fencable生产者写入配置topic,//则必须在调用此方法之前成功调用claimWritePrivileges()configBackingStore.putConnectorConfig(connName, config, targetState);startConnector(connName, (error, result) -> {if (error != null) {callback.onCompletion(error, null);return;}//执行器提交任务requestExecutorService.submit(() -> {updateConnectorTasks(connName);callback.onCompletion(null, new Created<>(created, createConnectorInfo(connName)));});});} catch (Throwable t) {callback.onCompletion(t, null);}}private synchronized void updateConnectorTasks(String connName) {//......//配置 connectorTask 这里会用到配置文件中的connector.class//既:List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);if (taskConfigsChanged(configState, connName, rawTaskConfigs)) {removeConnectorTasks(connName);configBackingStore.putTaskConfigs(connName, rawTaskConfigs);createConnectorTasks(connName);}}private void createConnectorTasks(String connName) {List<ConnectorTaskId> taskIds = configState.tasks(connName);createConnectorTasks(connName, taskIds);}private void createConnectorTasks(String connName, Collection<ConnectorTaskId> taskIds) {Map<String, String> connConfigs = configState.connectorConfig(connName);for (ConnectorTaskId taskId : taskIds) {//依次启动每个task(配置的 source和sink task)startTask(taskId, connConfigs);}}private boolean startTask(ConnectorTaskId taskId, Map<String, String> connProps) {switch (connectorType(connProps)) {case SINK:return worker.startSinkTask(taskId,configState,connProps,configState.taskConfig(taskId),this,configState.targetState(taskId.connector()));case SOURCE:return worker.startSourceTask(taskId,configState,connProps,configState.taskConfig(taskId),this,configState.targetState(taskId.connector()));default:throw new ConnectException("Failed to start task " + taskId + " since it is not a recognizable type (source or sink)");}}}

牧民(Herder)会用Worker来启动配置的SouceTask和SinkTask,最终他们调用的还是同一个方法,只是任务构建器不同而已,下面我们继续分析

Worker启动SouceTask

    public boolean startSourceTask(...) {return startTask(id, connProps, taskProps, configState, statusListener,new SourceTaskBuilder(id, configState, statusListener, initialState));}

Worker启动SinkTask

    public boolean startSinkTask(...) {return startTask(id, connProps, taskProps, configState, statusListener,new SinkTaskBuilder(id, configState, statusListener, initialState));}

下面我们共同来分析startTask(...)

    //线程池//Executors.newCachedThreadPool()private final ExecutorService executor;private boolean startTask(...){//......//从 connector.class 获取类进行加载String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);ClassLoader connectorLoader = plugins.connectorLoader(connType);//......final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);//对应的sourceTask 和 sinkTaskfinal Task task = plugins.newTask(taskClass);//此处就是根据传的参数来workerTask 会加载不同的Task// SourceTaskBuilder --->  SouceTask// SinkTaskBuilder   --->  SinkTaskworkerTask = taskBuilder.withTask(task).withConnectorConfig(connConfig).withKeyConverter(keyConverter).withValueConverter(valueConverter).withHeaderConverter(headerConverter).withClassloader(connectorLoader).build();workerTask.initialize(taskConfig);WorkerTask<?, ?> existing = tasks.putIfAbsent(id, workerTask);//我们继续往下分析,看看 SourceTask  和 SinkTask 都是怎么执行的executor.submit(plugins.withClassLoader(connectorLoader, workerTask));if (workerTask instanceof WorkerSourceTask) {SourceTask 有一个单独 定时提交 offset 的 任务,默认间隔为 1minsourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, (WorkerSourceTask) workerTask));}return true;}

FileStreamSource对应的Task为:FileStreamSourceTask

FileStreamSink对应的Task为:FileStreamSinkTask

Worker会将WorkerTask调起去生产和消费数据

3、调度运行WorkerTask

WorkerTask会提供Worker用于管理任务的基本方法。实现将用户指定的Task与Kafka相结合,以创建数据流。且WorkerTask会放到线程池中进行调度,下面我们看下它的run()

//以下只是主要代码
abstract class WorkerTask<T, R extends ConnectRecord<R>> implements Runnable {public void run() {doRun();}private void doRun() throws InterruptedException {//会初始化 我们在connect-file-source.properties、connect-file-sink.properties中对Producer、Consumer的参数配置doStart();//真的开始//    用对应的SourceTask去读取数据,并交由Producer去生产//    用Consumer接收数据交由 SinkTask去处理数据execute();}}

doStart()

    void doStart() {retryWithToleranceOperator.reporters(errorReportersSupplier.get());initializeAndStart();statusListener.onStartup(id);}

Source 和 Sink 会对initializeAndStart()有不同的实现

Source

这里用的是WorkerTask的子类:AbstractWorkerSourceTask

public abstract class AbstractWorkerSourceTask extends WorkerTask<SourceRecord, SourceRecord> {protected void initializeAndStart() {prepareToInitializeTask();offsetStore.start();//启动标记设置为 truestarted = true;//使用指定的上下文对象初始化此SourceTask。task.initialize(sourceTaskContext);//启动任务。这应该处理任何配置解析和任务的一次性设置。//这里会实际调用 FileStreamSourceTask 或者我们指定的其他 SourceTask的 start()task.start(taskConfig);log.info("{} Source task finished initialization and start", this);}}
public class FileStreamSourceTask extends SourceTask {public void start(Map<String, String> props) {AbstractConfig config = new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, props);//name=local-file-source//connector.class=FileStreamSource//tasks.max=1//file=/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/etc/kafka/conf.dist/connect-file-test-data/source.txt//topic=connect-test// filefilename = config.getString(FileStreamSourceConnector.FILE_CONFIG);if (filename == null || filename.isEmpty()) {stream = System.in;//跟踪stdin的偏移量没有意义streamOffset = null;reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));}//topic=connect-testtopic = config.getString(FileStreamSourceConnector.TOPIC_CONFIG);//batch.sizebatchSize = config.getInt(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG);}}
Sink

这里用的是WorkerTask的子类:WorkerSinkTask

    protected void initializeAndStart() {SinkConnectorConfig.validate(taskConfig);//订阅 topicif (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {List<String> topics = SinkConnectorConfig.parseTopicsList(taskConfig);consumer.subscribe(topics, new HandleRebalance());log.debug("{} Initializing and starting task for topics {}", this, String.join(", ", topics));} else {//topics.regex//根据正则设置的 要消费的 topicString topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);Pattern pattern = Pattern.compile(topicsRegexStr);consumer.subscribe(pattern, new HandleRebalance());log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);}//初始化此任务的上下文。task.initialize(context);//启动任务。这应该处理任何配置解析和任务的一次性设置。//这里会真正的调用 FileStreamSinkTask 或者 其他我们配置的SinkTask 的 start()task.start(taskConfig);log.info("{} Sink task finished initialization and start", this);}
public class FileStreamSinkTask extends SinkTask {public void start(Map<String, String> props) {AbstractConfig config = new AbstractConfig(FileStreamSinkConnector.CONFIG_DEF, props);filename = config.getString(FileStreamSinkConnector.FILE_CONFIG);if (filename == null || filename.isEmpty()) {outputStream = System.out;} else {try {//根据我们在配置文件中结果文件 创建输出流outputStream = new PrintStream(Files.newOutputStream(Paths.get(filename), StandardOpenOption.CREATE, StandardOpenOption.APPEND),false,StandardCharsets.UTF_8.name());} catch (IOException e) {throw new ConnectException("Couldn't find or create file '" + filename + "' for FileStreamSinkTask", e);}}}}

execute()

为了看的清晰,这里我们只列举主要代码

Source
public abstract class AbstractWorkerSourceTask extends WorkerTask<SourceRecord, SourceRecord> {public void execute() {while (!isStopping()) {//这里会调用 task.poll(); 也就是从文件读取数据toSend = poll();//这里真的就会调用producer.send() 发送数据sendRecords()}}
}
Sink
class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkRecord> {public void execute() {while (!isStopping())iteration();}}protected void iteration() {poll(timeoutMs);}protected void poll(long timeoutMs) {//用consumer拉回数据    ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);//转化并交由 自己定义的 SinkTask 处理数据convertMessages(msgs);deliverMessages()}private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) {ConsumerRecords<byte[], byte[]> msgs = consumer.poll(Duration.ofMillis(timeoutMs));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/35731.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

[Redis#17] 主从复制 | 拓扑结构 | 复制原理 | 数据同步 | psync

目录 主从模式 主从复制作用 建立主从复制 主节点信息 从节点信息 断开主从复制关系 主从拓扑结构 主从复制原理 1. 复制过程 2. 数据同步&#xff08;PSYNC&#xff09; 3. 三种复制方式 一、全量复制 二、部分复制 三、实时复制 四、主从复制模式存在的问题 在…

【Unity高级】如何动态调整物体透明度

本文介绍了如何设置及动态调整物体的透明度。 一、手动设置的方法 我们先来看下如何手动设置物体的透明度。 物体的透明与否是通过材质来设置的。只有我们把具有透明度的材质指给物体的渲染器&#xff08;Render&#xff09;&#xff0c;物体就被设置成相应的透明度了。 看一…

相机动态/在线标定

图1 图2 基本原理 【原理1】平行线在射影变换后会交于一点。如图所示,A为相机光心,蓝色矩形框为归一化平面,O为平面中心。地面四条黄色直线为平行且等距的车道线。HI交其中两条车道线于H、I, 过G作HI的平行线GM交车道线于M。HI、GM在归一化平面上的投影分别为JK、PN,二者会…

通俗易懂理解:网络安全恶意节点的检测与哨兵节点的激活【论文+代码】

以下资料参考来自本文末尾的参考资料与代码&#xff1a; 在网络安全中&#xff0c;恶意节点检测和哨兵节点激活是确保网络稳定性、可靠性和安全性的关键技术&#xff0c;尤其是在分布式系统、物联网 (IoT)、区块链网络等环境中。下面将详细介绍这两个概念及其应用。 一、恶意…

python作业

1.D 2.B 3.D 4.C 5.B 6.D 7.D 8.B 9.D 10. A 11.D 12.C 13.√ 14.√ 16.√ 17.√ 18.None 19.([1,3],[2]) 20. 列表思维导图

Redis(上)

Redis 基础 什么是 Redis&#xff1f; Redis &#xff08;REmote DIctionary Server&#xff09;是一个基于 C 语言开发的开源 NoSQL 数据库&#xff08;BSD 许可&#xff09;。与传统数据库不同的是&#xff0c;Redis 的数据是保存在内存中的&#xff08;内存数据库&#xf…

LabVIEW气缸摩擦力测试系统

基于LabVIEW的气缸摩擦力测试系统实现了气缸在不同工作状态下摩擦力的快速、准确测试。系统由硬件平台和软件两大部分组成&#xff0c;具有高自动化、精确测量和用户友好等特点&#xff0c;可广泛应用于精密机械和自动化领域。 ​ 项目背景&#xff1a; 气缸作为舵机关键部件…

CentOS7.X 安装RustDesk自建服务器实现远程桌面控制

参照文章CentOS安装RustDesk自建服务器中间总有几个位置出错&#xff0c;经实践做个记录防止遗忘 一 环境&工具准备 1.1 阿里云轻量服务器、Centos7系统、目前最高1.1.11版本rustdesk-server-linux-amd64.zip 1.2 阿里云轻量服务器–安全组–开放端口&#xff1a;TCP(21…

工具篇:IDEA VFS 损害启动报错 com.intellij.util.io.CorruptedException 处理

文章目录 前言一、 idea 的 VFS是什么&#xff1f;二、解决方式&#xff1a;2.1 退出Idea 然后重新打开&#xff1a;2.2 手动清除Idea 缓存&#xff0c;让Idea 重新建立缓存&#xff1a;2.2.1 打开 Invalidate Caches / Restart 对话框:2.2.2 勾选要清除的缓存&#xff1a; 总结…

2.linux中调度kettle

一.准备转换&#xff0c;等会在linux中用 1.添加excel输入组件&#xff0c;并添加对应的文件 2.添加列拆分为多行组件 3.添加文本文件输出组件 4.保存转换 二.linux安装java 1.把jdk-8u144-linux-x64.tar.gz上传到linux的/lx目录下 2. 解压jdk包&#xff0c;然后配置环境变量…

第四节、电机定角度转动【51单片机-TB6600驱动器-步进电机教程】

摘要&#xff1a;本节介绍用电机转动角度计算步骤&#xff0c;从而控制步进电机转角 一、 计算过程 1.1 驱动器接收一个脉冲后&#xff0c;步进电机转动一步&#xff0c;根据驱动器设置的细分值 计算一个脉冲对应电机转动的角度step_x s t e p x s t e p X … … ① step_{x…

如何终身使用 100% 免费的服务器

作为开发人员,我们需要在云服务上运行和托管后端。有许多 BaaS(后端即服务)可用,但它们有一些限制。 如果我说我已经免费使用基于 Linux 的服务器超过 4-5 年了,那会怎样?是的,你没听错。我正在使用这台安装了 Ubuntu 20、24 GB RAM、4 个 CPU 和 200 GB 存储空间的 Lin…

【计算机组成原理】期末复习题库

5&#xff0e;主存储器和CPU之间增加cache的目的是 。 A&#xff0e;解决CPU和主存之间的速度匹配问题 B&#xff0e;扩大主存储器的容量 C&#xff0e;扩大CPU中通用寄存器的数量 D&#xff0e;既扩大主存容量又扩大CPU中通用寄存器的数量 在计算机系统中&#xff0c;CPU的速…

SAP中Smartforms 翻译越南语

点击打印预览 打印预览中确实是越南语 转出成PDF 成了乱码 SPAD中查询LP01其实是简体中文 换成LP02试试 显示看上去正常的 SPAD中的LP02 SU3可以设置自己的默认打印参数 查查Smartforms中的字体样式 是宋体&#xff0c;看上去不用为了越南文刻意改字体样式成TIMES 看这篇文章…

26.删除有序数组中的重复项 python

删除有序数组中的重复项 题目题目描述示例 1&#xff1a;示例 2&#xff1a;提示&#xff1a;题目链接 题解解题思路python实现代码解释提交结果 题目 题目描述 给你一个 非严格递增排列 的数组 nums &#xff0c;请你 原地 删除重复出现的元素&#xff0c;使每个元素 只出现…

R语言 | 峰峦图 / 山脊图

目的&#xff1a;为展示不同数据分布的差异。 1. ggplot2 实现 # 准备数据 datmtcars[, c("mpg", "cyl")] colnames(dat)c("value", "type") head(dat) # value type #Mazda RX4 21.0 6 #Mazda RX4 Wag …

四川创新志成健康管理有限公司

四川创新志成健康管理有限公司 成都市青羊区广富路168号 公司简介 四川创新志成健康管理有限公司成立于2021年&#xff0c;公司专注体外诊断领域&#xff0c;致力为医学实验室、生产厂家、 经销商提供专业的学术、技术增值服务&#xff0c;涵盖免疫、生化、输血等检测领域&a…

系统级 I/O

Unix I/O **了解 Unix I/O 将帮助你理解其他的系统概念。**I/O 是系统操作不可或缺的一部分。我们经常遇到 I/O 和其他系统概念之间的循环依赖。例如&#xff0c;I/O 在进程的创建和执行中扮演着关键的角色。反过来&#xff0c;进程创建又在不同进程间的文件共享中扮演着关键角…

Elasticsearch:使用阿里 infererence API 及 semantic text 进行向量搜索

在之前的文章 “Elasticsearch 开放推理 API 新增阿里云 AI 搜索支持”&#xff0c;它详细描述了如何使用 Elastic inference API 来针对阿里的密集向量模型&#xff0c;稀疏向量模型&#xff0c; 重新排名及 completion 进行展示。在那篇文章里&#xff0c;它使用了很多的英文…

基于公网的无线全双工内部通话系统在演出行业可以用吗?

文旅名城再出发&#xff0c;更待“烟花”绽繁花 2024年4月将开业的扬州首个大型沉浸式剧场-《运河密城》 以运河为原点 追随河的记忆 从春秋时代的吴王夫差 到贯通南北的大运河成形 穿梭时空 探索扬州的前世今生 「运河第一锹」古运河旁 有一处新地标正在悄然兴起 如…