Go Native API
The Go Native API supports interaction with the database through both Session and SessionPool. Because a Session is not thread-safe, using SessionPool is strongly recommended. In concurrent programs, SessionPool manages and allocates connections across goroutines, which improves performance and resource utilization.
This article focuses on the usage of SessionPool, covering the complete process from environment preparation and core operation steps to the full set of interfaces.
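As a rough illustration of why a pool suits concurrent code, the sketch below lets many goroutines share a fixed number of tokens, where each token stands in for one pooled session. The function `runWorkers` and the token channel are hypothetical and use only the standard library; this is not the client's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers starts `workers` goroutines that share `poolSize` tokens.
// Each token stands in for one pooled session (a hypothetical stand-in,
// not the IoTDB client type). It returns the highest number of tokens
// that were checked out at the same moment.
func runWorkers(poolSize, workers int) int {
	pool := make(chan struct{}, poolSize)
	for i := 0; i < poolSize; i++ {
		pool <- struct{}{}
	}

	var inUse, maxInUse int64
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			tok := <-pool                  // like GetSession: blocks until one is free
			defer func() { pool <- tok }() // like PutBack: always hand it back

			n := atomic.AddInt64(&inUse, 1)
			// Record the peak with a compare-and-swap loop.
			for {
				m := atomic.LoadInt64(&maxInUse)
				if n <= m || atomic.CompareAndSwapInt64(&maxInUse, m, n) {
					break
				}
			}
			atomic.AddInt64(&inUse, -1)
		}()
	}
	wg.Wait()
	return int(atomic.LoadInt64(&maxInUse))
}

func main() {
	peak := runWorkers(3, 20)
	fmt.Println("peak sessions in use stays within the pool size:", peak <= 3)
}
```

Each goroutine works with its own token, so no session is ever shared between goroutines, and the number in use never exceeds the pool size.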
1. Environment Preparation
1.1 Prerequisites
- golang >= 1.13
- make >= 3.0
- curl >= 7.1.1
- thrift: 0.15.0
- Linux, macOS, or other Unix-like systems
- Windows + bash (Git is needed to download the IoTDB Go client; any one of WSL, cygwin, or Git Bash is acceptable)
1.2 Installation Methods
Using go mod
# Switch to the HOME path of GOPATH and enable the Go Modules feature
export GO111MODULE=on
# Configure the GOPROXY environment variable
export GOPROXY=https://goproxy.io
# Create a named folder or directory and switch to it
mkdir session_example && cd session_example
# Save the file, which will automatically redirect to the new address
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
# Initialize the go module environment
go mod init session_example
# Download dependency packages
go mod tidy
# Compile and run the program
go run session_example.go

Using GOPATH

# Get thrift 0.13.0
go get github.com/apache/thrift@0.13.0
# Recursively create the directory
mkdir -p $GOPATH/src/iotdb-client-go-example/session_example
# Switch to the current directory
cd $GOPATH/src/iotdb-client-go-example/session_example
# Save the file, which will automatically redirect to the new address
curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go
# Initialize the go module environment
go mod init
# Download dependency packages
go mod tidy
# Compile and run the program
go run session_example.go
2. Core Steps
The three core steps for using the Go native interface to operate IoTDB are as follows:
- Create a connection pool instance: Initialize a `SessionPool` object, configuring connection parameters and pool size.
- Execute database operations: Call `GetSession()` to obtain a session from the pool, perform operations such as writing or querying data, and always call `PutBack(session)` when finished.
- Close connection pool resources: Call `sessionPool.Close()` at the end of the program to release all connections.
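The three steps above can be sketched with a toy pool built only from the standard library. Everything here (`toyPool`, `get`, `putBack`, `close`, and the use of plain integers as sessions) is hypothetical and only mimics the create, borrow/return, and close cycle; it is not how the IoTDB client is implemented.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// toyPool is a hypothetical stand-in for a session pool. Idle
// "sessions" are plain ints kept in a buffered channel.
type toyPool struct {
	idle chan int
}

// newToyPool fills the pool with `size` sessions numbered from 0.
func newToyPool(size int) *toyPool {
	p := &toyPool{idle: make(chan int, size)}
	for i := 0; i < size; i++ {
		p.idle <- i
	}
	return p
}

// get waits up to waitMs milliseconds for an idle session.
func (p *toyPool) get(waitMs int) (int, error) {
	select {
	case s, ok := <-p.idle:
		if !ok {
			return 0, errors.New("pool is closed")
		}
		return s, nil
	case <-time.After(time.Duration(waitMs) * time.Millisecond):
		return 0, errors.New("timed out waiting for a session")
	}
}

// putBack returns a session so that other callers can borrow it.
func (p *toyPool) putBack(s int) { p.idle <- s }

// close shuts the pool; later get calls fail immediately.
func (p *toyPool) close() { close(p.idle) }

func main() {
	pool := newToyPool(1) // step 1: create the pool
	defer pool.close()    // step 3: release everything at exit

	s, err := pool.get(10) // step 2: borrow a session
	if err != nil {
		fmt.Println("get failed:", err)
		return
	}
	_, err = pool.get(10) // the only session is out, so this times out
	fmt.Println("second get:", err)

	pool.putBack(s) // hand the session back
	_, err = pool.get(10)
	fmt.Println("get after putBack:", err)
}
```

The second `get` fails because the single session is still borrowed, which is why every borrowed session needs a matching return.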
The following sections illustrate the core development workflow and do not demonstrate all parameters and interfaces. For the complete functionality and parameters, see Section 3, Full Interface List, or the SessionPool example source code.
2.1 Create Connection Pool Instance
Single Instance
config := &client.PoolConfig{
	Host:     host,
	Port:     port,
	UserName: user,
	Password: password,
}
// Arguments: maxSize=3, connTimeoutMs=60000, waitTimeoutMs=60000, enableComp=false
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()

Distributed or Active-Active

config := &client.PoolConfig{
	UserName: user,
	Password: password,
	NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","),
}
// Arguments: maxSize=3, connTimeoutMs=60000, waitTimeoutMs=60000, enableComp=false
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()
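When the node list comes from a flag or an environment variable, it can help to check it before handing it to `NodeUrls`. The helper below is a minimal sketch under that assumption; `parseNodeURLs` is a hypothetical name and is not part of the client.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// parseNodeURLs splits a comma-separated "host:port" list and checks
// each entry. It is a hypothetical helper, not part of the client.
func parseNodeURLs(list string) ([]string, error) {
	var urls []string
	for _, raw := range strings.Split(list, ",") {
		entry := strings.TrimSpace(raw)
		host, port, err := net.SplitHostPort(entry)
		if err != nil {
			return nil, fmt.Errorf("invalid node %q: %w", entry, err)
		}
		if host == "" {
			return nil, fmt.Errorf("invalid node %q: empty host", entry)
		}
		if n, err := strconv.Atoi(port); err != nil || n < 1 || n > 65535 {
			return nil, fmt.Errorf("invalid node %q: bad port", entry)
		}
		urls = append(urls, entry)
	}
	return urls, nil
}

func main() {
	urls, err := parseNodeURLs("127.0.0.1:6667, 127.0.0.1:6668")
	fmt.Println(urls, err)
}
```

The returned slice has the same shape as the `strings.Split` result used in the cluster configuration, with surrounding whitespace removed.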
2.2 Database Operations
2.2.1 Data Insertion
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
	log.Print(err)
	return
}
status, err := session.InsertTablet(tablet, false)
tablet.Reset()
checkError(status, err)

2.2.2 Data Query
var timeout int64 = 1000
session, err := sessionPool.GetSession()
defer sessionPool.PutBack(session)
if err != nil {
	log.Print(err)
	return
}
sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
if err == nil {
	defer sessionDataSet.Close()
	printDataSet(sessionDataSet)
} else {
	log.Println(err)
}

2.3 Usage Example
package main

import (
	"flag"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/apache/iotdb-client-go/v2/client"
	"github.com/apache/iotdb-client-go/v2/common"
)

var (
	host     string
	port     string
	user     string
	password string
)

var sessionPool client.SessionPool

func main() {
	flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
	flag.StringVar(&port, "port", "6667", "--port=6667")
	flag.StringVar(&user, "user", "root", "--user=root")
	flag.StringVar(&password, "password", "root", "--password=root")
	flag.Parse()

	// 1. Create connection pool
	config := &client.PoolConfig{
		Host:     host,
		Port:     port,
		UserName: user,
		Password: password,
	}
	sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
	defer sessionPool.Close()

	// 2. Create storage group
	setStorageGroup("root.sg1")
	// 3. Create time series
	createTimeseries("root.sg1.dev1.temperature")
	// 4. Data insertion
	insertTablet()
	// 5. Data query
	executeQueryStatement("select temperature from root.sg1.dev1")
	// 6. Deletion
	deleteTimeseries("root.sg1.dev1.temperature")
	deleteStorageGroup("root.sg1")
}

// Set storage group
func setStorageGroup(sg string) {
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		checkError(session.SetStorageGroup(sg))
	}
}

// Delete storage group
func deleteStorageGroup(sg string) {
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		checkError(session.DeleteStorageGroup(sg))
	}
}

// Create time series
func createTimeseries(path string) {
	var (
		dataType   = client.FLOAT
		encoding   = client.PLAIN
		compressor = client.SNAPPY
	)
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
	}
}

// Delete time series
func deleteTimeseries(paths ...string) {
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		checkError(session.DeleteTimeseries(paths))
	}
}

// Insert Tablet data
func insertTablet() {
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err == nil {
		if tablet, err := createTablet(12); err == nil {
			status, err := session.InsertTablet(tablet, false)
			tablet.Reset()
			checkError(status, err)
		} else {
			log.Fatal(err)
		}
	}
}

// Create Tablet
func createTablet(rowCount int) (*client.Tablet, error) {
	tablet, err := client.NewTablet("root.sg1.dev1", []*client.MeasurementSchema{
		{
			Measurement: "temperature",
			DataType:    client.FLOAT,
		},
	}, rowCount)
	if err != nil {
		return nil, err
	}
	// Current time in milliseconds; each row gets the next millisecond.
	ts := time.Now().UTC().UnixNano() / 1000000
	for row := 0; row < rowCount; row++ {
		ts++
		tablet.SetTimestamp(ts, row)
		tablet.SetValueAt(rand.Float32(), 0, row)
		tablet.RowSize++
	}
	return tablet, nil
}

// Execute query statement
func executeQueryStatement(sql string) {
	var timeout int64 = 1000
	session, err := sessionPool.GetSession()
	defer sessionPool.PutBack(session)
	if err != nil {
		log.Print(err)
		return
	}
	sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
	if err == nil {
		defer sessionDataSet.Close()
		printDataSet(sessionDataSet)
	} else {
		log.Println(err)
	}
}

// Print query results
func printDataSet(sds *client.SessionDataSet) {
	columnNames := sds.GetColumnNames()
	for _, value := range columnNames {
		fmt.Printf("%s\t", value)
	}
	fmt.Println()

	for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
		for _, columnName := range columnNames {
			isNull, _ := sds.IsNull(columnName)
			if isNull {
				fmt.Printf("%v\t\t", "null")
			} else {
				v, _ := sds.GetString(columnName)
				fmt.Printf("%v\t\t", v)
			}
		}
		fmt.Println()
	}
}

// Check error: a transport error is fatal, a failed status is only logged.
func checkError(status *common.TSStatus, err error) {
	if err != nil {
		log.Fatal(err)
	}
	if status != nil {
		if err = client.VerifySuccess(status); err != nil {
			log.Println(err)
		}
	}
}

3. Full Interface List
3.1 SessionPool Management Interfaces
| Interface Name | Function Description | Parameter Description |
|---|---|---|
| `NewSessionPool(config *PoolConfig, maxSize, connTimeoutMs, waitTimeoutMs int, enableComp bool) SessionPool` | Creates and returns a Session connection pool instance. | `config`: Pool configuration<br>`maxSize`: Maximum connections (≤0 uses CPU count * 5)<br>`connTimeoutMs`: TCP connection timeout (ms)<br>`waitTimeoutMs`: Session acquisition wait timeout (ms)<br>`enableComp`: Whether to enable compression |
| `GetSession() (Session, error)` | Gets an available Session from the pool. Blocks when no session is available and returns an error on timeout. Must be paired with PutBack. | None |
| `PutBack(session Session)` | Returns a used Session to the connection pool. | `session`: The instance obtained from GetSession |
| `Close()` | Closes the connection pool, releasing all active connections. Must be called before program exit. | None |
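The `maxSize` fallback described in the table can be written out as a small function. This is a sketch of the documented rule only, not the client's source; `effectivePoolSize` is a hypothetical name, and the CPU count is passed in so the rule is easy to check.

```go
package main

import (
	"fmt"
	"runtime"
)

// effectivePoolSize applies the documented fallback: a maxSize of zero
// or less becomes cpuCount * 5. cpuCount is a parameter so the rule is
// easy to test; a real caller would pass runtime.NumCPU().
func effectivePoolSize(maxSize, cpuCount int) int {
	if maxSize <= 0 {
		return cpuCount * 5
	}
	return maxSize
}

func main() {
	fmt.Println("explicit size:", effectivePoolSize(3, runtime.NumCPU()))
	fmt.Println("fallback size:", effectivePoolSize(0, runtime.NumCPU()))
}
```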
3.2 Data Insertion Interfaces
The following interfaces are called via the obtained Session.
| Interface Name | Function Description | Parameter Description |
|---|---|---|
| `InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *common.TSStatus, err error)` | Inserts a single record. | `deviceId`: Device ID<br>`measurements`: Measurement list<br>`dataTypes`: Data type list<br>`values`: Value list<br>`timestamp`: Timestamp |
| `InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r *common.TSStatus, err error)` | Inserts a single aligned record. | `deviceId`: Device ID<br>`measurements`: Measurement list<br>`dataTypes`: Data type list<br>`values`: Value list<br>`timestamp`: Timestamp |
| `InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r *common.TSStatus, err error)` | Inserts a single record in string format. | `deviceId`: Device ID<br>`measurements`: Measurement list<br>`values`: String-type value list<br>`timestamp`: Timestamp |
| `InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r *common.TSStatus, err error)` | Inserts multiple records for multiple devices. | `deviceIds`: Device ID list<br>`measurements`: 2D measurement list<br>`dataTypes`: 2D data type list<br>`values`: 2D value list<br>`timestamps`: Timestamp list |
| `InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r *common.TSStatus, err error)` | Inserts multiple records for multiple aligned devices. | `deviceIds`: Device ID list<br>`measurements`: 2D measurement list<br>`dataTypes`: 2D data type list<br>`values`: 2D value list<br>`timestamps`: Timestamp list |
| `InsertTablet(tablet *Tablet, sorted bool) (r *common.TSStatus, err error)` | Inserts multiple rows of data for a single device. | `tablet`: The Tablet data to insert<br>`sorted`: Whether the data is sorted |
| `InsertAlignedTablet(tablet *Tablet, sorted bool) (r *common.TSStatus, err error)` | Inserts multiple rows of data for a single aligned device. | `tablet`: The Tablet data to insert<br>`sorted`: Whether the data is sorted |
| `InsertTablets(tablets []*Tablet, sorted bool) (r *common.TSStatus, err error)` | Inserts multiple Tablets in one batch. | `tablets`: The Tablets to insert<br>`sorted`: Whether the data is sorted |
| `InsertAlignedTablets(tablets []*Tablet, sorted bool) (r *common.TSStatus, err error)` | Inserts Tablets for multiple aligned devices in one batch. | `tablets`: The Tablets to insert<br>`sorted`: Whether the data is sorted |
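The multi-record interfaces take parallel slices, so entry `i` of every slice describes the same record. A pre-flight check along the following lines can catch mismatched lengths before a call such as `InsertRecords`. The helper `checkRecordShapes` is hypothetical, it leaves out the `dataTypes` argument to stay independent of the client package, and the client may perform its own validation.

```go
package main

import "fmt"

// checkRecordShapes verifies that the parallel slices handed to a
// multi-record insert line up: one measurement list, one value list and
// one timestamp per device entry, and one value per measurement.
// Hypothetical helper; the client may perform its own checks.
func checkRecordShapes(deviceIDs []string, measurements [][]string, values [][]interface{}, timestamps []int64) error {
	n := len(deviceIDs)
	if len(measurements) != n || len(values) != n || len(timestamps) != n {
		return fmt.Errorf("length mismatch: %d devices, %d measurement lists, %d value lists, %d timestamps",
			n, len(measurements), len(values), len(timestamps))
	}
	for i := range deviceIDs {
		if len(measurements[i]) != len(values[i]) {
			return fmt.Errorf("record %d (%s): %d measurements but %d values",
				i, deviceIDs[i], len(measurements[i]), len(values[i]))
		}
	}
	return nil
}

func main() {
	err := checkRecordShapes(
		[]string{"root.sg1.dev1", "root.sg1.dev1"},
		[][]string{{"temperature"}, {"temperature"}},
		[][]interface{}{{float32(21.5)}, {float32(21.7)}},
		[]int64{1700000000000, 1700000000001},
	)
	fmt.Println("shapes ok:", err == nil)
}
```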
3.3 SQL and Query Interfaces
The following interfaces are called via the obtained Session.
| Interface Name | Function Description | Parameter Description |
|---|---|---|
| `ExecuteStatement(sql string) (*SessionDataSet, error)` | Executes SQL (primarily for queries), returns a SessionDataSet. | `sql`: The SQL query statement to execute |
| `ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionDataSet, error)` | Executes a query SQL with optional timeout, returns a SessionDataSet. | `sql`: The SQL query statement to execute<br>`timeoutMs`: Query timeout (milliseconds) |
| `ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error)` | Executes SQL that does not return a result set (e.g., INSERT, CREATE, DELETE). | `sql`: The SQL statement to execute |
| `ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error)` | Queries raw data for specified time series within a time range. | `paths`: Query path list<br>`startTime`: Start timestamp<br>`endTime`: End timestamp |
| `ExecuteAggregationQuery(paths []string, aggregations []common.TAggregationType, startTime, endTime, interval, timeoutMs int64) (*SessionDataSet, error)` | Executes an aggregation query (COUNT, AVG, etc.). | `paths`: Query path list<br>`aggregations`: Aggregation type list<br>`startTime`, `endTime`, `interval`: Start time, end time, and interval<br>`timeoutMs`: Query timeout (milliseconds) |
| `ExecuteBatchStatement(sqls []string) (r *common.TSStatus, err error)` | Executes multiple SQL statements in batch. | `sqls`: The SQL statements to execute |
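The SQL interfaces accept plain statement strings, so a time-bounded query can be assembled before it is passed to a call such as `ExecuteQueryStatement`. The sketch below assumes millisecond timestamps and trusted device and measurement names; `buildRangeQuery` is a hypothetical helper that performs no escaping.

```go
package main

import "fmt"

// buildRangeQuery returns a SELECT over one measurement of one device,
// limited to the half-open time range [startMs, endMs).
// Hypothetical helper: it does no escaping, so pass trusted names only.
func buildRangeQuery(device, measurement string, startMs, endMs int64) string {
	return fmt.Sprintf("select %s from %s where time >= %d and time < %d",
		measurement, device, startMs, endMs)
}

func main() {
	sql := buildRangeQuery("root.sg1.dev1", "temperature", 1700000000000, 1700000060000)
	fmt.Println(sql)
}
```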
3.4 Metadata Operation Interfaces
The following interfaces are called via the obtained Session.
| Interface Name | Function Description | Parameter Description |
|---|---|---|
| `SetStorageGroup(storageGroupId string) (r *common.TSStatus, err error)` | Creates a database (storage group). | `storageGroupId`: Database (storage group) name |
| `DeleteStorageGroup(storageGroupId string) (r *common.TSStatus, err error)` | Deletes a database (storage group). | `storageGroupId`: The database (storage group) name to delete |
| `DeleteStorageGroups(storageGroupIds ...string) (r *common.TSStatus, err error)` | Deletes multiple databases (storage groups). | `storageGroupIds`: The list of database (storage group) names to delete |
| `CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r *common.TSStatus, err error)` | Creates a non-aligned time series. | `path`: Time series path<br>`dataType`: Data type<br>`encoding`: Encoding method<br>`compressor`: Compression algorithm<br>`attributes`: (Optional) Series attributes<br>`tags`: (Optional) Series tags |
| `CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r *common.TSStatus, err error)` | Creates a group of aligned time series. | `prefixPath`: Time series path prefix<br>`measurements`: Measurement name list<br>`dataTypes`, `encodings`, `compressors`: Data type, encoding, and compressor list for each measurement<br>`measurementAlias`: (Optional) Alias list for each measurement |
| `DeleteTimeseries(paths []string) (r *common.TSStatus, err error)` | Deletes multiple time series (including their data). | `paths`: The list of time series paths to delete |
| `DeleteData(paths []string, startTime int64, endTime int64) (r *common.TSStatus, err error)` | Deletes data within a time period for specified time series (metadata is preserved). | `paths`: The list of time series paths<br>`startTime`: Start timestamp<br>`endTime`: End timestamp |
| `SetTimeZone(timeZone string) (r *common.TSStatus, err error)` | Sets the time zone for the current session. | `timeZone`: Time zone string, e.g., "UTC", "Asia/Shanghai", "GMT+8" |
| `GetTimeZone() (string, error)` | Gets the time zone of the current session. | None |
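The usage example creates the series `root.sg1.dev1.temperature` and later writes to device `root.sg1.dev1` with measurement `temperature`. The relationship between a full series path and its device/measurement pair can be sketched as below. `splitSeriesPath` is a hypothetical helper that assumes simple, unquoted node names; a path whose node names are quoted and contain dots would need a real parser.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// splitSeriesPath splits "root.sg1.dev1.temperature" into the device
// path "root.sg1.dev1" and the measurement "temperature" at the last dot.
// Hypothetical helper; it assumes unquoted node names without dots.
func splitSeriesPath(path string) (device, measurement string, err error) {
	i := strings.LastIndex(path, ".")
	if i <= 0 || i == len(path)-1 {
		return "", "", errors.New("path must look like <device>.<measurement>")
	}
	return path[:i], path[i+1:], nil
}

func main() {
	device, measurement, err := splitSeriesPath("root.sg1.dev1.temperature")
	fmt.Println(device, measurement, err)
}
```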
3.5 Key Configuration Structure (PoolConfig)
| Field | Type | Required | Description |
|---|---|---|---|
| `Host` | `string` | Either Host/Port or NodeUrls | Single-node host address. |
| `Port` | `string` | Either Host/Port or NodeUrls | Single-node port. |
| `NodeUrls` | `[]string` | Either NodeUrls or Host/Port | Cluster node address list; each entry has the format "host:port". |
| `UserName` | `string` | Yes | Username. |
| `Password` | `string` | Yes | Password. |
| `FetchSize` | `int32` | No | Query result set fetch size, default 1024. |
| `TimeZone` | `string` | No | Session time zone, e.g., "Asia/Shanghai". Defaults to the server time zone. |
| `Database` | `string` | No | For the table model; sets the session's default database. |
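The "required" column of the table above can be turned into a quick pre-flight check. The sketch below uses a local `poolSettings` type that mirrors a few `PoolConfig` fields so that it runs without the client package. Both the type and its `validate` method are hypothetical, and rejecting a configuration that sets Host/Port and NodeUrls together is an assumption of this sketch rather than documented client behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// poolSettings mirrors some PoolConfig fields from the table above. It
// is a local, hypothetical type so the sketch runs without the client.
type poolSettings struct {
	Host, Port string
	NodeUrls   []string
	UserName   string
	Password   string
}

// validate checks the "either Host/Port or NodeUrls" rule and the
// required credentials.
func (s poolSettings) validate() error {
	single := s.Host != "" || s.Port != ""
	cluster := len(s.NodeUrls) > 0
	switch {
	case single && cluster:
		return errors.New("set either Host/Port or NodeUrls, not both")
	case !single && !cluster:
		return errors.New("set Host/Port or NodeUrls")
	case single && (s.Host == "" || s.Port == ""):
		return errors.New("both Host and Port must be set")
	case s.UserName == "" || s.Password == "":
		return errors.New("missing UserName or Password")
	}
	return nil
}

func main() {
	cfg := poolSettings{Host: "127.0.0.1", Port: "6667", UserName: "root", Password: "root"}
	fmt.Println("single-node settings valid:", cfg.validate() == nil)
}
```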
