HBase DML操作
DML操作主要是关于对表格内部数据的增删改查。
- HbaseDML
package org.hbase;import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.ColumnValueFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;public class HbaseDML {public static Connection connection = HBaseConnection.connection;/*** 向指定的HBase表中插入一个单元格数据。** @param namespace 命名空间,HBase中用于组织表的逻辑分组。* @param tableName 表名,HBase中的表标识。* @param rowKey 行键,用于唯一标识表中的一行。* @param columnFamily 列族名,HBase表中列的分组。* @param columnName 列名,列族下的具体列。* @param value 要插入的单元格值。* @throws IOException 如果与HBase的通信出现问题或插入数据时发生错误,将抛出此异常。*/public static void putCell(String namespace, String tableName, String rowKey,String columnFamily, String columnName, String value) throws IOException {// 获取HBase连接中的指定表Table table = connection.getTable(TableName.valueOf(namespace, tableName));// 创建一个Put对象,用于存储要插入的行数据Put put = new Put(Bytes.toBytes(rowKey));// 向Put对象中添加列族、列名和对应的值put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value));// 尝试将Put对象写入HBase表try {table.put(put);} catch (IOException e) {// 如果发生IO异常,将其封装为RuntimeException并抛出throw new RuntimeException("Failed to put cell data to HBase table", e);}// 关闭表连接,释放资源table.close();}/*** 从HBase表中获取指定行与列的数据。** @param namespace 命名空间,用于在HBase中逻辑上组织表。* @param tableName 表名,标识HBase中的具体表。* @param rowKey 行键,唯一标识表中的一行数据。* @param columnFamily 列族名,表示HBase表中列的分组。* @param columnName 列名,在指定列族下的具体列。* @throws IOException 若与HBase通信出错或数据检索失败,则抛出此异常。*/public static void getCells(String namespace, String tableName, String rowKey,String columnFamily, String columnName) throws IOException {// 根据命名空间和表名获取HBase连接中的对应表Table table = connection.getTable(TableName.valueOf(namespace, tableName));// 创建一个Get对象,用于指定要检索的行数据Get get = new Get(Bytes.toBytes(rowKey));// 添加要检索的列族与列名get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));// 设置读取所有版本的数据(若表启用了版本控制)// 注:检查HBase的设置,如果VERSIONS,设置为1,这意味着HBase将只保留该列族中每个单元格的最新版本。get.readAllVersions();try {// 执行Get操作,从表中检索数据,并得到Result对象Result result = table.get(get);// 从Result对象中提取原始单元格数据Cell[] cells = result.rawCells();// 遍历并处理检索到的单元格数据for (Cell cell : cells) {// 使用CellUtil工具类从单元格中提取值,并转换为字符串String value = new String(CellUtil.cloneValue(cell));// 输出单元格值(实际应用中可能进行不同处理)System.out.println(value);}} catch (Exception e) {// 打印异常堆栈信息(生产环境中应进行更细致的错误处理)e.printStackTrace();}// 确保在方法结束时关闭表连接,释放资源table.close();}/*** 扫描HBase表中的行,并打印出每行中的单元格信息。** @param namespace 命名空间,HBase中表的命名空间,通常对应于数据库的概念。* @param tableName 表名,HBase中的表名。* @param startRow 扫描的起始行键,包含此行。* @param stopRow 扫描的停止行键,不包含此行(即扫描到stopRow之前的行)。* @throws IOException 如果与HBase的通信出现问题,则抛出此异常。*/public static void scanRows(String namespace, String tableName, String startRow, String stopRow) throws IOException {// 根据命名空间和表名获取HBase表的引用Table table = connection.getTable(TableName.valueOf(namespace, tableName));// 创建一个新的扫描对象Scan scan = new Scan();// 设置扫描的起始行键(包含)scan.withStartRow(Bytes.toBytes(startRow));// 设置扫描的停止行键(不包含)scan.withStopRow(Bytes.toBytes(stopRow));try {// 获取扫描器,用于迭代表中的行ResultScanner scanner = table.getScanner(scan);// 迭代扫描器中的每个结果(即每行数据)for (Result result : scanner) {// 获取当前行的所有单元格Cell[] cells = result.rawCells();// 迭代当前行的每个单元格,并打印其信息for (Cell cell : cells) {System.out.print(// 单元格的行键new String(CellUtil.cloneRow(cell)) + "-" +// 单元格的列族new String(CellUtil.cloneFamily(cell)) + "-" +// 单元格的列限定符(即列名)new String(CellUtil.cloneQualifier(cell)) + "-" +// 单元格的值new String(CellUtil.cloneValue(cell)) + "\t");}// 打印完当前行的所有单元格后,换行System.out.println();}// 关闭扫描器,释放资源scanner.close();} catch (IOException e) {// 如果出现IO异常,打印堆栈跟踪e.printStackTrace();}// 关闭表的引用,释放资源table.close();}/*** 根据指定的命名空间、表名、起始行、结束行、列族、列名和值对HBase表进行过滤扫描。** @param namespace HBase表的命名空间。* @param tableName HBase表的名称。* @param startRow 扫描的起始行键(包含)。* @param stopRow 扫描的停止行键(不包含)。* @param columnFamily 要过滤的列族名称。* @param columnName 要过滤的列名。* @param value 用于过滤的值。* @throws IOException 如果与HBase的交互发生IO异常。*/public static void filterScan(String namespace, String tableName, String startRow, String stopRow, String columnFamily, String columnName, String value) throws IOException {// 根据命名空间和表名获取HBase表的引用Table table = connection.getTable(TableName.valueOf(namespace, tableName));// 创建一个新的扫描对象Scan scan = new Scan();// 设置扫描的起始行键(包含)scan.withStartRow(Bytes.toBytes(startRow)); // 将字符串转换为字节数组// 设置扫描的停止行键(不包含)scan.withStopRow(Bytes.toBytes(stopRow)); // 将字符串转换为字节数组// 创建过滤器列表,可以添加多个过滤器FilterList filterList = new FilterList();// 创建过滤器:结果只保留当前列的数据,且列的值等于指定的值ColumnValueFilter columnValueFilter = new ColumnValueFilter(Bytes.toBytes(columnFamily), // 列族名转换为字节数组Bytes.toBytes(columnName), // 列名转换为字节数组CompareOperator.EQUAL, // 比较操作符:相等Bytes.toBytes(value) // 值转换为字节数组);// 创建过滤器:结果保留整行数据,但只有当指定列的值等于指定值时才返回该行// 注意:这与ColumnValueFilter不同,它不会过滤掉没有指定列的行SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(columnFamily), // 列族名转换为字节数组Bytes.toBytes(columnName), // 列名转换为字节数组CompareOperator.EQUAL, // 比较操作符:相等Bytes.toBytes(value) // 值转换为字节数组);// 将此过滤器加入到过滤器列表中// 注意:这里选择了singleColumnValueFilter,根据实际需求可以选择不同的过滤器filterList.addFilter(singleColumnValueFilter);// 添加过滤到扫描中scan.setFilter(filterList);try {// 获取扫描器,用于迭代表中的行ResultScanner scanner = table.getScanner(scan);// 迭代扫描器中的每个结果(即每行数据)for (Result result : scanner) {// 获取当前行的所有单元格Cell[] cells = result.rawCells();// 迭代当前行的每个单元格,并打印其信息for (Cell cell : cells) {System.out.print(// 单元格的行键new String(CellUtil.cloneRow(cell)) + "-" +// 单元格的列族new String(CellUtil.cloneFamily(cell)) + "-" +// 单元格的列限定符(即列名)new String(CellUtil.cloneQualifier(cell)) + "-" +// 单元格的值new String(CellUtil.cloneValue(cell)) + "\t");}// 打印完当前行的所有单元格后,换行System.out.println();}// 关闭扫描器,释放资源scanner.close();} catch (IOException e) {// 如果出现IO异常,打印堆栈跟踪e.printStackTrace();} finally {// 关闭表的引用,释放资源// 使用finally块确保表关闭,即使发生异常table.close();}}/*** 删除指定HBase表中特定行和列的数据。** @param namespace 表的命名空间(相当于数据库名)* @param tableName 表的名称* @param rowKey 行的键* @param columnFamily 列族名* @param columnName 列名* @throws IOException 如果操作过程中发生IO异常*/public static void deleteColumn(String namespace, String tableName, String rowKey, String columnFamily, String columnName) throws IOException {// 根据命名空间和表名获取HBase表对象Table table = connection.getTable(TableName.valueOf(namespace, tableName));// 创建一个Delete对象,用于删除指定行的数据Delete delete = new Delete(Bytes.toBytes(rowKey));// 添加要删除的列信息(删除指定版本)// 注意:此行代码只会删除指定列的一个版本,如果没有特别指定版本号,实际上效果可能不明显delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));// 添加要删除的列信息(删除所有版本)// 此行代码会删除指定列的所有版本的数据delete.addColumns(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));try {// 执行删除操作table.delete(delete);} catch (IOException e) {// 如果删除过程中发生IO异常,抛出运行时异常throw new RuntimeException("删除数据时发生IO异常", e);}// 确保在操作结束后关闭表连接,释放资源table.close();}public static void main(String[] args) throws IOException {//插入单条数据//putCell("bigdata", "student", "20240924", "info", "job", "45");//读取数据//getCells("bigdata", "student", "20240924", "info", "job");//扫描//scanRows("bigdata","student","0","20240925");//filterScan("bigdata","student","0","999999999","info","name","zz");//删除deleteColumn("bigdata", "student", "01", "info", "name");HBaseConnection.closeConnection();}}
- HbaseConnection
package org.hbase;import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;public class HBaseConnection {public static Connection connection =null;static {//创建连接//默认使用同步连接try {//读取本地文件connection = ConnectionFactory.createConnection();} catch (IOException e) {e.printStackTrace();}//异步连接创建//asyncConnectionCompletableFuture= ConnectionFactory.createAsyncConnection();}public static void closeConnection() throws IOException {if(connection!=null){connection.close();}}public static void main(String[] args) throws IOException {//使用连接System.out.println(HBaseConnection.connection);//关闭连接HBaseConnection.closeConnection();}
}
- pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.hbase</groupId><artifactId>Hbase01</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version><scope>compile</scope></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.4.17</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.4.17</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency></dependencies><build><plugins><plugin><!--声明--><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.3.0</version><!--具体配置--><configuration><archive><manifest><!--jar包的执行入口--><mainClass>org.hbase.HbaseDML</mainClass></manifest></archive><descriptorRefs><!--描述符,此处为预定义的,表示创建一个包含项目所有依赖的可执行 JAR 文件;允许自定义生成jar文件内容--><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><!--执行配置--><executions><execution><!--执行配置ID,可修改--><id>make-assembly</id><!--执行的生命周期--><phase>package</phase><goals><!--执行的目标,single表示创建一个分发包--><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>