some refactoring
This commit is contained in:
parent
b4c9386577
commit
1e2975ad0f
12
client.go
12
client.go
|
@ -7,6 +7,7 @@ import (
|
||||||
type Client struct {
|
type Client struct {
|
||||||
es *elasticsearch.Client
|
es *elasticsearch.Client
|
||||||
config *Config
|
config *Config
|
||||||
|
ch chan Data
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(config *Config) (*Client, error) {
|
func NewClient(config *Config) (*Client, error) {
|
||||||
|
@ -30,11 +31,14 @@ func NewClient(config *Config) (*Client, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Open() error {
|
func (c *Client) Open() error {
|
||||||
// create a buffer channel
|
|
||||||
ch := make(chan Data, 1)
|
|
||||||
|
|
||||||
for index, from := range c.config.Indexes {
|
// Open a connection with ElasticSearch
|
||||||
NewWorker(c.es, index, from, c.config.BatchSize, ch)
|
|
||||||
|
// create a buffer channel
|
||||||
|
c.ch = make(chan Data, 1)
|
||||||
|
|
||||||
|
for _, index := range c.config.Indexes {
|
||||||
|
NewWorker(c, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -9,8 +9,8 @@ import (
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
config := &elasticstream.Config{
|
config := &elasticstream.Config{
|
||||||
Host: "http://localhost:9200",
|
Host: "http://test.urantiacloud.com:9200",
|
||||||
Indexes: map[string]int{"users": 0, "students": 0, "teachers": 0},
|
Indexes: []string{"IndexA", "IndexB", "IndexC"},
|
||||||
BatchSize: 10,
|
BatchSize: 10,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,6 @@ package elasticstream
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Host string
|
Host string
|
||||||
// map of index name and position from where data is to be read.
|
// map of index name and position from where data is to be read.
|
||||||
Indexes map[string]int
|
Indexes []string
|
||||||
BatchSize int
|
BatchSize int
|
||||||
}
|
}
|
||||||
|
|
14
worker.go
14
worker.go
|
@ -1,29 +1,21 @@
|
||||||
package elasticstream
|
package elasticstream
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/elastic/go-elasticsearch/v8"
|
// "github.com/elastic/go-elasticsearch/v8"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Worker struct {
|
type Worker struct {
|
||||||
client *elasticsearch.Client
|
client *Client
|
||||||
index string // name of the indexes worker pulls data from
|
index string // name of the indexes worker pulls data from
|
||||||
from int // from where to start read data
|
|
||||||
size int // batch size
|
|
||||||
buffer chan Data
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWorker(client *elasticsearch.Client, index string, from, size int, buffer chan Data) *Worker {
|
func NewWorker(client *Client, index string) {
|
||||||
w := &Worker{
|
w := &Worker{
|
||||||
client: client,
|
client: client,
|
||||||
index: index,
|
index: index,
|
||||||
from: from,
|
|
||||||
size: size,
|
|
||||||
buffer: buffer,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go w.start()
|
go w.start()
|
||||||
|
|
||||||
return w
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Worker) start() {
|
func (w *Worker) start() {
|
||||||
|
|
Loading…
Reference in New Issue