Building a Distributed Search Engine: The Search Coordinator in Go

February 2, 2026

In Part 3, we built the SearchWorker that processes documents. Now it's time to build the brain of our distributed search system: the SearchCoordinator.

The coordinator is the leader node that:

  • Receives search queries from clients
  • Discovers available workers
  • Splits documents among workers
  • Sends tasks concurrently
  • Aggregates results and calculates final TF-IDF scores
  • Returns ranked search results

Architecture Overview

                   ┌──────────────────────────────────────┐
                   │           SearchCoordinator          │
                   │                                      │
Client ──────────► │  1. Parse protobuf request           │
(protobuf)         │  2. Tokenize query                   │
                   │  3. Get workers from registry        │
                   │  4. Split documents among workers    │
                   │  5. Send tasks concurrently          │
                   │  6. Aggregate results                │
                   │  7. Calculate TF-IDF scores          │
Client ◄────────── │  8. Return ranked results            │
(protobuf)         │                                      │
                   └───────────────────┬──────────────────┘
                         ┌─────────────┼─────────────┐
                         │             │             │
                         ▼             ▼             ▼
                    ┌──────────┐  ┌──────────┐  ┌──────────┐
                    │ Worker 1 │  │ Worker 2 │  │ Worker N │
                    └──────────┘  └──────────┘  └──────────┘

Protocol Buffers for External API

While workers communicate using JSON internally, external clients use Protocol Buffers for efficiency:

syntax = "proto3";
package distributed.search;
message Request {
string search_query = 1;
}
message Response {
repeated DocumentStats relevant_documents = 1;
message DocumentStats {
string document_name = 1;
double score = 2;
int64 document_size = 3;
string author = 4;
}
}

Generate Go code with:

protoc --go_out=. --go_opt=paths=source_relative proto/search.proto
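
To get a feel for the generated API from the client side, here is a minimal sketch of a client that marshals a Request, POSTs it to the coordinator's /search endpoint, and decodes the Response. The generated-code import path, the coordinator address, and the content type are placeholders; adapt them to your module and networking layer.

package main

import (
    "bytes"
    "fmt"
    "io"
    "log"
    "net/http"

    "google.golang.org/protobuf/proto"

    // Illustrative import path for the generated code.
    searchpb "github.com/UnplugCharger/distributed_doc_search/proto"
)

func main() {
    // Build and serialize the search request.
    req := &searchpb.Request{SearchQuery: "whale ocean captain"}
    payload, err := proto.Marshal(req)
    if err != nil {
        log.Fatalf("marshal request: %v", err)
    }

    // POST it to the coordinator's /search endpoint (address is illustrative).
    httpResp, err := http.Post("http://localhost:8080/search", "application/octet-stream", bytes.NewReader(payload))
    if err != nil {
        log.Fatalf("send request: %v", err)
    }
    defer httpResp.Body.Close()

    body, err := io.ReadAll(httpResp.Body)
    if err != nil {
        log.Fatalf("read response: %v", err)
    }

    // Decode the ranked results.
    resp := &searchpb.Response{}
    if err := proto.Unmarshal(body, resp); err != nil {
        log.Fatalf("unmarshal response: %v", err)
    }
    for _, doc := range resp.RelevantDocuments {
        fmt.Printf("%s  %.6f\n", doc.DocumentName, doc.Score)
    }
}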

The ServiceRegistry Interface

The coordinator needs to discover workers. We define an interface for service discovery:

package cluster

// ServiceRegistry defines the interface for service discovery.
type ServiceRegistry interface {
    // GetAllServiceAddresses returns all registered service addresses.
    GetAllServiceAddresses() ([]string, error)

    // RegisterToCluster registers this node's address in the registry.
    RegisterToCluster(address string) error

    // UnregisterFromCluster removes this node from the registry.
    UnregisterFromCluster() error

    // RegisterForUpdates sets up a watch to track registry changes.
    RegisterForUpdates()
}

This interface will be implemented with ZooKeeper in a later post. For now, it allows us to test the coordinator in isolation.
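
A minimal in-memory implementation is enough for local runs and unit tests. This StaticRegistry is a sketch used for illustration here, not the eventual ZooKeeper-backed registry:

// StaticRegistry is a fixed, in-memory ServiceRegistry for local runs and tests.
type StaticRegistry struct {
    addresses []string
}

func NewStaticRegistry(addresses []string) *StaticRegistry {
    return &StaticRegistry{addresses: addresses}
}

// GetAllServiceAddresses returns the fixed list of worker addresses.
func (r *StaticRegistry) GetAllServiceAddresses() ([]string, error) {
    return r.addresses, nil
}

// RegisterToCluster is a no-op for the static registry.
func (r *StaticRegistry) RegisterToCluster(address string) error { return nil }

// UnregisterFromCluster is a no-op for the static registry.
func (r *StaticRegistry) UnregisterFromCluster() error { return nil }

// RegisterForUpdates is a no-op: a static registry never changes.
func (r *StaticRegistry) RegisterForUpdates() {}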

Implementing the SearchCoordinator

The Core Structure

package search

type SearchCoordinator struct {
    workersRegistry cluster.ServiceRegistry
    client          *networking.WebClient
    documents       []string
    documentsDir    string
}

func NewSearchCoordinator(
    registry cluster.ServiceRegistry,
    client *networking.WebClient,
    documentsDir string,
) *SearchCoordinator {
    sc := &SearchCoordinator{
        workersRegistry: registry,
        client:          client,
        documentsDir:    documentsDir,
    }
    sc.documents = sc.readDocumentsList()
    return sc
}

func (sc *SearchCoordinator) GetEndpoint() string {
    return "/search"
}
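
Wiring the coordinator together might look roughly like this. NewStaticRegistry comes from the sketch above, while networking.NewWebClient and the documents directory are assumptions about the surrounding code:

// Hypothetical wiring; everything except NewSearchCoordinator and GetEndpoint is illustrative.
registry := cluster.NewStaticRegistry([]string{
    "http://worker1:8081/task",
    "http://worker2:8082/task",
})
client := networking.NewWebClient() // assumed constructor

coordinator := search.NewSearchCoordinator(registry, client, "./books")
fmt.Println(coordinator.GetEndpoint()) // prints "/search"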

Handling Search Requests

The HandleRequest method orchestrates the entire search flow:

func (sc *SearchCoordinator) HandleRequest(payload []byte) []byte {
    // 1. Parse the protobuf request
    request := &searchpb.Request{}
    if err := proto.Unmarshal(payload, request); err != nil {
        log.Printf("Error parsing protobuf request: %v", err)
        return sc.emptyResponse()
    }

    // 2. Tokenize the search query
    searchTerms := GetWordsFromLine(request.SearchQuery)
    if len(searchTerms) == 0 {
        return sc.emptyResponse()
    }

    // 3. Get available workers
    workers, err := sc.workersRegistry.GetAllServiceAddresses()
    if err != nil || len(workers) == 0 {
        return sc.emptyResponse()
    }

    // 4. Create tasks for workers
    tasks := sc.createTasks(len(workers), searchTerms)
    if len(tasks) == 0 {
        return sc.emptyResponse()
    }

    // 5. Send tasks to workers concurrently
    ctx := context.Background()
    results := sc.client.SendTasksConcurrently(ctx, workers, tasks)

    // 6. Aggregate results and calculate TF-IDF scores
    aggregatedResults := sc.aggregateResults(results)
    scores := GetDocumentsScores(searchTerms, aggregatedResults)

    // 7. Sort and build response
    sortedDocs := sc.sortDocumentsByScore(scores)
    return sc.buildResponse(sortedDocs, scores)
}
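
emptyResponse is the fallback on every error path above but isn't shown in the post; a plausible version simply serializes an empty Response:

// emptyResponse returns a serialized Response containing no documents.
func (sc *SearchCoordinator) emptyResponse() []byte {
    payload, _ := proto.Marshal(&searchpb.Response{})
    return payload
}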

Document Distribution

One of the key challenges is splitting documents evenly among workers. We use a round-robin approach:

func (sc *SearchCoordinator) splitDocumentList(numWorkers int) [][]string {
    if numWorkers <= 0 || len(sc.documents) == 0 {
        return nil
    }

    // Don't create more splits than documents
    if numWorkers > len(sc.documents) {
        numWorkers = len(sc.documents)
    }

    splits := make([][]string, numWorkers)
    for i := range splits {
        splits[i] = make([]string, 0)
    }

    // Distribute documents round-robin style
    for i, doc := range sc.documents {
        workerIndex := i % numWorkers
        splits[workerIndex] = append(splits[workerIndex], doc)
    }
    return splits
}

This ensures each worker receives either floor(n/w) or ceil(n/w) documents, where n is the total number of documents and w is the number of workers. For example, 7 documents split across 3 workers come out as 3, 2, and 2.
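
createTasks, called from HandleRequest, builds on splitDocumentList: one task per worker, pairing the full list of search terms with that worker's slice of documents. Since workers speak JSON internally, a sketch might serialize each task up front; the model.Task shape here is an assumption:

// Sketch of createTasks; the model.Task field names are assumptions.
func (sc *SearchCoordinator) createTasks(numWorkers int, searchTerms []string) [][]byte {
    splits := sc.splitDocumentList(numWorkers)

    tasks := make([][]byte, 0, len(splits))
    for _, split := range splits {
        task := model.Task{
            SearchTerms: searchTerms,
            Documents:   split,
        }
        payload, err := json.Marshal(task)
        if err != nil {
            log.Printf("Error serializing task: %v", err)
            continue
        }
        tasks = append(tasks, payload)
    }
    return tasks
}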

Result Aggregation

After workers return their results, we merge them:

func (sc *SearchCoordinator) aggregateResults(results []*model.Result) map[string]*model.DocumentData {
    aggregated := make(map[string]*model.DocumentData)
    for _, result := range results {
        if result == nil {
            continue // Handle failed workers gracefully
        }
        for docPath, docData := range result.GetDocumentToDocumentData() {
            aggregated[docPath] = docData
        }
    }
    return aggregated
}

Key design decision: nil results are skipped, not fatal. If one worker fails, we still return results from the others.
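
SendTasksConcurrently lives in the networking package and isn't shown in this post, but a hypothetical sketch of the fan-out behind it makes the nil-result behaviour concrete: every worker gets its own goroutine, and a failed request simply leaves a nil in that worker's slot. The sendTask helper and the []byte task type are assumptions:

// Hypothetical sketch of the fan-out inside the networking package.
func (c *WebClient) SendTasksConcurrently(ctx context.Context, workers []string, tasks [][]byte) []*model.Result {
    results := make([]*model.Result, len(tasks))

    var wg sync.WaitGroup
    for i := range tasks {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            // sendTask (assumed helper) POSTs one task to one worker and parses the JSON reply.
            result, err := c.sendTask(ctx, workers[i], tasks[i])
            if err != nil {
                log.Printf("worker %s failed: %v", workers[i], err)
                return // leave results[i] == nil; the coordinator skips it
            }
            results[i] = result
        }(i)
    }
    wg.Wait()

    return results
}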

Building the Response

Finally, we sort documents by score and build the protobuf response:

func (sc *SearchCoordinator) sortDocumentsByScore(scores map[float64][]string) []DocumentScore {
    var sortedDocs []DocumentScore

    // Collect and sort scores in descending order
    var scoreValues []float64
    for score := range scores {
        scoreValues = append(scoreValues, score)
    }
    sort.Sort(sort.Reverse(sort.Float64Slice(scoreValues)))

    // Build sorted document list
    for _, score := range scoreValues {
        docs := scores[score]
        sort.Strings(docs) // Alphabetical for same scores
        for _, doc := range docs {
            sortedDocs = append(sortedDocs, DocumentScore{
                DocumentPath: doc,
                Score:        score,
            })
        }
    }
    return sortedDocs
}
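
buildResponse, the last step of HandleRequest, is the mirror image of request parsing: it walks the sorted list and fills in the protobuf Response. A sketch, with the extra DocumentStats fields (document_size, author) and most error handling elided:

// Sketch of buildResponse; document_size and author are omitted here.
func (sc *SearchCoordinator) buildResponse(sortedDocs []DocumentScore, scores map[float64][]string) []byte {
    // scores is available here for any extra per-document stats we might add later.
    response := &searchpb.Response{}
    for _, doc := range sortedDocs {
        response.RelevantDocuments = append(response.RelevantDocuments, &searchpb.Response_DocumentStats{
            DocumentName: filepath.Base(doc.DocumentPath),
            Score:        doc.Score,
        })
    }

    payload, err := proto.Marshal(response)
    if err != nil {
        log.Printf("Error marshaling response: %v", err)
        return sc.emptyResponse()
    }
    return payload
}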

Property-Based Testing

We use property-based tests to verify our coordinator's correctness.

Property 7: Document Distribution Balance

func TestDocumentDistributionBalance(t *testing.T) {
    properties := gopter.NewProperties(gopter.DefaultTestParameters())

    // Total documents preserved after split
    properties.Property("total documents preserved", prop.ForAll(
        func(numDocs int, numWorkers int) bool {
            documents := generateDocuments(numDocs)
            splits := splitDocumentList(documents, numWorkers)

            totalDocs := 0
            for _, split := range splits {
                totalDocs += len(split)
            }
            return totalDocs == numDocs
        },
        gen.IntRange(1, 100),
        gen.IntRange(1, 20),
    ))

    // Each worker gets floor or ceil documents
    properties.Property("balanced distribution", prop.ForAll(
        func(numDocs int, numWorkers int) bool {
            documents := generateDocuments(numDocs)
            splits := splitDocumentList(documents, numWorkers)

            floor := numDocs / numWorkers
            ceil := int(math.Ceil(float64(numDocs) / float64(numWorkers)))
            for _, split := range splits {
                if len(split) != floor && len(split) != ceil {
                    return false
                }
            }
            return true
        },
        gen.IntRange(1, 100),
        gen.IntRange(1, 20),
    ))

    properties.TestingRun(t)
}
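
The generateDocuments helper used above isn't shown in the post; a hypothetical version can be as simple as:

// generateDocuments returns n distinct fake document names for the property tests.
func generateDocuments(n int) []string {
    docs := make([]string, n)
    for i := range docs {
        docs[i] = fmt.Sprintf("doc%d.txt", i)
    }
    return docs
}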

Property 8: Result Aggregation Completeness

func TestResultAggregationCompleteness(t *testing.T) {
    properties := gopter.NewProperties(gopter.DefaultTestParameters())

    properties.Property("all documents preserved", prop.ForAll(
        func(numResults int, docsPerResult int) bool {
            // Create results with unique documents
            results := make([]*model.Result, numResults)
            expectedDocs := make(map[string]bool)
            for i := 0; i < numResults; i++ {
                result := model.NewResult()
                for j := 0; j < docsPerResult; j++ {
                    docPath := fmt.Sprintf("result%d_doc%d", i, j)
                    docData := model.NewDocumentData()
                    result.AddDocumentData(docPath, docData)
                    expectedDocs[docPath] = true
                }
                results[i] = result
            }

            aggregated := aggregateResults(results)

            // Verify all documents present
            return len(aggregated) == len(expectedDocs)
        },
        gen.IntRange(1, 10),
        gen.IntRange(1, 10),
    ))

    properties.TestingRun(t)
}

Property 10: Query Tokenization Determinism

func TestQueryTokenizationDeterminism(t *testing.T) {
    properties := gopter.NewProperties(gopter.DefaultTestParameters())

    properties.Property("tokenization is deterministic", prop.ForAll(
        func(query string) bool {
            result1 := GetWordsFromLine(query)
            result2 := GetWordsFromLine(query)

            if len(result1) != len(result2) {
                return false
            }
            for i := range result1 {
                if result1[i] != result2[i] {
                    return false
                }
            }
            return true
        },
        gen.AnyString(),
    ))

    properties.TestingRun(t)
}

Test Results

=== RUN TestDocumentDistributionBalance
+ total documents preserved after split: OK, passed 100 tests.
+ each worker gets floor or ceil documents: OK, passed 100 tests.
+ no documents are duplicated: OK, passed 100 tests.
+ handles more workers than documents: OK, passed 100 tests.
--- PASS: TestDocumentDistributionBalance (0.00s)
=== RUN TestResultAggregationCompleteness
+ all documents from all results are preserved: OK, passed 100 tests.
+ nil results are handled gracefully: OK, passed 100 tests.
+ empty results are handled correctly: OK, passed 100 tests.
+ document data is preserved correctly: OK, passed 100 tests.
--- PASS: TestResultAggregationCompleteness (0.00s)
=== RUN TestQueryTokenizationDeterminism
+ tokenization is deterministic: OK, passed 100 tests.
+ all tokens are non-empty strings: OK, passed 100 tests.
+ multiple calls produce identical results: OK, passed 100 tests.
--- PASS: TestQueryTokenizationDeterminism (0.01s)

The Complete Flow

Let's trace a search request through the system:

1. Client sends protobuf Request:
   { search_query: "whale ocean captain" }

2. Coordinator tokenizes query:
   ["whale", "ocean", "captain"]

3. Coordinator gets workers from registry:
   ["http://worker1:8081/task", "http://worker2:8082/task"]

4. Coordinator splits 20 documents:
   Worker 1: [doc1, doc3, doc5, ..., doc19] (10 docs)
   Worker 2: [doc2, doc4, doc6, ..., doc20] (10 docs)

5. Coordinator sends tasks concurrently:
   Task 1 → Worker 1: { terms: [...], documents: [...] }
   Task 2 → Worker 2: { terms: [...], documents: [...] }

6. Workers return Results with term frequencies

7. Coordinator aggregates all DocumentData

8. Coordinator calculates TF-IDF scores:
   Moby Dick: 0.003961
   Frankenstein: 0.000133
   ...

9. Coordinator returns protobuf Response:
   {
     relevant_documents: [
       { document_name: "Moby Dick.txt", score: 0.003961 },
       { document_name: "Frankenstein.txt", score: 0.000133 },
       ...
     ]
   }

Key Takeaways

  1. Interface-based design: The ServiceRegistry interface decouples the coordinator from ZooKeeper
  2. Graceful degradation: Failed workers don't crash the system—we return partial results
  3. Balanced distribution: Round-robin ensures fair work distribution
  4. Protocol Buffers: Efficient binary serialization for external API
  5. Property-based testing: Verifies correctness across random inputs

What's Next?

In Part 5, we'll implement the ZooKeeper integration for:

  • Leader election (who becomes coordinator)
  • Service registry (discovering workers)
  • Automatic failover

Get the Code

git clone git@github.com:UnplugCharger/distributed_doc_search.git
cd distributed_doc_search
go test ./test/property/... -v

This post is part of the "Distributed Document Search" series. Follow along as we build a production-ready search cluster from scratch.