mysql-springboot netty-flink-kafka-spark(paimon)-minio

1、下载spark源码并编译

mkdir -p /home/bigdata && cd /home/bigdata

wget https://archive.apache.org/dist/spark/spark-3.4.3/spark-3.4.3.tgz

解压文件

tar -zxf spark-3.4.3.tgz 

cd spark-3.4.3

wget https://raw.githubusercontent.com/apache/incubator-celeborn/v0.4.0-incubating/assets/spark-patch/Celeborn_Dynamic_Allocation_spark3_4.patch
git apply Celeborn_Dynamic_Allocation_spark3_4.patch

源码构建编译

./dev/make-distribution.sh --name lukeyan --pip --tgz -Dhadoop.version=3.3.6 -Phive -Phive-thriftserver -Pkubernetes -Pvolcano
 

编译成功

构建完成的进行解压操作并添加相应的jar文件

解压编译的文件

tar -zxvf spark-3.4.3-bin-lukeyan.tgz 

cd spark-3.4.3-bin-lukeyan
 

添加jar文件

cd jars/
 

ls
wget  https://repo1.maven.org/maven2/org/apache/spark/spark-hadoop-cloud_2.12/3.4.3/spark-hadoop-cloud_2.12-3.4.3.jar
wget  https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-cloud-storage/3.3.6/hadoop-cloud-storage-3.3.6.jar
wget  https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.6/hadoop-aws-3.3.6.jar
wget https://maven.aliyun.com/repository/public/com/amazonaws/aws-java-sdk-bundle/1.12.367/aws-java-sdk-bundle-1.12.367.jar
# 添加 Paimon集成相关依赖
wget  https://repo1.maven.org/maven2/org/apache/paimon/paimon-spark-3.4/0.9.0/paimon-spark-3.4-0.9.0.jar
# 如果Kubernetes 的发行版使用的是 K3s 、RKE2等,还需要加入以下依赖
wget  https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-jdk18on/1.77/bcpkix-jdk18on-1.77.jar
wget  https://repo1.maven.org/maven2/org/bouncycastle/bcprov-jdk18on/1.77/bcprov-jdk18on-1.77.jar
cd ..
 

构建docker镜像

docker buildx build --load --platform linux/arm64 --tag spark-paimon-s3:3.4.3_2.12 .

查看镜像架构

docker inspect --format '{{.Architecture}}' azul/zulu-openjdk:17.0.9-17.46.19-jre

docker images
docker save -o jdk.tar azul/zulu-openjdk:17.0.9-17.46.19-jre
docker save -o flink.tar flink:1.19-scala_2.12-java17
docker pull --platform linux/arm64 azul/zulu-openjdk:17.0.9-17.46.19-jre
docker inspect --format '{{.Architecture}}' azul/zulu-openjdk:17.0.9-17.46.19-jre
docker buildx ls
 

x86上构建Arm镜像参考地址Centos7的x86上构建arm镜像docker_centos7 arm镜像-CSDN博客

将Dockerfile拷贝到当前目录下

FROM azul/zulu-openjdk:17.0.9-17.46.19-jre
ARG spark_uid=185

ENV HADOOP_CONF_DIR=/etc/hadoop/conf


# Before building the docker image, first build and make a Spark distribution following
# the instructions in https://spark.apache.org/docs/latest/building-spark.html.
# If this docker file is being used in the context of building your images from a Spark
# distribution, the docker build command should be invoked from the top level directory
# of the Spark distribution. E.g.:
# docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .

RUN set -ex && \
    apt-get update && \
    ln -s /lib /lib64 && \
    apt install -y bash tini libc6 libpam-modules krb5-user libnss3 procps net-tools && \
    mkdir -p /opt/spark && \
    mkdir -p /opt/spark/examples && \
    mkdir -p /opt/spark/work-dir && \
    touch /opt/spark/RELEASE && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh && \
    echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
    rm -rf /var/cache/apt/* && rm -rf /var/lib/apt/lists/*

COPY jars /opt/spark/jars
# Copy RELEASE file if exists
COPY RELEAS[E] /opt/spark/RELEASE
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
COPY kubernetes/dockerfiles/spark/decom.sh /opt/
COPY examples /opt/spark/examples
COPY kubernetes/tests /opt/spark/tests
COPY data /opt/spark/data


ENV SPARK_HOME /opt/spark

WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir
RUN chmod a+x /opt/decom.sh

ENTRYPOINT [ "/opt/entrypoint.sh" ]

# Specify the User that the actual main process will run as
USER ${spark_uid}

执行构建镜像的命令

docker buildx build --load --platform linux/arm64 --tag spark-paimon-s3:3.4.3_2.12 .

得到基础镜像spark-paimon-s3:3.4.3_2.12

参考地址ApachePaimon 实践系列1-环境准备 (qq.com)
 

2、编写程序 

KafkaSparkPaimonS3

使用spark读取消费kafka,将固定格式的数据保存到S3协议的对象存储上,

这里s3使用了Minio

程序代码

package com.example.cloud;

import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQuery}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object KafkaSparkPaimonS3 {
  def main(args: Array[String]): Unit = {
    val kafkaConsumer: String = "kafka-service:9092"
    val kafkaTopic: String = "mysql-flink-cdc-kafka"
    val startingOffsets: String = "latest"
    val kafkaGroupId: String = "KafkaSparkPaimonS3Group"
    val failOnDataLoss: Boolean = false
    val maxOffsetsPerTrigger: Int = 3000
    val lakePath: String = "s3a://paimon/warehouse"
    val checkpointLocation: String = "s3a://spark/checkpoints"
    val s3endpoint: String = "http://minio:9000"
    val s3access: String = "uotAvnxXwcz90yNxWhq2"
    val s3secret: String = "MlDBAOfRDG9lwFTUo9Qic9dLbuFfHsxJfwkjFD4v"
    val schema_base = StructType(List(
      StructField("before", StringType),
      StructField("after", StringType),
      StructField("source", MapType(StringType, StringType)),
      StructField("op", StringType),
      StructField("ts_ms", LongType),
      StructField("transaction", StringType)
    ))
    println("create spark session ..........................................................")
    val sparkConf = SparkSession.builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("sspark.sql.catalog.paimon.metastore", "filesystem")
      .config("spark.sql.catalog.paimon.warehouse", lakePath)
      .config("spark.sql.catalog.paimon.s3.endpoint", s3endpoint)
      .config("spark.sql.catalog.paimon.s3.access-key", s3access)
      .config("spark.sql.catalog.paimon.s3.secret-key", s3secret)
      .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
      .config("spark.sql.catalog.paimon.s3.path-style.access", "true")
      .config("spark.sql.extensions", "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
      .config("spark.sql.catalog.paimon.s3.path-style.access", "true")
      .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
      .config("spark.hadoop.fs.s3a.multipart.size", "104857600")
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.hadoop.fs.s3a.access.key", s3access)
      .config("spark.hadoop.fs.s3a.secret.key", s3secret)
      .config("spark.hadoop.fs.s3a.endpoint", s3endpoint)
      .config("spark.hadoop.fs.s3a.connection.timeout", "200000")
    val sparkSession: SparkSession = sparkConf.getOrCreate()
    println("get spark DataStreamReader start  ..........................................................")
    val dsr: DataStreamReader = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaConsumer)
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", startingOffsets)
      .option("failOnDataLoss", failOnDataLoss)
      .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
      .option("kafka.group.id", kafkaGroupId)
      .option("includeHeaders", "true")
    println("get spark DataStreamReader end  ..........................................................")
    val df: DataFrame = dsr.load()
    println("配置kafka消费流 spark DataFrame end  ..........................................................")
    import org.apache.spark.sql.functions._
    import sparkSession.implicits._
    val frame: Dataset[Row] = df.select(from_json('value.cast("string"), schema_base) as "value").select($"value.*")
      .alias("data")
      .select(
        get_json_object($"data.after", "$.uuid").as("uuid"),
        get_json_object($"data.after", "$.product").as("product"),
        get_json_object($"data.after", "$.promotion").as("promotion"),
        get_json_object($"data.after", "$.value_added_service").as("value_added_service"),
        get_json_object($"data.after", "$.logistics").as("logistics"),
        get_json_object($"data.after", "$.weight").as("weight"),
        get_json_object($"data.after", "$.color").as("color"),
        get_json_object($"data.after", "$.version").as("version"),
        get_json_object($"data.after", "$.shop").as("shop"),
        get_json_object($"data.after", "$.evaluate").as("evaluate"),
        get_json_object($"data.after", "$.order_num").as("order_num"),
        get_json_object($"data.after", "$.rider").as("rider"),
        get_json_object($"data.after", "$.order_time").as("order_time"),
        get_json_object($"data.after", "$.create_time").as("create_time"),
        get_json_object($"data.after", "$.pay_price").as("pay_price"),
        get_json_object($"data.after", "$.pay_type").as("pay_type"),
        get_json_object($"data.after", "$.address").as("address")
      )
    println("get spark Dataset from kafka  ..........................................................")
    sparkSession.sql("USE paimon;")
    println("spark engine use paimon catalog ..........................................................")
    sparkSession.sql("create database m31094;")
    println("create my favourite database for u ..........................................................")
    val tablePath = "paimon.m31094.my_table"
    println("create table to store data  ..........................................................")
    sparkSession.sql("use m31094;")
    sparkSession.sql(
      s"""
          CREATE TABLE IF NOT EXISTS $tablePath (
              uuid STRING,
              product STRING,
              promotion STRING,
              value_added_service STRING,
              logistics STRING,
              weight STRING,
              color STRING,
              version STRING,
              shop STRING,
              evaluate STRING,
              order_num STRING,
              rider STRING,
              order_time STRING,
              create_time STRING,
              pay_price STRING,
              pay_type STRING,
              address STRING
          ) TBLPROPERTIES (
                'partitioned_by' = 'uuid'
            )
      """)
    println("将 DataFrame 写入 Paimon 表  ..........................................................")

    println("尽可能的详细打印数据吧哈哈哈哈 ..........................................................")

    val query: StreamingQuery = frame //是一个已经创建的 Dataset[Row],通常是从流数据源(如 Kafka、文件等)获得的数据。
      .writeStream //开始一个流式写入操作。
      .foreachBatch { (batchDF: Dataset[Row], batchId: Long) =>
        println(s"处理批量流的UID是 batch ID: $batchId")
        // 打印当前批次的数据
        println("莫醒醒..........................................................")
        batchDF.show(truncate = false) // 设置 truncate = false 以完整显示列内容
      }
      .format("paimon")
      //指定数据输出格式为 Paimon。
      .option("write.merge-schema", "true")
      //允许在写入时合并模式(schema),即动态更新表的模式以适应新数据。
      .option("write.merge-schema.explicit-cast", "true")
      //在合并模式时,明确转换数据类型,以确保兼容性和正确性。
      .outputMode("append")
      //指定输出模式为追加模式,表示只将新的数据行添加到目标表中,不会更新或删除已有的数据。
      .option("checkpointLocation", checkpointLocation)
      //设置检查点位置,这对于流处理非常重要,有助于在故障恢复时重新启动流处理任务。
      .start("s3a://paimon/warehouse/m31094.db/my_table") //启动流式查询并将数据写入指定的 S3 路径
    println("spark流通过paimon方式写入数据湖 ..........................................................")
    println("查看数据内容和结构  ..........................................................")
    println(df.schema) // 打印 Schema
    println("打印 Schema  ..........................................................")
    println("Stream processing started...")
    query.awaitTermination() //使当前线程等待,直到流查询结束。这意味着程序会持续运行,直到手动停止或出现错误。
    println("流处理已结束,程序终止。")
  }
}
 

 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.cloud</groupId>
    <artifactId>KafkaSparkPaimonS3</artifactId>
    <version>2.4.5</version>
    <name>KafkaSparkPaimonS3</name>
    <properties>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.4.1</spark.version>
        <paimon.version>0.9.0</paimon.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-bundle</artifactId>
            <version>1.12.367</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>3.3.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.6</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-text</artifactId>
            <version>1.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-spark-common</artifactId>
            <version>${paimon.version}</version>
        </dependency>
       <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-s3</artifactId>
            <version>${paimon.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-spark-3.4</artifactId>
            <version>${paimon.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-s3-impl</artifactId>
            <version>${paimon.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.commons</groupId>
                    <artifactId>commons-text</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>audience-annotations</artifactId>
                    <groupId>org.apache.yetus</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
       <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-token-provider-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.20</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.example.cloud.KafkaSparkPaimonS3</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <excludeTransitive>false</excludeTransitive>
                            <stripVersion>false</stripVersion>
                            <includeScope>runtime</includeScope>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                        <configuration>
                            <encoding>UTF-8</encoding>
                            <outputDirectory>
                                ${project.build.directory}/config
                            </outputDirectory>
                            <resources>
                                <resource>
                                    <directory>src/main/resources/</directory>
                                </resource>
                            </resources>
                        </configuration>
                    </execution>
                    <execution>
                        <id>copy-sh</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                        <configuration>
                            <encoding>UTF-8</encoding>
                            <outputDirectory>
                                ${project.build.directory}
                            </outputDirectory>
                            <resources>
                                <resource>
                                    <directory>bin/</directory>
                                </resource>
                            </resources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
 

Dockerfile

FROM spark-paimon-s3:3.4.3_2.12
RUN mkdir -p /opt/spark/examples/jars
COPY target /opt/spark/examples/jars  

构建镜像的命令

docker buildx build --load --platform linux/arm64 --tag  spark-paimon-s3-app:3.4.3_2.12 --no-cache .
docker save -o spark-paimon-s3-app.tar spark-paimon-s3-app:3.4.3_2.12 

3、配置minio

minio.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: minio
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      containers:
        - name: minio
          image: minio/minio:latest
          imagePullPolicy: IfNotPresent
          args:
            - server
            - /data
          env:
            - name: MINIO_ROOT_USER
              value: "admin"
            - name: MINIO_ROOT_PASSWORD
              value: "密码"
          command:
            - /bin/sh
            - -c
            - minio server /data --console-address ":5000"
          ports:
            - name: api
              protocol: TCP
              containerPort: 9000
            - name: ui
              protocol: TCP
              containerPort: 5000
          volumeMounts:
            - name: minio-storage
              mountPath: /data
      volumes:
        - name: minio-storage
          persistentVolumeClaim:
            claimName: minio-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: minio
  namespace: default
spec:
  selector:
    app: minio
  type: NodePort
  ports:
    - name: api
      protocol: TCP
      port: 9000
      targetPort: 9000
    - name: ui
      protocol: TCP
      port: 5000
      targetPort: 5000

minio-pvc.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: minio-pvc  # PVC 的名称
  namespace: default
spec:
  accessModes:
    - ReadWriteMany  # 访问模式,此处为单节点读写
  resources:
    requests:
      storage: 100Gi  # 请求的存储容量大小
  storageClassName: nfs-client  # 存储类,根据需要选择 

4、运行程序

4.1、springboot -mysql产生原始数据

产生的MySQL原始数据

4.2 数据从MySQL到kafka

mysql->flink cdc->kafka

MysqlFlinkCdcToKafka

在k8s上提交flink任务

/home/d/flink/bin/flink run-application --target kubernetes-application -Dkubernetes.namespace=default -Dkubernetes.cluster-id=flink-cdc-mysql -Dkubernetes.container.image.ref=flinkcdctokafka:0.1-snapshot -Dkubernetes.container.image.pull-policy=IfNotPresent -Dkubernetes.service-account=default -Dkubernetes.rest-service.exposed.type=NodePort -Djobmanager.memory.process.size=2048mb -Dtaskmanager.memory.process.size=2024mb -Dtaskmanager.numberOfTaskSlots=1 -Dhigh-availability.type=kubernetes -Dhigh-availability.storageDir=s3a://flink-cdc/recovery -Dstate.checkpoints.dir=s3a://flink-cdc/flink_cp -Dstate.savepoints.dir=s3a://flink-cdc/flink_sp -Dstate.backend.incremental=true -Ds3.access-key=uotAvnxXwcz90yNxWhq2 -Ds3.secret-key=MlDBAOfRDG9lwFTUo9Qic9dLbuFfHsxJfwkjFD4v -Ds3.path.style.access=true -Ds3.endpoint=http://minio:9000 -Duser.timezone=Asia/Shanghai -c "com.example.cloud.MysqlFlinkCdcToKafka" local:///opt/flink/usrlib/MysqlFlinkCdcToKafka-jar-with-dependencies.jar

通过flink cdc将MySQL的数据写入到kafka的指定topic 

4.3 kafka到minio

kafka-spark-minio

spark提交命令,提交spark任务到k8s集群中运行

/opt/streaming/spark-3.4.3-bin-hadoop3/bin/spark-submit --name KafkaSparkPaimonS3 --master spark://10.10.10.99:7077 --deploy-mode client --driver-cores 2 --driver-memory 4g --num-executors 2 --executor-cores 2 --executor-memory 4g --class com.example.cloud.KafkaSparkPaimonS3 --conf spark.driver.extraClassPath=/opt/streaming/spark-3.4.3-bin-hadoop3/jars --conf spark.executor.extraClassPath=/opt/streaming/spark-3.4.3-bin-hadoop3/jars --jars /opt/lib/kafka-clients-3.8.0.jar,/opt/lib/spark-sql-kafka-0-10_2.13-3.4.3.jar,/opt/lib/spark-token-provider-kafka-0-10_2.13-3.4.3.jar /opt/KafkaSparkPaimonS3-jar-with-dependencies.jar

本地spark运行,可以通过spark sql查询数据的情况

本地执行spark-sql

/opt/streaming/spark-3.4.3-bin-hadoop3/bin/spark-sql --jars /opt/lib/paimon-spark-3.4-0.9.0.jar --conf 'spark.sql.catalog.paimon.metastore=filesystem' --conf 'spark.sql.catalog.paimon.warehouse=s3a://paimon/warehouse' --conf 'spark.sql.catalog.paimon.s3.endpoint=http://10.10.10.99:31212' --conf 'spark.sql.catalog.paimon.s3.access-key=uotAvnxXwcz90yNxWhq2' --conf 'spark.sql.catalog.paimon.s3.secret-key=MlDBAOfRDG9lwFTUo9Qic9dLbuFfHsxJfwkjFD4v' --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog' --conf 'spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions' --conf 'spark.sql.catalog.paimon.s3.path-style.access=true' --conf 'spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore' --conf 'spark.hadoop.fs.s3a.multipart.size=104857600' --conf 'spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem' --conf 'spark.hadoop.fs.s3a.access.key=uotAvnxXwcz90yNxWhq2' --conf 'spark.hadoop.fs.s3a.secret.key=MlDBAOfRDG9lwFTUo9Qic9dLbuFfHsxJfwkjFD4v' --conf 'spark.hadoop.fs.s3a.endpoint=http://10.10.10.99:31212' --conf 'spark.hadoop.fs.s3a.connectiopaimonn.timeout=200000'

 执行上面的本地spark-sql,开启spark终端后

use paimon;

use databases;

5、运行效果

 6、minio上存储

flink数据同步

 k8s上部署的容器服务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/10607.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

系统安全第七次作业题目及答案

一、 1.RBAC0 RBAC1 RBAC2 RBAC3 2.属性 身份标识 3.接入访问控制 资源访问控制 网络端口和节点的访问控制 二、 1.B 2.A 3.ABE 4.BCD 5.ABC 三、 1. 答&#xff1a;基于属性的访问控制&#xff08;ABAC&#xff09;是通过对实体属性添加约束策略的方式实现主、客体之…

【GESP】C++一级真题练习(202312)luogu-B3922,小杨报数

GESP一级真题练习。为2023年12月一级认证真题。for循环和取余计算应用。 题目题解详见&#xff1a;https://www.coderli.com/gesp-1-luogu-b3922/ 【GESP】C一级真题练习(202312)luogu-B3922&#xff0c;小杨报数 | OneCoderGESP一级真题练习。为2023年12月一级认证真题。for…

国科大现代信息检索技术第一次作业

第一次作业 题目1&#xff1a;考虑以下文档 文档名内容文档1new home sales top forecasts文档2home prices rise in june文档3increase in home sales in june文档4july new home sales rise 1、画出文档集对应的词项-文档矩阵 文档1文档2文档3文档4forecasts1000home1111…

计算机视觉实验四:特征检测与匹配

特征检测与匹配 1 角点检测算法实验 1.1 实验目的与要求 &#xff08;1&#xff09;了解及掌握角点检测算法原理。 &#xff08;2&#xff09;掌握在MATLAB中角点算法的编程。 &#xff08;3&#xff09;掌握Moravec&#xff0c;Harris与SUSAN算法的差异。 1.2 实验原理及…

十八:Spring Boot 依赖(3)-- spring-boot-starter-data-jpa 依赖详解

目录 1. 理解 JPA&#xff08;Java Persistence API&#xff09; 1.1 什么是 JPA&#xff1f; 1.2 JPA 与 Hibernate 的关系 1.3 JPA 的基本注解&#xff1a;Entity, Table, Id, GeneratedValue 1.4 JPA 与数据库表的映射 2. Spring Data JPA 概述 2.1 什么是 Spring Dat…

如何用C++代码实现一颗闪烁的爱心?

要用 C 实现爱心闪烁效果&#xff0c;我们可以使用控制台输出文本&#xff0c;并通过在控制台中刷新屏幕来模拟闪烁的效果。由于 C 本身没有类似 turtle 这样的图形库&#xff0c;操作控制台输出的方式比较简单&#xff0c;主要通过字符绘制和时间延迟来实现。 这里给出一个基…

基于美颜SDK的实时视频美颜平台开发:技术难点与解决方案

美颜SDK作为视频美颜平台的核心&#xff0c;提供了多种美颜功能。这些功能通过调整参数实现对人脸特征的优化。在架构设计上&#xff0c;美颜SDK主要包括以下几部分&#xff1a; 1.人脸检测与特征点识别&#xff1a;通过深度学习模型&#xff0c;识别人脸并标记出关键特征点&a…

web实操4——servlet体系结构

servlet体系结构 我们基本都只实现service方法&#xff0c;其余几个都不用&#xff0c; 之前我们直接实现servlet接口&#xff0c;所有的方法都必须实现&#xff0c;不用也得写&#xff0c;不然报错&#xff0c;写了又不用当摆设。 能不能只要定义一个service方法就可以&…

数据分析反馈:提升决策质量的关键指南

内容概要 在当今快节奏的商业环境中&#xff0c;数据分析与反馈已成为提升决策质量的重要工具。数据分析不仅能为企业提供全面的市场洞察&#xff0c;还能帮助管理层深入了解客户需求与行为模式。掌握数据收集的有效策略和工具&#xff0c;企业能够确保获得准确且相关的信息&a…

香港航空 m端 腾讯滑块分析

声明: 本文章中所有内容仅供学习交流使用&#xff0c;不用于其他任何目的&#xff0c;抓包内容、敏感网址、数据接口等均已做脱敏处理&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的一切后果均与作者无关&#xff01; 有相关问题请第一时间头像私信联系我删…

[2024最新] macOS 发起 Bilibili 直播(不使用 OBS)

文章目录 1、B站账号 主播认证2、开启直播3、直播设置添加素材、隐私设置指定窗口添加/删除 窗口 4、其它说明官方直播帮助中心直播工具教程 目前搜到的 macOS 直播教程都比较古早&#xff0c;大部分都使用 OBS&#xff0c;一番探索下来&#xff0c;发现目前已经不需要 OBS了&a…

大数据-210 数据挖掘 机器学习理论 - 逻辑回归 scikit-learn 实现 penalty solver

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…

【Linux】【线程操作与同步】汇总整理

线程&#xff08;Threads&#xff09;是现代操作系统中用于并发执行的基本单元。一个进程可以包含一个或多个线程&#xff0c;每个线程都可以独立执行一段程序代码&#xff0c;共享进程的资源&#xff08;如内存&#xff09;&#xff0c;但拥有自己的栈空间和寄存器状态。下面是…

免费送源码:Java+springboott+MySQL+Tomcat 游戏攻略网站设计与实现 计算机毕业设计原创定制

摘 要 随着国民生活水平的逐渐提高&#xff0c;每逢假期或空闲时节走出家门游山玩水已渐渐成为人们生活的一部分。互联网的普及给人们带来的便利不需多说&#xff0c;因此如果把游戏产业与互联网结合起来&#xff0c;利用Java技术建设游戏攻略网站&#xff0c;实现游戏资讯管理…

从0开始的STM32之旅8 串口通信(II)

目录 在开始理解底层原理之前&#xff0c;我们先尝试一下 怎么做 进一步理解 HAL_UART_Transmit HAL_UART_Receive 在开始理解底层原理之前&#xff0c;我们先尝试一下 现在我们综合一下&#xff0c;要求完成如下的事情&#xff1a; 在主程序中存在一个flag变量描述当前有…

springboot的增删改查商城小实践(b to c)

首先准备一张表&#xff0c;根据业务去设计表 订单编号是参与业务的&#xff0c;他那订单编号里面是有特殊意义的&#xff0c;比如说像什么一些年月日什么的&#xff0c;一些用户的ID都在那编号里面呢&#xff1f;不能拿这种东西当主件啊 根据数据量去决定数据类型 价格需要注意…

AndroidStudio-视图基础

一、设置视图的宽高 1.在XML文件中设置视图宽高 视图宽度通过属性android:layout_width表达&#xff0c;视图高度通过属性android:layout_height表达&#xff0c;宽高的取值主要有下列三种: &#xff08;1&#xff09;wrap_content:表示与内容自适应。对于文本视图来说&…

电子科大、同济大学与新加坡国立大学联合发布Math-LLaVA:增强多模态大语言模型的数学推理能力

一、结论写在前面 下面介绍的论文来自&#xff1a;电子科技大学、新加坡科技设计大学、同济大学、新加坡国立大学。 论文标题&#xff1a;Math-LLaVA: Bootstrapping Mathematical Reasoning for Multimodal Large Language Models 论文链接&#xff1a;https://arxiv.org/p…

高校体育场管理系统+ssm

摘 要 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;高校体育场管理系统被用户普遍使用&#xff0c;为方便用户…

杂谈:业务说的场景金融是什么?

引言&#xff1a;市场格局的转变 在供应短缺的年代&#xff0c;是典型的卖方市场。为了保证稳定供货&#xff0c;买方会提前一段时间下单&#xff0c;也几乎没什么议价能力。卖方只需等着接单就行。 现在很多领域的供应商数量越来越多&#xff0c;而且随着互联网的普及&#…