diff --git a/controller/close.go b/controller/close.go index 039961a..73ca912 100644 --- a/controller/close.go +++ b/controller/close.go @@ -15,5 +15,4 @@ func Close(c *gin.Context) { return } c.Status(http.StatusOK) - } diff --git a/controller/open.go b/controller/open.go index 836edf1..8343328 100644 --- a/controller/open.go +++ b/controller/open.go @@ -26,6 +26,7 @@ func Open(c *gin.Context) { DBPath: "index.db", }) if err != nil { + log.Println("client.Configure() err:", err) c.Status(http.StatusInternalServerError) return } diff --git a/source/elastic/ack.go b/source/elastic/ack.go index dc6d0c5..45c02e1 100644 --- a/source/elastic/ack.go +++ b/source/elastic/ack.go @@ -2,11 +2,28 @@ package elastic import ( "context" + "fmt" "elasticstream/source" ) func (c *Client) Ack(ctx context.Context, position source.Position) error { + curr := c.offsets[position.Index] + fmt.Println("curr:", curr) + fmt.Println("asked:", position.Pos) + + for _, p := range c.positions { + if p.Index == position.Index { + fmt.Println("initial:", p.Pos) + if p.Pos > position.Pos { + return fmt.Errorf("not acknowledged pos less than initial position") + } + } + } + + if curr < position.Pos { + return fmt.Errorf("not acknowledged pos more than current position") + } return nil } diff --git a/source/elastic/client.go b/source/elastic/client.go index 4c78b09..f234138 100644 --- a/source/elastic/client.go +++ b/source/elastic/client.go @@ -1,24 +1,29 @@ package elastic import ( + "sync" + "elasticstream/config" "elasticstream/opencdc" + "elasticstream/source" - "github.com/boltdb/bolt" "github.com/elastic/go-elasticsearch/v8" ) type Client struct { - cfg *config.Config - es *elasticsearch.Client - db *bolt.DB - offsets map[string]int - ch chan opencdc.Data + cfg *config.Config + es *elasticsearch.Client + offsets map[string]int + positions []source.Position + ch chan opencdc.Data + shutdown chan struct{} + wg *sync.WaitGroup } func NewClient() *Client { client := &Client{ offsets: make(map[string]int), + wg: &sync.WaitGroup{}, } return client } diff --git a/source/elastic/configure.go b/source/elastic/configure.go index 64d4b29..1ac2afb 100644 --- a/source/elastic/configure.go +++ b/source/elastic/configure.go @@ -9,7 +9,8 @@ import ( func (c *Client) Configure(ctx context.Context, cfg *config.Config) error { - if c == nil || c.ch == nil { + // if c == nil || c.ch == nil { + if c == nil { return fmt.Errorf("error source not opened for reading") } diff --git a/source/elastic/open.go b/source/elastic/open.go index 2d07439..eb2fa33 100644 --- a/source/elastic/open.go +++ b/source/elastic/open.go @@ -29,8 +29,11 @@ func (c *Client) Open(ctx context.Context, positions []source.Position) error { // create a buffer channel c.ch = make(chan opencdc.Data, c.cfg.BatchSize) + c.shutdown = make(chan struct{}) for _, index := range c.cfg.Indexes { + c.wg.Add(1) + offset := 0 for _, position := range positions { if index == position.Index { @@ -42,5 +45,7 @@ func (c *Client) Open(ctx context.Context, positions []source.Position) error { NewWorker(c, index, offset) } + c.positions = positions + return nil } diff --git a/source/elastic/search.go b/source/elastic/search.go index dd14d78..79ced6f 100644 --- a/source/elastic/search.go +++ b/source/elastic/search.go @@ -5,15 +5,27 @@ import ( "encoding/json" "fmt" "strings" - - "elasticstream/opencdc" + "time" "github.com/elastic/go-elasticsearch/esapi" "github.com/elastic/go-elasticsearch/v8" ) +type SearchResponse struct { + Hits struct { + Total struct { + Value int `json:"value"` + } `json:"total"` + Hits []struct { + Index string `json:"_index"` + ID string `json:"_id"` + Source map[string]interface{} `json:"_source"` + } `json:"hits"` + } `json:"hits"` +} + // search is calling Elastic Search search API -func search(client *elasticsearch.Client, index string, offset, size *int) ([]opencdc.Data, error) { +func search(client *elasticsearch.Client, index string, offset, size *int) (*SearchResponse, error) { query := fmt.Sprintf(`{ "query": { "match_all": {} @@ -29,7 +41,8 @@ func search(client *elasticsearch.Client, index string, offset, size *int) ([]op } // Perform the request - res, err := req.Do(context.Background(), client) + ctx, _ := context.WithTimeout(context.TODO(), 5*time.Second) + res, err := req.Do(ctx, client) if err != nil { return nil, fmt.Errorf("error getting response: %s", err) } @@ -39,37 +52,10 @@ func search(client *elasticsearch.Client, index string, offset, size *int) ([]op return nil, fmt.Errorf("res.IsError() error: %s", res.String()) } - // Parse the response - var result struct { - Hits struct { - Hits []struct { - Source map[string]interface{} `json:"_source"` - } `json:"hits"` - } `json:"hits"` - } - - if err := json.NewDecoder(res.Body).Decode(&result); err != nil { + result := &SearchResponse{} + if err := json.NewDecoder(res.Body).Decode(result); err != nil { return nil, fmt.Errorf("error parsing the response body: %s", err) } - // Collect the records - newRecords := make([]map[string]interface{}, len(result.Hits.Hits)) - for i, hit := range result.Hits.Hits { - newRecords[i] = hit.Source - } - - header := opencdc.Header{Index: index} - - var records []opencdc.Data - for _, v := range newRecords { - data := opencdc.Data{ - Header: header, - Payload: v, - } - records = append(records, data) - } - - // log.Println("records:", records) - - return records, nil + return result, nil } diff --git a/source/elastic/teardown.go b/source/elastic/teardown.go index 3a41497..eeccf30 100644 --- a/source/elastic/teardown.go +++ b/source/elastic/teardown.go @@ -3,13 +3,26 @@ package elastic import ( "context" "fmt" + "log" ) // close the client func (c *Client) Teardown(ctx context.Context) error { + log.Println(">>>> elastic.Teardown()") + defer log.Println("<<<< elastic.Teardown()") + if c == nil || c.ch == nil { return fmt.Errorf("error source not opened for reading") } + close(c.shutdown) + + c.wg.Wait() + + close(c.ch) + + c.ch = nil + // c.es.Close() + return nil } diff --git a/source/elastic/worker.go b/source/elastic/worker.go index bc8777e..fe44ba3 100644 --- a/source/elastic/worker.go +++ b/source/elastic/worker.go @@ -1,8 +1,11 @@ package elastic import ( + "fmt" "log" "time" + + "elasticstream/opencdc" ) type Worker struct { @@ -22,21 +25,42 @@ func NewWorker(client *Client, index string, offset int) { } func (w *Worker) start() { + defer w.client.wg.Done() for { log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.client.cfg.BatchSize) - dataArray, err := search(w.client.es, w.index, &w.offset, &w.client.cfg.BatchSize) - if err != nil { - log.Println("search() err:", err) - time.Sleep(1 * time.Second) - continue + searchResponse, err := search(w.client.es, w.index, &w.offset, &w.client.cfg.BatchSize) + if err != nil || len(searchResponse.Hits.Hits) == 0 { + // log.Println("search() err:", err) + select { + case <-w.client.shutdown: + fmt.Println("shuting donw..") + return + + case <-time.After(time.Second): + continue + } } - for _, data := range dataArray { - w.client.ch <- data - w.offset++ + for _, hit := range searchResponse.Hits.Hits { + data := opencdc.Data{ + Header: opencdc.Header{ + ID: hit.ID, + Index: hit.Index, + Position: w.offset + 1, + }, + Payload: hit.Source, + } + + select { + case w.client.ch <- data: + w.offset++ + + case <-w.client.shutdown: + fmt.Println("Stopping worker...") + return + } } } - } diff --git a/source/interface.go b/source/interface.go index 59a55d7..a2e9db2 100644 --- a/source/interface.go +++ b/source/interface.go @@ -8,9 +8,9 @@ import ( ) type Position struct { - ID string - Index string - Pos int + ID string `json:"id"` + Index string `json:"index"` + Pos int `json:"pos"` } type Source interface {