Go 原生接口
2023/7/10大约 9 分钟
Go 原生接口
Go 原生 API 支持通过 Session 和 SessionPool 两种方式与数据库进行交互。由于 Session 非线程安全,因此强烈推荐使用 SessionPool 编程。在多线程并发的情形下,SessionPool 能够合理地管理和分配连接资源,以提升系统性能与资源利用效率。
本文将围绕 SessionPool 的使用进行说明,涵盖从环境准备、核心操作步骤到全量接口的完整内容。
1. 环境准备
1.1 前置依赖
- golang >= 1.13
- make >= 3.0
- curl >= 7.1.1
- thrift: 0.15.0
- Linux、Macos 或其他类 unix 系统
- Windows+bash (下载 IoTDB Go client 需要 git ,通过 WSL、cygwin、Git Bash 任意一种方式均可)
1.2 安装方法
- 使用 go mod
# 切换到 GOPATH 的 HOME 路径,启用 Go Modules 功能
export GO111MODULE=on
# 配置 GOPROXY 环境变量
export GOPROXY=https://goproxy.io
# 创建命名的文件夹或目录,并切换当前目录
mkdir session_example && cd session_example
# 保存文件,自动跳转到新的地址
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
# 初始化 go module 环境
go mod init session_example
# 下载依赖包
go mod tidy
# 编译并运行程序
go run session_example.go- 使用 GOPATH
# get thrift 0.13.0
go get github.com/apache/thrift@0.13.0
# 递归创建目录
mkdir -p $GOPATH/src/iotdb-client-go-example/session_example
# 切换到当前目录
cd $GOPATH/src/iotdb-client-go-example/session_example
# 保存文件,自动跳转到新的地址
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
# 初始化 go module 环境
go mod init
# 下载依赖包
go mod tidy
# 编译并运行程序
go run session_example.go2. 核心步骤
使用 Go 原生接口操作 IoTDB 的三个核心步骤如下:
- 创建连接池实例:初始化一个
SessionPool对象,配置连接参数和池大小。 - 执行数据库操作:从连接池中
GetSession(),执行数据写入或查询等操作,完成后必须PutBack(session)。 - 关闭连接池资源:程序结束时调用
sessionPool.Close(),释放所有连接。
下面的章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: 全量接口说明 或 查阅: SessionPool 示例源码
2.1 创建连接池实例
- 单实例
config := &client.PoolConfig{
Host: host,
Port: port,
UserName: user,
Password: password,
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()- 分布式或双活
config := &client.PoolConfig{
UserName: user,
Password: password,
NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()2.2 数据库操作
2.2.1 数据写入
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
status, err := session.InsertTablet(tablet, false)
tablet.Reset()
checkError(status, err)2.2.2 数据查询
var timeout int64 = 1000
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
log.Print(err)
return
}
sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
if err == nil {
defer sessionDataSet.Close()
printDataSet(sessionDataSet)
} else {
log.Println(err)
}2.3 使用示例
import (
"flag"
"fmt"
"log"
"math/rand"
"strings"
"time"
"github.com/apache/iotdb-client-go/v2/client"
"github.com/apache/iotdb-client-go/v2/common"
)
var (
host string
port string
user string
password string
)
var sessionPool client.SessionPool
func main() {
flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
flag.StringVar(&port, "port", "6667", "--port=6667")
flag.StringVar(&user, "user", "root", "--user=root")
flag.StringVar(&password, "password", "root", "--password=root")
flag.Parse()
//1.创建连接池
config := &client.PoolConfig{
Host: host,
Port: port,
UserName: user,
Password: password,
}
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()
//2.创建存储组
setStorageGroup("root.sg1")
//3. 创建时间序列
createTimeseries("root.sg1.dev1.temperature")
//4.数据写入
insertTablet()
//5. 数据查询
executeQueryStatement("select temperature from root.sg1.dev1")
//6. 删除
deleteTimeseries("root.sg1.dev1.temperature")
deleteStorageGroup("root.sg1")
}
// 设置存储组
func setStorageGroup(sg string) {
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
session.SetStorageGroup(sg)
}
}
// 删除存储组
func deleteStorageGroup(sg string) {
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
checkError(session.DeleteStorageGroup(sg))
}
}
// 创建时间序列
func createTimeseries(path string) {
var (
dataType = client.FLOAT
encoding = client.PLAIN
compressor = client.SNAPPY
)
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
}
}
// 删除时间序列
func deleteTimeseries(paths ...string) {
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
checkError(session.DeleteTimeseries(paths))
}
}
// 插入Tablet数据
func insertTablet() {
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err == nil {
if tablet, err := createTablet(12); err == nil {
status, err := session.InsertTablet(tablet, false)
tablet.Reset()
checkError(status, err)
} else {
log.Fatal(err)
}
}
}
//创建Tablet
func createTablet(rowCount int) (*client.Tablet, error) {
tablet, err := client.NewTablet("root.sg1.dev1", []*client.MeasurementSchema{
{
Measurement: "temperature",
DataType: client.FLOAT,
},
}, rowCount)
if err != nil {
return nil, err
}
ts := time.Now().UTC().UnixNano() / 1000000
for row := 0; row < int(rowCount); row++ {
ts++
tablet.SetTimestamp(ts, row)
tablet.SetValueAt(rand.Float32(), 0, row)
tablet.RowSize++
}
return tablet, nil
}
// 执行查询语句
func executeQueryStatement(sql string) {
var timeout int64 = 1000
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
log.Print(err)
return
}
sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
if err == nil {
defer sessionDataSet.Close()
printDataSet(sessionDataSet)
} else {
log.Println(err)
}
}
// 打印查询结果
func printDataSet(sds *client.SessionDataSet) {
columnNames := sds.GetColumnNames()
for _, value := range columnNames {
fmt.Printf("%s\t", value)
}
fmt.Println()
for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
for _, columnName := range columnNames {
isNull, _ := sds.IsNull(columnName)
if isNull {
fmt.Printf("%v\t\t", "null")
} else {
v, _ := sds.GetString(columnName)
fmt.Printf("%v\t\t", v)
}
}
fmt.Println()
}
}
// 检查错误
func checkError(status *common.TSStatus, err error) {
if err != nil {
log.Fatal(err)
}
if status != nil {
if err = client.VerifySuccess(status); err != nil {
log.Println(err)
}
}
}3. 全量接口
3.1 SessionPool 管理接口
| 接口名称 | 功能描述 | 参数说明 |
|---|---|---|
NewSessionPool(config *PoolConfig, maxSize, connTimeoutMs, waitTimeoutMs int, enableComp bool) SessionPool | 创建并返回一个Session连接池实例。 | config: 连接池配置 maxSize: 最大连接数(≤0时取CPU数*5) connTimeoutMs: TCP连接超时(ms) waitTimeoutMs: 获取Session等待超时(ms) enableComp: 是否启用压缩 |
GetSession() (Session, error) | 从池中获取一个可用Session。若池满则阻塞等待,超时返回错误。必须与PutBack配对使用。 | 无 |
PutBack(session Session) | 将使用完毕的Session归还到连接池中。 | session: 从GetSession获取的实例 |
Close() | 关闭连接池,释放所有活跃连接。程序退出前必须调用。 | 无 |
3.2 数据写入接口
以下接口需通过获取的 Session 进行调用
| 接口名称 | 功能描述 | 参数说明 |
|---|---|---|
InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error) | 插入单条记录。 | deviceId: 设备ID measurements: 测点列表 dataTypes: 数据类型列表values: 值列表timestamp: 时间戳 |
InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error) | 插入单条对齐记录。 | deviceId: 设备IDmeasurements: 测点列表dataTypes: 数据类型列表values: 值列表timestamp: 时间戳 |
InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r common.TSStatus, err error) | 插入字符串格式的单条记录。 | deviceId: 设备IDmeasurements: 测点列表values: 字符串类型的值列表timestamp: 时间戳 |
InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error) | 插入多条记录。 | deviceIds: 设备ID列表measurements:二维测点列表dataTypes: 二维数据类型列表values: 二维值列表timestamps: 时间戳列表 |
InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error) | 插入多个对齐设备的多条记录。 | deviceIds: 设备ID列表measurements:二维测点列表dataTypes: 二维数据类型列表values: 二维值列表timestamps: 时间戳列表 |
InsertTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error) | 插入单个设备的多条数据。 | tablet: 要插入的Tablet数据sorted: 数据是否已排序 |
InsertAlignedTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error) | 插入单个对齐设备的多条数据。 | tablet: 要插入的Tablet数据sorted: 数据是否已排序 |
InsertTablets``(tablets []Tablet, sorted bool) (r common.TSStatus, err error) | 批量插入多个 Tablet 数据。 | tablets: 要插入的多个Tablet 数据sorted: 数据是否已排序 |
InsertAlignedTablets(tablets []Tablet, sorted bool) (r common.TSStatus, err error) | 批量插入多个对齐设备的数据。 | tablets: 要插入的多个Tablet 数据sorted: 数据是否已排序 |
3.3 SQL与查询接口
以下接口需通过获取的 Session 进行调用
| 接口名称 | 功能描述 | 参数说明 |
|---|---|---|
ExecuteStatement(sql string)(SessionDataSet, error) | 执行SQL(主要查询),返回SessionDataSet。 | sql:要执行的SQL查询语句 |
ExecuteQueryStatement(sql string, timeoutMs int64) (SessionDataSet, error) | 执行查询SQL,可指定超时,返回SessionDataSet。 | sql:要执行的SQL查询语句timeoutMs: 查询超时时间(毫秒) |
ExecuteNonQueryStatementExecuteNonQueryStatement(sql string) (r common.TSStatus, err error) | 执行不返回结果集的SQL(如INSERT, CREATE, DELETE)。 | sql:要执行的SQL语句 |
ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error) | 查询指定时间序列在时间范围内的原始数据。 | paths: 查询路径列表startTime: 起始时间戳endTime: 结束时间戳 |
ExecuteAggregationQuery(paths []string, aggregations []common.TAggregationType, startTime, endTime, interval, timeoutMs int64) (SessionDataSet, error) | 执行聚合查询(COUNT, AVG等)。 | paths: 查询路径列表aggregations: 聚合类型列表startTime, endTime, interval: 起始时间、结束时间和间隔时间timeoutMs: 查询超时时间 |
ExecuteBatchStatement(sqls []string) (r common.TSStatus, err error) | 批量执行多条SQL语句。 | sqls:要执行的SQL语句 |
3.4 元数据操作接口
以下接口需通过获取的 Session 进行调用
| 接口名称 | 功能描述 | 参数说明 |
|---|---|---|
SetStorageGroup(storageGroupId string) (r common.TSStatus, err error) | 创建数据库(存储组)。 | storageGroupId:数据库(存储组)名称 |
DeleteStorageGroup(storageGroupId string) (r common.TSStatus, err error) | 删除一个数据库(存储组)。 | storageGroupId:要删除的数据库(存储组)名称 |
DeleteStorageGroups(storageGroupIds ...string) (r common.TSStatus, err error) | 删除多个数据库(存储组)。 | storageGroupIds:要删除的数据库(存储组)名称列表 |
CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r common.TSStatus, err error) | 创建非对齐时间序列。 | path: 时间序列路径dataType: 数据类型encoding: 编码方式compressor: 压缩算法attributes: (可选)序列属性tags: (可选)序列标签 |
CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r common.TSStatus, err error) | 创建一组对齐时间序列。 | prefixPath: 时间序列路径前缀measurements: 测点名称列表dataTypes, encodings, compressors: 每个测点对应的数据类型、编码和压缩算法列表measurementAlias: (可选)每个测点的别名列表 |
DeleteTimeseries(paths []string) (r common.TSStatus, err error) | 删除多条时间序列(含数据)。 | paths:要删除的时间序列路径列表 |
DeleteData(paths []string, startTime int64, endTime int64) (r common.TSStatus, err error) | 删除指定时间序列在时间段内的数据(保留元数据)。 | paths: 要删除的时间序列路径列表startTime: 起始时间戳endTime: 结束时间戳。 |
SetTimeZone(timeZone string) (r common.TSStatus, err error) | 设置当前会话时区。 | timeZone: 时区字符串,例如 ”UTC”, ”Asia/Shanghai”, ”GMT+8” |
GetTimeZone() (string, error) | 获取当前会话时区。 | 无 |
3.5 关键配置结构 (PoolConfig)
| 字段 | 类型 | 必填 | 描述 |
|---|---|---|---|
Host | string | 与NodeUrls二选一 | 单节点主机地址。 |
Port | string | 与NodeUrls二选一 | 单节点端口。 |
NodeUrls | []string | 与Host/Port二选一 | 集群节点地址列表,格式为”host:port”。 |
UserName | string | 是 | 用户名。 |
Password | string | 是 | 密码。 |
FetchSize | int32 | 否 | 查询结果集获取大小,默认1024。 |
TimeZone | string | 否 | 会话时区,如”Asia/Shanghai”,默认使用服务端时区。 |
Database | string | 否 | 表模型适用,用于设置会话默认数据库。 |
