elasticstream/source/elastic/worker.go

67 lines
1.1 KiB
Go
Raw Permalink Normal View History

2024-10-07 16:26:41 +05:30
package elastic
2024-10-03 12:40:46 +05:30
import (
2024-10-07 22:41:59 +05:30
"fmt"
2024-10-07 09:50:00 +05:30
"log"
"time"
2024-10-07 22:41:59 +05:30
"elasticstream/opencdc"
2024-10-03 12:40:46 +05:30
)
type Worker struct {
2024-10-03 14:04:17 +05:30
client *Client
2024-10-07 09:50:00 +05:30
index string
offset int
2024-10-03 12:40:46 +05:30
}
2024-10-07 16:26:41 +05:30
func NewWorker(client *Client, index string, offset int) {
2024-10-03 12:40:46 +05:30
w := &Worker{
client: client,
index: index,
2024-10-07 09:50:00 +05:30
offset: offset,
2024-10-03 12:40:46 +05:30
}
go w.start()
}
func (w *Worker) start() {
2024-10-07 22:41:59 +05:30
defer w.client.wg.Done()
2024-10-03 12:40:46 +05:30
2024-10-07 09:50:00 +05:30
for {
2024-10-07 16:26:41 +05:30
log.Printf("worker index=%s offset=%d size=%d\n", w.index, w.offset, w.client.cfg.BatchSize)
2024-10-07 09:50:00 +05:30
2024-10-07 22:41:59 +05:30
searchResponse, err := search(w.client.es, w.index, &w.offset, &w.client.cfg.BatchSize)
if err != nil || len(searchResponse.Hits.Hits) == 0 {
// log.Println("search() err:", err)
select {
case <-w.client.shutdown:
fmt.Println("shuting donw..")
return
case <-time.After(time.Second):
continue
}
2024-10-07 09:50:00 +05:30
}
2024-10-07 22:41:59 +05:30
for _, hit := range searchResponse.Hits.Hits {
data := opencdc.Data{
Header: opencdc.Header{
ID: hit.ID,
Index: hit.Index,
Position: w.offset + 1,
},
Payload: hit.Source,
}
select {
case w.client.ch <- data:
w.offset++
case <-w.client.shutdown:
fmt.Println("Stopping worker...")
return
}
2024-10-07 09:50:00 +05:30
}
}
2024-10-03 12:40:46 +05:30
}