package elasticstream import ( "fmt" "github.com/elastic/go-elasticsearch/v8" ) type Client struct { es *elasticsearch.Client config *Config ch chan Data } func NewClient(config *Config) (*Client, error) { client := &Client{ config: config, } return client, nil } func (c *Client) Open() error { // Open a connection with ElasticSearch cfg := elasticsearch.Config{ Addresses: []string{ c.config.Host, }, } var err error c.es, err = elasticsearch.NewClient(cfg) if err != nil { return err } // create a buffer channel c.ch = make(chan Data, 1) for _, index := range c.config.Indexes { NewWorker(c, index) } return nil } func (c *Client) Read() (*Data, error) { data, ok := <-c.ch if !ok { return nil, fmt.Errorf("error reading data from channel") } return &data, nil } // func (c *Client) Ack(ctx context.Context, position int) error { // return nil // } // close the client func (c *Client) Teardown() error { return nil }