This commit is contained in:
Parikshit Gothwal 2024-10-07 22:41:59 +05:30
parent a6da23819b
commit 7b46b3b0e8
10 changed files with 105 additions and 54 deletions

View File

@ -15,5 +15,4 @@ func Close(c *gin.Context) {
return
}
c.Status(http.StatusOK)
}

View File

@ -26,6 +26,7 @@ func Open(c *gin.Context) {
DBPath: "index.db",
})
if err != nil {
log.Println("client.Configure() err:", err)
c.Status(http.StatusInternalServerError)
return
}

View File

@ -2,11 +2,28 @@ package elastic
import (
"context"
"fmt"
"elasticstream/source"
)
func (c *Client) Ack(ctx context.Context, position source.Position) error {
curr := c.offsets[position.Index]
fmt.Println("curr:", curr)
fmt.Println("asked:", position.Pos)
for _, p := range c.positions {
if p.Index == position.Index {
fmt.Println("initial:", p.Pos)
if p.Pos > position.Pos {
return fmt.Errorf("not acknowledged pos less than initial position")
}
}
}
if curr < position.Pos {
return fmt.Errorf("not acknowledged pos more than current position")
}
return nil
}

View File

@ -1,24 +1,29 @@
package elastic
import (
"sync"
"elasticstream/config"
"elasticstream/opencdc"
"elasticstream/source"
"github.com/boltdb/bolt"
"github.com/elastic/go-elasticsearch/v8"
)
type Client struct {
cfg *config.Config
es *elasticsearch.Client
db *bolt.DB
offsets map[string]int
ch chan opencdc.Data
cfg *config.Config
es *elasticsearch.Client
offsets map[string]int
positions []source.Position
ch chan opencdc.Data
shutdown chan struct{}
wg *sync.WaitGroup
}
func NewClient() *Client {
client := &Client{
offsets: make(map[string]int),
wg: &sync.WaitGroup{},
}
return client
}

View File

@ -9,7 +9,8 @@ import (
func (c *Client) Configure(ctx context.Context, cfg *config.Config) error {
if c == nil || c.ch == nil {
// if c == nil || c.ch == nil {
if c == nil {
return fmt.Errorf("error source not opened for reading")
}

View File

@ -29,8 +29,11 @@ func (c *Client) Open(ctx context.Context, positions []source.Position) error {
// create a buffer channel
c.ch = make(chan opencdc.Data, c.cfg.BatchSize)
c.shutdown = make(chan struct{})
for _, index := range c.cfg.Indexes {
c.wg.Add(1)
offset := 0
for _, position := range positions {
if index == position.Index {
@ -42,5 +45,7 @@ func (c *Client) Open(ctx context.Context, positions []source.Position) error {
NewWorker(c, index, offset)
}
c.positions = positions
return nil
}

View File

@ -5,15 +5,27 @@ import (
"encoding/json"
"fmt"
"strings"
"elasticstream/opencdc"
"time"
"github.com/elastic/go-elasticsearch/esapi"
"github.com/elastic/go-elasticsearch/v8"
)
type SearchResponse struct {
Hits struct {
Total struct {
Value int `json:"value"`
} `json:"total"`
Hits []struct {
Index string `json:"_index"`
ID string `json:"_id"`
Source map[string]interface{} `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
// search is calling Elastic Search search API
func search(client *elasticsearch.Client, index string, offset, size *int) ([]opencdc.Data, error) {
func search(client *elasticsearch.Client, index string, offset, size *int) (*SearchResponse, error) {
query := fmt.Sprintf(`{
"query": {
"match_all": {}
@ -29,7 +41,8 @@ func search(client *elasticsearch.Client, index string, offset, size *int) ([]op
}
// Perform the request
res, err := req.Do(context.Background(), client)
ctx, _ := context.WithTimeout(context.TODO(), 5*time.Second)
res, err := req.Do(ctx, client)
if err != nil {
return nil, fmt.Errorf("error getting response: %s", err)
}
@ -39,37 +52,10 @@ func search(client *elasticsearch.Client, index string, offset, size *int) ([]op
return nil, fmt.Errorf("res.IsError() error: %s", res.String())
}
// Parse the response
var result struct {
Hits struct {
Hits []struct {
Source map[string]interface{} `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
result := &SearchResponse{}
if err := json.NewDecoder(res.Body).Decode(result); err != nil {
return nil, fmt.Errorf("error parsing the response body: %s", err)
}
// Collect the records
newRecords := make([]map[string]interface{}, len(result.Hits.Hits))
for i, hit := range result.Hits.Hits {
newRecords[i] = hit.Source
}
header := opencdc.Header{Index: index}
var records []opencdc.Data
for _, v := range newRecords {
data := opencdc.Data{
Header: header,
Payload: v,
}
records = append(records, data)
}
// log.Println("records:", records)
return records, nil
return result, nil
}

View File

@ -3,13 +3,26 @@ package elastic
import (
"context"
"fmt"
"log"
)
// close the client
func (c *Client) Teardown(ctx context.Context) error {
log.Println(">>>> elastic.Teardown()")
defer log.Println("<<<< elastic.Teardown()")
if c == nil || c.ch == nil {
return fmt.Errorf("error source not opened for reading")
}
close(c.shutdown)
c.wg.Wait()
close(c.ch)
c.ch = nil
// c.es.Close()
return nil
}

View File

@ -1,8 +1,11 @@
package elastic
import (
"fmt"
"log"
"time"
"elasticstream/opencdc"
)
type Worker struct {
@ -22,21 +25,42 @@ func NewWorker(client *Client, index string, offset int) {
}
func (w *Worker) start() {
defer w.client.wg.Done()
for {
log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.client.cfg.BatchSize)
dataArray, err := search(w.client.es, w.index, &w.offset, &w.client.cfg.BatchSize)
if err != nil {
log.Println("search() err:", err)
time.Sleep(1 * time.Second)
continue
searchResponse, err := search(w.client.es, w.index, &w.offset, &w.client.cfg.BatchSize)
if err != nil || len(searchResponse.Hits.Hits) == 0 {
// log.Println("search() err:", err)
select {
case <-w.client.shutdown:
fmt.Println("shuting donw..")
return
case <-time.After(time.Second):
continue
}
}
for _, data := range dataArray {
w.client.ch <- data
w.offset++
for _, hit := range searchResponse.Hits.Hits {
data := opencdc.Data{
Header: opencdc.Header{
ID: hit.ID,
Index: hit.Index,
Position: w.offset + 1,
},
Payload: hit.Source,
}
select {
case w.client.ch <- data:
w.offset++
case <-w.client.shutdown:
fmt.Println("Stopping worker...")
return
}
}
}
}

View File

@ -8,9 +8,9 @@ import (
)
type Position struct {
ID string
Index string
Pos int
ID string `json:"id"`
Index string `json:"index"`
Pos int `json:"pos"`
}
type Source interface {