使用 Java SDK 连接 KaiwuDB
KaiwuDB SDK 是 KaiwuDB 官方提供的原生 Java 客户端开发工具包,基于 gRPC 协议与数据库通信。SDK 提供数据库管理、时序数据写入、测点数据查询等接口,适用于工业时序数据采集与查询场景。
前提条件
安装 Java JDK(推荐 JDK 8 或 JDK 11)。
安装 Maven(3.6 及以上版本)。
安装 KaiwuDB 数据库并完成 TLS 证书配置。SDK 当前仅支持 TLS 安全连接,需提供 CA 证书、客户端证书和 PKCS#8 格式的私钥文件(
.pk8)。KaiwuDB 默认生成的私钥为.key格式,需先执行以下命令转换:openssl pkcs8 -topk8 -inform PEM -outform DER \ -in client.root.key -out client.root.pk8 -nocrypt获取 KaiwuDB Java SDK 包(
kwdb-simple-sdk-<version>-fat.jar或kwdb-simple-sdk-<version>.jar)。
引入依赖
SDK 提供两种 jar 包:
- 胖 jar(
kwdb-simple-sdk-<version>-fat.jar):已内置所有依赖,直接加入项目 classpath 即可使用。推荐通过 Maven 管理时使用此方式。 - 瘦 jar(
kwdb-simple-sdk-<version>.jar):不含内置依赖,需同时将lib/目录下的所有依赖包加入 classpath。不建议将瘦 jar 安装到 Maven 本地仓库,依赖管理较为繁琐。瘦 jar 和lib/目录需从完整的 SDK install 包中获取。
如需通过 Maven 管理胖 jar 依赖,可将其安装到本地仓库后引用:
mvn install:install-file \
-Dfile=kwdb-simple-sdk-1.0.0-fat.jar \
-DgroupId=com.kwdb \
-DartifactId=kwdb-simple-sdk \
-Dversion=1.0.0 \
-Dpackaging=jar
连接数据库
创建客户端
通过 KwdbClient 构造函数指定服务器地址、端口和 TLS 配置,创建客户端实例。KwdbClient 实现了 Closeable 接口,建议使用 try-with-resources 管理其生命周期,确保连接在使用完毕后自动关闭。
ClientConfig.TlsConfig tlsConfig = ClientConfig.TlsConfig.builder()
.caCertFile("/path/to/ca.crt")
.clientCertFile("/path/to/client.root.crt")
.clientPk8File("/path/to/client.root.pk8")
.build();
try (KwdbClient client = new KwdbClient("127.0.0.1", 26257, tlsConfig)) {
// 在此处执行登录及后续操作
}
登录
创建客户端后,调用 Login 接口进行身份验证,并将返回的 Token 设置到客户端。Token 在当前连接周期内有效,调用 Logout 后 Token 失效,后续操作需重新登录并调用 SetToken 更新。
KwdbLoginResponse res = client.Login("username", "password");
if (res != null && res.getMessage().isEmpty()) {
client.SetToken(res.getToken());
}
接口说明
登录与登出接口
| 函数 | 功能 |
|---|---|
public KwdbLoginResponse Login(String username, String password) | 登录数据库,返回身份验证 Token。 |
public KwdbLogoutResponse Logout() | 登出数据库。 |
DDL 接口
| 函数 | 功能 |
|---|---|
public KwdbDdlResponse CreateDatabase(KwdbCreateDatabaseRequest request) | 创建时序数据库。 |
public KwdbDdlResponse DropDatabase(KwdbDropDatabaseRequest request) | 删除时序数据库。 |
public KwdbDdlResponse CreateTable(KwdbCreateTableRequest request) | 创建时序表。 |
public KwdbDdlResponse DropTable(KwdbDropTableRequest request) | 删除时序表。 |
public KwdbQueryResponse ShowDatabase(KwdbShowDatabaseRequest request) | 查看时序数据库信息。 |
public KwdbQueryResponse ShowTable(KwdbShowTableRequest request) | 查看时序表信息。 |
public KwdbQueryResponse ShowPoint(KwdbShowPointsRequest request) | 查看测点数据。 |
public KwdbQueryResponse ShowDevice(KwdbShowDevicesRequest request) | 查看设备数据。 |
建表注意事项:
- 时间戳列名称必须为
k_timestamp,类型必须设置为ColumnType.TIMESTAMP并调用setIsTimestamp(true)。 - 主标签列名称必须为
device_id,类型为ColumnType.VARCHAR,需同时调用setIsTag(true)和setIsPrimaryTag(true)。
插入接口
| 函数 | 功能 |
|---|---|
public KwdbInsertResponse Insert(KwdbGenericInsertRequest request) | 批量写入时序数据。 |
注意事项:
- 如果插入数据的时间戳与主标签值与已有记录完全相同,新数据将覆盖旧数据。
buildRowData的 Map key 必须与表中列名完全一致,device_id对应的 value 即为该设备的主标签值(测点路径,如a.b.c.d.1)。
查询接口
查询接口支持指定测点(ptag)、时间戳和补值策略,对单个或多个测点进行点查询。有关补值策略的详细说明,参见补值策略。
| 函数 | 功能 |
|---|---|
public KwdbQueryResponse QuerySinglePoint(String database, String table, String point, FillPolicy fillPolicy, String ts, FillParam fillParam, List<String> returnFields) | 单测点查询补值。 |
public KwdbQueryResponse QueryMultiplePoints(String database, String table, List<String> points, FillPolicy fillPolicy, String ts, FillParam fillParam, List<String> returnFields) | 多测点查询补值。 |
注意事项:
point/points参数传入的是主标签值(即device_id的值),不是列名。returnFields只能传入列名,不支持表达式或聚合函数。- 多测点查询时,所有
points必须属于同一张表。 ts参数为查询时间点,格式为yyyy-MM-dd HH:mm:ss。
KwdbQueryResponse 返回值说明:
| 方法 | 说明 |
|---|---|
getColumnsList() | 返回列名列表,顺序与 returnFields 一致。 |
getRowsList() | 返回数据行列表,每行通过 getValuesList() 获取各列的值(字符串格式)。 |
getMessage() | 返回执行结果信息,包括成功信息(如 create database succeeded)或错误信息。 |
补值策略
FillPolicy 和 FillParam 均为 KwdbClient 的内部类,使用时以 KwdbClient.FillPolicy 和 KwdbClient.FillParam 引用。
查询接口通过 FillPolicy 枚举指定补值策略,通过 FillParam 传递补值参数。传入 FillParam 构造方法的参数只能使用常量,不支持存储过程变量、PREPARE 执行的变量或其他表达式。
PREV、NEXT、CLOSER 三种策略共用 FillParam.forPrevNextCloser() 构造方法,实际补值方向由 FillPolicy 枚举值决定。
策略(FillPolicy) | 对应 FILL 子句 | 说明 | FillParam 构造方法及参数 |
|---|---|---|---|
EXACT | EXACT | 精确填充。无数据时,数值类型列返回 0,时间戳列返回查询时间点对应的时间戳。 | FillParam.forExact(),无需参数。 |
PREV | PREVIOUS | 前值填充。返回查询时间点之前最近的有效值。 | FillParam.forPrevNextCloser(long timeWindow),其中 timeWindow 为查找时间窗口,单位 ms,取值必须大于 0。 |
NEXT | NEXT | 后值填充。返回查询时间点之后最近的有效值。 | FillParam.forPrevNextCloser(long timeWindow),其中 timeWindow 为查找时间窗口,单位 ms,取值必须大于 0。 |
CLOSER | CLOSER | 最近值填充。返回前后时间点中距查询时间更近的有效值;前后距离相等时取前值。 | FillParam.forPrevNextCloser(long timeWindow),其中 timeWindow 为查找时间窗口,单位 ms,取值必须大于 0。 |
CONSTANT | CONSTANT | 常量填充。用指定常量填充各列的缺失值。 | FillParam.forConstant(String constantValue),其中 constantValue 为常量填充值。 |
LINEAR | LINEAR | 线性插值填充。根据前后有效值计算线性插值,仅支持数值类型列。前后有效值不同时存在时返回 0。 | FillParam.forLinear(long timeWindow, long step),其中 timeWindow 为向前查找时间窗口(before_range),step 为向后查找时间窗口(after_range),单位均为 ms,取值必须大于 0。 |
有关补值策略的详细说明,参见 FILL 子句。
配置示例
以下示例演示使用 Java SDK 进行登录、创建数据库和表、查看数据库和表信息、写入数据、查看测点和设备、单测点查询、多测点查询、删除表和数据库的完整流程。生产环境中请对各接口进行异常处理。如需了解更多 SQL 语句,参见 SQL 参考。
package com.kwdb.example;
import com.kwdb.Column;
import com.kwdb.ColumnType;
import com.kwdb.KwdbClient;
import com.kwdb.KwdbCreateDatabaseRequest;
import com.kwdb.KwdbCreateTableRequest;
import com.kwdb.KwdbDdlResponse;
import com.kwdb.KwdbDropDatabaseRequest;
import com.kwdb.KwdbDropTableRequest;
import com.kwdb.KwdbGenericInsertRequest;
import com.kwdb.KwdbInsertResponse;
import com.kwdb.KwdbLoginResponse;
import com.kwdb.KwdbQueryResponse;
import com.kwdb.KwdbShowDatabaseRequest;
import com.kwdb.KwdbShowDevicesRequest;
import com.kwdb.KwdbShowPointsRequest;
import com.kwdb.KwdbShowTableRequest;
import com.kwdb.Row;
import com.kwdb.config.ClientConfig;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class ExampleDemo {
public static void main(String[] args) {
String COL_TIMESTAMP = "k_timestamp";
String COL_VALUE = "value";
String COL_DEVID = "device_id";
List<String> returnFields = Arrays.asList(COL_TIMESTAMP, COL_VALUE, COL_DEVID);
// 1. 配置 TLS 连接
ClientConfig.TlsConfig tlsConfig = ClientConfig.TlsConfig.builder()
.caCertFile("/path/to/ca.crt")
.clientCertFile("/path/to/client.root.crt")
.clientPk8File("/path/to/client.root.pk8")
.build();
// KwdbClient 实现了 Closeable,使用 try-with-resources 确保连接自动关闭
try (KwdbClient client = new KwdbClient("127.0.0.1", 26257, tlsConfig)) {
// 2. 登录
KwdbLoginResponse loginRes = client.Login("username", "password");
if (loginRes != null && loginRes.getMessage().isEmpty()) {
client.SetToken(loginRes.getToken());
}
// 3. 创建时序数据库
KwdbCreateDatabaseRequest dbReq = KwdbCreateDatabaseRequest.newBuilder()
.setDatabase("tsdb")
.build();
KwdbDdlResponse dbCreateRes = client.CreateDatabase(dbReq);
System.out.println("CreateDatabase: " + dbCreateRes.getMessage());
// 4. 创建时序表
Column colDevId = Column.newBuilder()
.setName(COL_DEVID)
.setType(ColumnType.VARCHAR)
.setIsTag(true)
.setIsPrimaryTag(true)
.build();
Column colTs = Column.newBuilder()
.setName(COL_TIMESTAMP)
.setType(ColumnType.TIMESTAMP)
.setIsTimestamp(true)
.build();
Column colVal = Column.newBuilder()
.setName(COL_VALUE)
.setType(ColumnType.FLOAT)
.build();
KwdbCreateTableRequest tableReq = KwdbCreateTableRequest.newBuilder()
.setDatabase("tsdb")
.setTable("t1")
.addColumns(colDevId)
.addColumns(colTs)
.addColumns(colVal)
.build();
KwdbDdlResponse tableCreateRes = client.CreateTable(tableReq);
System.out.println("CreateTable: " + tableCreateRes.getMessage());
// 5. 查看时序数据库
KwdbQueryResponse showDbRes = client.ShowDatabase(
KwdbShowDatabaseRequest.newBuilder().setDatabase("tsdb").build());
printResult(showDbRes);
// 6. 查看时序表
KwdbQueryResponse showTableRes = client.ShowTable(
KwdbShowTableRequest.newBuilder().setDatabase("tsdb").setTable("t1").build());
printResult(showTableRes);
// 7. 写入数据
KwdbGenericInsertRequest insertReq = KwdbGenericInsertRequest.newBuilder()
.setDatabase("tsdb")
.setTable("t1")
.addData(KwdbClient.buildRowData(Map.of(
COL_TIMESTAMP, "2025-10-01 12:00:00",
COL_DEVID, "a.b.c.d.1",
COL_VALUE, "123.45")))
.addData(KwdbClient.buildRowData(Map.of(
COL_TIMESTAMP, "2025-10-01 12:00:00",
COL_DEVID, "a.b.c.d.2",
COL_VALUE, "67.89")))
.build();
KwdbInsertResponse insertRes = client.Insert(insertReq);
System.out.println("Inserted rows: " + insertRes.getRowsAffected());
// 8. 查看测点数据
KwdbQueryResponse showPointRes = client.ShowPoint(
KwdbShowPointsRequest.newBuilder().setDatabase("tsdb").setTable("t1").build());
printResult(showPointRes);
// 9. 查看设备数据
KwdbQueryResponse showDeviceRes = client.ShowDevice(
KwdbShowDevicesRequest.newBuilder().setDatabase("tsdb").setTable("t1").build());
printResult(showDeviceRes);
// 10. 单测点查询(精确匹配)
KwdbClient.FillParam exactParam = KwdbClient.FillParam.forExact();
KwdbQueryResponse singleRes = client.QuerySinglePoint(
"tsdb", "t1", "a.b.c.d.1",
KwdbClient.FillPolicy.EXACT,
"2025-10-01 12:00:00",
exactParam, returnFields);
printResult(singleRes);
// 11. 多测点查询(精确匹配)
List<String> points = new ArrayList<>();
points.add("a.b.c.d.1");
points.add("a.b.c.d.2");
KwdbQueryResponse multiRes = client.QueryMultiplePoints(
"tsdb", "t1", points,
KwdbClient.FillPolicy.EXACT,
"2025-10-01 12:00:00",
exactParam, returnFields);
printResult(multiRes);
// 12. 删除时序表
KwdbDdlResponse dropTableRes = client.DropTable(
KwdbDropTableRequest.newBuilder().setDatabase("tsdb").setTable("t1").build());
System.out.println("DropTable: " + dropTableRes.getMessage());
// 13. 删除时序数据库
KwdbDdlResponse dropDbRes = client.DropDatabase(
KwdbDropDatabaseRequest.newBuilder().setDatabase("tsdb").build());
System.out.println("DropDatabase: " + dropDbRes.getMessage());
// 14. 登出
client.Logout();
}
}
private static void printResult(KwdbQueryResponse res) {
System.out.println("Columns: " + res.getColumnsList());
for (Row row : res.getRowsList()) {
System.out.println("Row: " + row.getValuesList());
}
}
}
错误信息
| 错误信息 | 原因 | 处理方式 |
|---|---|---|
database "X" already exists | 创建的数据库已存在。 | 确认数据库名称是否重复,或捕获异常后跳过处理。 |
relation "X" already exists | 创建的表已存在。 | 确认表名称是否重复,或捕获异常后跳过处理。 |
relation "X" does not exist | 删除或操作的表不存在。 | 确认表名称是否正确,或捕获异常后跳过处理。 |
database "X" does not exist | 删除或操作的数据库不存在。 | 确认数据库名称是否正确,或捕获异常后跳过处理。 |
Database name cannot be empty | 请求未设置数据库名称。 | 检查请求构造时是否已调用 setDatabase()。 |
Table name cannot be empty | 请求未设置表名称。 | 检查请求构造时是否已调用 setTable()。 |
Column X must specify type | 建表时 timestamp 列未指定为时间戳类型。 | 确保 k_timestamp 列调用了 setIsTimestamp(true)。 |
Column X name cannot be empty | 建表时列名称为空。 | 检查对应列是否已调用 setName() 赋值。 |
Failed to create channel: ... | 连接失败,常见原因:数据库服务未启动、IP 或端口错误、.pk8 私钥文件权限不足。 | 确认数据库服务已启动,检查 IP 地址和端口(默认 26257);确认 .pk8 文件对当前用户可读(可执行 chmod 600 client.root.pk8 修复)。 |
TLS configuration is required for secure connection | 创建客户端时未传入证书配置。 | 在 KwdbClient 构造函数中传入完整的 TlsConfig,包含 CA 证书、客户端证书和私钥路径。 |
Exact fill (EXACT) does not require any fill parameters | 使用 EXACT 策略时传入了额外的补值参数。 | 使用 EXACT 策略时,通过 FillParam.forExact() 创建空参数对象传入,不要设置其他字段。 |