elasticstream/source/elastic/open.go

52 lines
897 B
Go
Raw Permalink Normal View History

2024-10-07 15:25:13 +05:30
package elastic
import (
"context"
2024-10-07 16:49:36 +05:30
"log"
2024-10-07 16:26:41 +05:30
"elasticstream/opencdc"
"elasticstream/source"
"github.com/elastic/go-elasticsearch/v8"
2024-10-07 15:25:13 +05:30
)
2024-10-07 16:26:41 +05:30
func (c *Client) Open(ctx context.Context, positions []source.Position) error {
2024-10-07 16:49:36 +05:30
log.Println(">>>> elastic.Open()")
defer log.Println("<<<< elastic.Open()")
2024-10-07 15:25:13 +05:30
// Open a connection with ElasticSearch
cfg := elasticsearch.Config{
Addresses: []string{
c.cfg.Host,
},
}
var err error
c.es, err = elasticsearch.NewClient(cfg)
if err != nil {
return err
}
// create a buffer channel
2024-10-07 16:26:41 +05:30
c.ch = make(chan opencdc.Data, c.cfg.BatchSize)
2024-10-07 22:41:59 +05:30
c.shutdown = make(chan struct{})
2024-10-07 16:26:41 +05:30
for _, index := range c.cfg.Indexes {
2024-10-07 22:41:59 +05:30
c.wg.Add(1)
2024-10-07 16:26:41 +05:30
offset := 0
for _, position := range positions {
if index == position.Index {
offset = position.Pos
}
2024-10-07 15:25:13 +05:30
}
2024-10-07 16:26:41 +05:30
2024-10-07 15:25:13 +05:30
c.offsets[index] = offset
2024-10-07 16:26:41 +05:30
NewWorker(c, index, offset)
2024-10-07 15:25:13 +05:30
}
2024-10-07 22:41:59 +05:30
c.positions = positions
2024-10-07 15:25:13 +05:30
return nil
}