tiny-rdm/backend/services/connection_service.go

378 lines
9.7 KiB
Go

package services
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"github.com/redis/go-redis/v9"
"golang.org/x/crypto/ssh"
"net"
"os"
"strings"
"sync"
"time"
. "tinyrdm/backend/storage"
"tinyrdm/backend/types"
)
type cmdHistoryItem struct {
Timestamp int64 `json:"timestamp"`
Server string `json:"server"`
Cmd string `json:"cmd"`
Cost int64 `json:"cost"`
}
type connectionService struct {
ctx context.Context
conns *ConnectionsStorage
}
var connection *connectionService
var onceConnection sync.Once
func Connection() *connectionService {
if connection == nil {
onceConnection.Do(func() {
connection = &connectionService{
conns: NewConnections(),
}
})
}
return connection
}
func (c *connectionService) Start(ctx context.Context) {
c.ctx = ctx
}
func (c *connectionService) buildOption(config types.ConnectionConfig) (*redis.Options, error) {
var sshClient *ssh.Client
if config.SSH.Enable {
sshConfig := &ssh.ClientConfig{
User: config.SSH.Username,
Auth: []ssh.AuthMethod{ssh.Password(config.SSH.Password)},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
Timeout: time.Duration(config.ConnTimeout) * time.Second,
}
switch config.SSH.LoginType {
case "pwd":
sshConfig.Auth = []ssh.AuthMethod{ssh.Password(config.SSH.Password)}
case "pkfile":
key, err := os.ReadFile(config.SSH.PKFile)
if err != nil {
return nil, err
}
var signer ssh.Signer
if len(config.SSH.Passphrase) > 0 {
signer, err = ssh.ParsePrivateKeyWithPassphrase(key, []byte(config.SSH.Passphrase))
} else {
signer, err = ssh.ParsePrivateKey(key)
}
if err != nil {
return nil, err
}
sshConfig.Auth = []ssh.AuthMethod{ssh.PublicKeys(signer)}
default:
return nil, errors.New("invalid login type")
}
var err error
sshClient, err = ssh.Dial("tcp", fmt.Sprintf("%s:%d", config.SSH.Addr, config.SSH.Port), sshConfig)
if err != nil {
return nil, err
}
}
var tlsConfig *tls.Config
if config.SSL.Enable {
// setup tls config
var certs []tls.Certificate
if len(config.SSL.CertFile) > 0 && len(config.SSL.KeyFile) > 0 {
if cert, err := tls.LoadX509KeyPair(config.SSL.CertFile, config.SSL.KeyFile); err != nil {
return nil, err
} else {
certs = []tls.Certificate{cert}
}
}
var caCertPool *x509.CertPool
if len(config.SSL.CAFile) > 0 {
ca, err := os.ReadFile(config.SSL.CAFile)
if err != nil {
return nil, err
}
caCertPool = x509.NewCertPool()
caCertPool.AppendCertsFromPEM(ca)
}
tlsConfig = &tls.Config{
RootCAs: caCertPool,
InsecureSkipVerify: config.SSL.AllowInsecure,
Certificates: certs,
ServerName: strings.TrimSpace(config.SSL.SNI),
}
}
option := &redis.Options{
Addr: fmt.Sprintf("%s:%d", config.Addr, config.Port),
Username: config.Username,
Password: config.Password,
DialTimeout: time.Duration(config.ConnTimeout) * time.Second,
ReadTimeout: time.Duration(config.ExecTimeout) * time.Second,
WriteTimeout: time.Duration(config.ExecTimeout) * time.Second,
TLSConfig: tlsConfig,
}
if config.LastDB > 0 {
option.DB = config.LastDB
}
if sshClient != nil {
option.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
return sshClient.Dial(network, addr)
}
option.ReadTimeout = -2
option.WriteTimeout = -2
}
return option, nil
}
func (c *connectionService) createRedisClient(config types.ConnectionConfig) (redis.UniversalClient, error) {
option, err := c.buildOption(config)
if err != nil {
return nil, err
}
if config.Sentinel.Enable {
// get master address via sentinel node
sentinel := redis.NewSentinelClient(option)
defer sentinel.Close()
var addr []string
addr, err = sentinel.GetMasterAddrByName(c.ctx, config.Sentinel.Master).Result()
if err != nil {
return nil, err
}
if len(addr) < 2 {
return nil, errors.New("cannot get master address")
}
option.Addr = fmt.Sprintf("%s:%s", addr[0], addr[1])
option.Username = config.Sentinel.Username
option.Password = config.Sentinel.Password
}
rdb := redis.NewClient(option)
if config.Cluster.Enable {
// connect to cluster
var slots []redis.ClusterSlot
if slots, err = rdb.ClusterSlots(c.ctx).Result(); err == nil {
clusterOptions := &redis.ClusterOptions{
//NewClient: nil,
//MaxRedirects: 0,
//RouteByLatency: false,
//RouteRandomly: false,
//ClusterSlots: nil,
Dialer: option.Dialer,
OnConnect: option.OnConnect,
Protocol: option.Protocol,
Username: option.Username,
Password: option.Password,
MaxRetries: option.MaxRetries,
MinRetryBackoff: option.MinRetryBackoff,
MaxRetryBackoff: option.MaxRetryBackoff,
DialTimeout: option.DialTimeout,
ContextTimeoutEnabled: option.ContextTimeoutEnabled,
PoolFIFO: option.PoolFIFO,
PoolSize: option.PoolSize,
PoolTimeout: option.PoolTimeout,
MinIdleConns: option.MinIdleConns,
MaxIdleConns: option.MaxIdleConns,
ConnMaxIdleTime: option.ConnMaxIdleTime,
ConnMaxLifetime: option.ConnMaxLifetime,
TLSConfig: option.TLSConfig,
DisableIndentity: option.DisableIndentity,
}
if option.Dialer != nil {
clusterOptions.Dialer = option.Dialer
clusterOptions.ReadTimeout = -2
clusterOptions.WriteTimeout = -2
}
var addrs []string
for _, slot := range slots {
for _, node := range slot.Nodes {
addrs = append(addrs, node.Addr)
}
}
clusterOptions.Addrs = addrs
clusterClient := redis.NewClusterClient(clusterOptions)
return clusterClient, nil
} else {
return nil, err
}
}
return rdb, nil
}
// ListSentinelMasters list all master info by sentinel
func (c *connectionService) ListSentinelMasters(config types.ConnectionConfig) (resp types.JSResp) {
option, err := c.buildOption(config)
if err != nil {
resp.Msg = err.Error()
return
}
if option.DialTimeout > 0 {
option.DialTimeout = 10 * time.Second
}
sentinel := redis.NewSentinelClient(option)
defer sentinel.Close()
var retInfo []map[string]string
masterInfos, err := sentinel.Masters(c.ctx).Result()
if err != nil {
resp.Msg = err.Error()
return
}
for _, info := range masterInfos {
if infoMap, ok := info.(map[any]any); ok {
retInfo = append(retInfo, map[string]string{
"name": infoMap["name"].(string),
"addr": fmt.Sprintf("%s:%s", infoMap["ip"].(string), infoMap["port"].(string)),
})
}
}
resp.Data = retInfo
resp.Success = true
return
}
func (c *connectionService) TestConnection(config types.ConnectionConfig) (resp types.JSResp) {
client, err := c.createRedisClient(config)
if err != nil {
resp.Msg = err.Error()
return
}
defer client.Close()
if _, err = client.Ping(c.ctx).Result(); err != nil && err != redis.Nil {
resp.Msg = err.Error()
} else {
resp.Success = true
}
return
}
// ListConnection list all saved connection in local profile
func (c *connectionService) ListConnection() (resp types.JSResp) {
resp.Success = true
resp.Data = c.conns.GetConnections()
return
}
func (c *connectionService) getConnection(name string) *types.Connection {
return c.conns.GetConnection(name)
}
// GetConnection get connection profile by name
func (c *connectionService) GetConnection(name string) (resp types.JSResp) {
conn := c.getConnection(name)
resp.Success = conn != nil
resp.Data = conn
return
}
// SaveConnection save connection config to local profile
func (c *connectionService) SaveConnection(name string, param types.ConnectionConfig) (resp types.JSResp) {
var err error
if strings.ContainsAny(param.Name, "/") {
err = errors.New("connection name contains illegal characters")
} else {
if len(name) > 0 {
// update connection
err = c.conns.UpdateConnection(name, param)
} else {
err = c.conns.CreateConnection(param)
}
}
if err != nil {
resp.Msg = err.Error()
} else {
resp.Success = true
}
return
}
// DeleteConnection remove connection by name
func (c *connectionService) DeleteConnection(name string) (resp types.JSResp) {
err := c.conns.DeleteConnection(name)
if err != nil {
resp.Msg = err.Error()
return
}
resp.Success = true
return
}
// SaveSortedConnection save sorted connection after drag
func (c *connectionService) SaveSortedConnection(sortedConns types.Connections) (resp types.JSResp) {
err := c.conns.SaveSortedConnection(sortedConns)
if err != nil {
resp.Msg = err.Error()
return
}
resp.Success = true
return
}
// CreateGroup create a new group
func (c *connectionService) CreateGroup(name string) (resp types.JSResp) {
err := c.conns.CreateGroup(name)
if err != nil {
resp.Msg = err.Error()
return
}
resp.Success = true
return
}
// RenameGroup rename group
func (c *connectionService) RenameGroup(name, newName string) (resp types.JSResp) {
err := c.conns.RenameGroup(name, newName)
if err != nil {
resp.Msg = err.Error()
return
}
resp.Success = true
return
}
// DeleteGroup remove a group by name
func (c *connectionService) DeleteGroup(name string, includeConn bool) (resp types.JSResp) {
err := c.conns.DeleteGroup(name, includeConn)
if err != nil {
resp.Msg = err.Error()
return
}
resp.Success = true
return
}
// SaveLastDB save last selected database index
func (c *connectionService) SaveLastDB(name string, db int) (resp types.JSResp) {
param := c.conns.GetConnection(name)
if param == nil {
resp.Msg = "no connection named \"" + name + "\""
return
}
param.LastDB = db
if err := c.conns.UpdateConnection(name, param.ConnectionConfig); err != nil {
resp.Msg = "save connection fail:" + err.Error()
return
}
resp.Success = true
return
}