elasticstream/source/elastic/open.go

42 lines
683 B
Go

package elastic
import (
	"context"
	"fmt"
)
// Open creates the Elasticsearch client, allocates the buffered data channel,
// opens the bolt database, and then, for every configured index, looks up the
// offset via getOffset, records it in c.offsets and calls NewWorker with it.
//
// Both parameters are unnamed, so neither the context nor the positions are
// read by this function.
//
// It returns a non-nil error when the client cannot be created, the database
// cannot be opened, or an offset lookup fails. Each error is wrapped with %w,
// so the underlying cause stays reachable through errors.Is and errors.As.
// When an offset lookup fails the database opened here is closed again before
// returning.
//
// NOTE(review): c is not declared anywhere in this block — presumably Open is
// meant to be a method with a receiver named c; confirm against the package.
// NOTE(review): configuration is read through both c.cfg and c.config; verify
// which field the owning type actually declares.
func Open(context.Context, []Position) error {
	// The client is configured with the single host from the configuration.
	cfg := elasticsearch.Config{
		Addresses: []string{
			c.cfg.Host,
		},
	}

	var err error
	c.es, err = elasticsearch.NewClient(cfg)
	if err != nil {
		return fmt.Errorf("creating elasticsearch client: %w", err)
	}

	// Channel buffered with a capacity of BatchSize.
	c.ch = make(chan Data, c.cfg.BatchSize)

	c.db, err = bolt.Open(c.cfg.DBPath, 0644, nil)
	if err != nil {
		return fmt.Errorf("opening bolt db %q: %w", c.cfg.DBPath, err)
	}

	for _, index := range c.config.Indexes {
		offset, err := getOffset(c.db, index)
		if err != nil {
			// Do not leave the database handle open after a failed Open: bolt
			// keeps a lock on the data file while the handle is open, which
			// would get in the way of a later attempt. The Close error is
			// deliberately dropped; the offset error is the one to report.
			_ = c.db.Close()
			return fmt.Errorf("reading offset for index %v: %w", index, err)
		}

		// Assumes c.offsets was initialised elsewhere; a write to a nil map
		// panics — TODO confirm.
		c.offsets[index] = offset

		// NOTE(review): any value NewWorker returns is discarded here — confirm
		// that nothing needs to keep a reference to the worker.
		NewWorker(c.es, index, offset, c.config.BatchSize)
	}

	return nil
}