Building a Distributed Search Engine: The Search Coordinator in Go
February 2, 2026
In Part 3, we built the SearchWorker that processes documents. Now it's time to build the brain of our distributed search system: the SearchCoordinator.
The coordinator is the leader node that:
- Receives search queries from clients
- Discovers available workers
- Splits documents among workers
- Sends tasks concurrently
- Aggregates results and calculates final TF-IDF scores
- Returns ranked search results
Architecture Overview
                   ┌─────────────────────────────────────┐
                   │          SearchCoordinator          │
                   │                                     │
Client ──────────► │  1. Parse protobuf request          │
(protobuf)         │  2. Tokenize query                  │
                   │  3. Get workers from registry       │
                   │  4. Split documents among workers   │
                   │  5. Send tasks concurrently         │
                   │  6. Aggregate results               │
                   │  7. Calculate TF-IDF scores         │
Client ◄────────── │  8. Return ranked results           │
(protobuf)         │                                     │
                   └─────────────────────────────────────┘
                                      │
                    ┌─────────────────┼─────────────────┐
                    │                 │                 │
                    ▼                 ▼                 ▼
              ┌──────────┐      ┌──────────┐      ┌──────────┐
              │ Worker 1 │      │ Worker 2 │      │ Worker N │
              └──────────┘      └──────────┘      └──────────┘
Protocol Buffers for External API
While workers communicate using JSON internally, external clients use Protocol Buffers for efficiency:
syntax = "proto3";
package distributed.search;
message Request {
    string search_query = 1;
}

message Response {
    repeated DocumentStats relevant_documents = 1;

    message DocumentStats {
        string document_name = 1;
        double score = 2;
        int64 document_size = 3;
        string author = 4;
    }
}
Generate Go code with:
protoc --go_out=. --go_opt=paths=source_relative proto/search.proto
The ServiceRegistry Interface
The coordinator needs to discover workers. We define an interface for service discovery:
package cluster
// ServiceRegistry defines the interface for service discovery.
type ServiceRegistry interface {
    // GetAllServiceAddresses returns all registered service addresses.
    GetAllServiceAddresses() ([]string, error)

    // RegisterToCluster registers this node's address in the registry.
    RegisterToCluster(address string) error

    // UnregisterFromCluster removes this node from the registry.
    UnregisterFromCluster() error

    // RegisterForUpdates sets up a watch to track registry changes.
    RegisterForUpdates()
}
This interface will be implemented with ZooKeeper in a later post. For now, it allows us to test the coordinator in isolation.
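In tests, a trivial in-memory registry can stand in for ZooKeeper. The sketch below is a test double of my own, not code from the repo; it simply hands back a fixed list of worker addresses and satisfies the interface above:

package cluster

// StaticRegistry is a hypothetical in-memory ServiceRegistry used only for
// testing: it returns a fixed list of worker addresses and ignores watches.
type StaticRegistry struct {
    addresses []string
}

func NewStaticRegistry(addresses []string) *StaticRegistry {
    return &StaticRegistry{addresses: addresses}
}

func (r *StaticRegistry) GetAllServiceAddresses() ([]string, error) {
    return r.addresses, nil
}

func (r *StaticRegistry) RegisterToCluster(address string) error {
    r.addresses = append(r.addresses, address)
    return nil
}

func (r *StaticRegistry) UnregisterFromCluster() error { return nil }

func (r *StaticRegistry) RegisterForUpdates() {}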
Implementing the SearchCoordinator
The Core Structure
package search
type SearchCoordinator struct {
    workersRegistry cluster.ServiceRegistry
    client          *networking.WebClient
    documents       []string
    documentsDir    string
}

func NewSearchCoordinator(
    registry cluster.ServiceRegistry,
    client *networking.WebClient,
    documentsDir string,
) *SearchCoordinator {
    sc := &SearchCoordinator{
        workersRegistry: registry,
        client:          client,
        documentsDir:    documentsDir,
    }
    sc.documents = sc.readDocumentsList()
    return sc
}

func (sc *SearchCoordinator) GetEndpoint() string {
    return "/search"
}
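The constructor relies on readDocumentsList, which isn't listed in this post. Here's a minimal sketch of what it could look like, assuming the corpus is a flat directory of text files under documentsDir (it uses os.ReadDir and path/filepath; the repo's version may differ):

// readDocumentsList returns the paths of all regular files in documentsDir.
// Subdirectories are skipped; on error the coordinator simply has no documents.
func (sc *SearchCoordinator) readDocumentsList() []string {
    entries, err := os.ReadDir(sc.documentsDir)
    if err != nil {
        log.Printf("Error reading documents directory: %v", err)
        return nil
    }

    var documents []string
    for _, entry := range entries {
        if entry.IsDir() {
            continue
        }
        documents = append(documents, filepath.Join(sc.documentsDir, entry.Name()))
    }
    return documents
}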
Handling Search Requests
The HandleRequest method orchestrates the entire search flow:
func (sc *SearchCoordinator) HandleRequest(payload []byte) []byte {
    // 1. Parse the protobuf request
    request := &searchpb.Request{}
    if err := proto.Unmarshal(payload, request); err != nil {
        log.Printf("Error parsing protobuf request: %v", err)
        return sc.emptyResponse()
    }

    // 2. Tokenize the search query
    searchTerms := GetWordsFromLine(request.SearchQuery)
    if len(searchTerms) == 0 {
        return sc.emptyResponse()
    }

    // 3. Get available workers
    workers, err := sc.workersRegistry.GetAllServiceAddresses()
    if err != nil || len(workers) == 0 {
        return sc.emptyResponse()
    }

    // 4. Create tasks for workers
    tasks := sc.createTasks(len(workers), searchTerms)
    if len(tasks) == 0 {
        return sc.emptyResponse()
    }

    // 5. Send tasks to workers concurrently
    ctx := context.Background()
    results := sc.client.SendTasksConcurrently(ctx, workers, tasks)

    // 6. Aggregate results and calculate TF-IDF scores
    aggregatedResults := sc.aggregateResults(results)
    scores := GetDocumentsScores(searchTerms, aggregatedResults)

    // 7. Sort and build response
    sortedDocs := sc.sortDocumentsByScore(scores)
    return sc.buildResponse(sortedDocs, scores)
}
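HandleRequest also uses two small helpers that aren't listed here: createTasks pairs each document split (built in the next section) with the shared search terms, and emptyResponse serializes an empty protobuf Response so clients always receive a valid payload. A rough sketch, with illustrative Task field names:

// createTasks builds one task per worker, pairing the shared search terms
// with that worker's slice of documents.
func (sc *SearchCoordinator) createTasks(numWorkers int, searchTerms []string) []*model.Task {
    splits := sc.splitDocumentList(numWorkers)

    tasks := make([]*model.Task, 0, len(splits))
    for _, split := range splits {
        tasks = append(tasks, &model.Task{
            SearchTerms: searchTerms, // field names are illustrative
            Documents:   split,
        })
    }
    return tasks
}

// emptyResponse serializes a Response with no documents.
func (sc *SearchCoordinator) emptyResponse() []byte {
    data, err := proto.Marshal(&searchpb.Response{})
    if err != nil {
        return nil
    }
    return data
}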
Document Distribution
One of the key challenges is splitting documents evenly among workers. We use a round-robin approach:
func (sc *SearchCoordinator) splitDocumentList(numWorkers int) [][]string {
    if numWorkers <= 0 || len(sc.documents) == 0 {
        return nil
    }

    // Don't create more splits than documents
    if numWorkers > len(sc.documents) {
        numWorkers = len(sc.documents)
    }

    splits := make([][]string, numWorkers)
    for i := range splits {
        splits[i] = make([]string, 0)
    }

    // Distribute documents round-robin style
    for i, doc := range sc.documents {
        workerIndex := i % numWorkers
        splits[workerIndex] = append(splits[workerIndex], doc)
    }

    return splits
}
This ensures each worker gets either floor(n/w) or ceil(n/w) documents, where n is the total number of documents and w is the number of workers. For example, splitting 7 documents across 3 workers yields splits of sizes 3, 2, and 2.
Result Aggregation
After workers return their results, we merge them:
func (sc *SearchCoordinator) aggregateResults(results []*model.Result) map[string]*model.DocumentData {
    aggregated := make(map[string]*model.DocumentData)

    for _, result := range results {
        if result == nil {
            continue // Handle failed workers gracefully
        }
        for docPath, docData := range result.GetDocumentToDocumentData() {
            aggregated[docPath] = docData
        }
    }

    return aggregated
}
Key design decision: nil results are skipped, not fatal. If one worker fails, we still return results from the others.
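Those nil slots come from the WebClient. Its real implementation lives in the networking package; the sketch below only illustrates the contract the coordinator depends on, one result slot per worker that stays nil when that worker's request fails (sendTask is a hypothetical helper):

// SendTasksConcurrently sends one task to each worker in parallel and
// returns a slice with one entry per worker; failed requests leave nil.
func (c *WebClient) SendTasksConcurrently(ctx context.Context, workers []string, tasks []*model.Task) []*model.Result {
    results := make([]*model.Result, len(workers))

    var wg sync.WaitGroup
    for i := range workers {
        if i >= len(tasks) {
            break // fewer tasks than workers (more workers than documents)
        }
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            result, err := c.sendTask(ctx, workers[i], tasks[i])
            if err != nil {
                log.Printf("Worker %s failed: %v", workers[i], err)
                return // leave results[i] nil; the coordinator skips it
            }
            results[i] = result
        }(i)
    }
    wg.Wait()

    return results
}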
Building the Response
Finally, we sort documents by score and build the protobuf response:
func (sc *SearchCoordinator) sortDocumentsByScore(scores map[float64][]string) []DocumentScore {
    var sortedDocs []DocumentScore

    // Collect and sort scores in descending order
    var scoreValues []float64
    for score := range scores {
        scoreValues = append(scoreValues, score)
    }
    sort.Sort(sort.Reverse(sort.Float64Slice(scoreValues)))

    // Build sorted document list
    for _, score := range scoreValues {
        docs := scores[score]
        sort.Strings(docs) // Alphabetical for same scores
        for _, doc := range docs {
            sortedDocs = append(sortedDocs, DocumentScore{
                DocumentPath: doc,
                Score:        score,
            })
        }
    }

    return sortedDocs
}
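buildResponse then maps the ranked list into the protobuf message. A simplified sketch that populates only the name and score fields (document size and author are left out; Response_DocumentStats is the Go type protoc generates for the nested message):

// buildResponse converts the ranked documents into a serialized protobuf
// Response, preserving the descending score order. The scores map is unused
// in this simplified version.
func (sc *SearchCoordinator) buildResponse(sortedDocs []DocumentScore, scores map[float64][]string) []byte {
    response := &searchpb.Response{}

    for _, doc := range sortedDocs {
        response.RelevantDocuments = append(response.RelevantDocuments, &searchpb.Response_DocumentStats{
            DocumentName: filepath.Base(doc.DocumentPath),
            Score:        doc.Score,
        })
    }

    data, err := proto.Marshal(response)
    if err != nil {
        log.Printf("Error marshaling response: %v", err)
        return sc.emptyResponse()
    }
    return data
}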
Property-Based Testing
We use property-based tests to verify our coordinator's correctness.
Property 7: Document Distribution Balance
func TestDocumentDistributionBalance(t *testing.T) {
    properties := gopter.NewProperties(gopter.DefaultTestParameters())

    // Total documents preserved after split
    properties.Property("total documents preserved", prop.ForAll(
        func(numDocs int, numWorkers int) bool {
            documents := generateDocuments(numDocs)
            splits := splitDocumentList(documents, numWorkers)

            totalDocs := 0
            for _, split := range splits {
                totalDocs += len(split)
            }

            return totalDocs == numDocs
        },
        gen.IntRange(1, 100),
        gen.IntRange(1, 20),
    ))

    // Each worker gets floor or ceil documents
    properties.Property("balanced distribution", prop.ForAll(
        func(numDocs int, numWorkers int) bool {
            documents := generateDocuments(numDocs)
            splits := splitDocumentList(documents, numWorkers)

            floor := numDocs / numWorkers
            ceil := int(math.Ceil(float64(numDocs) / float64(numWorkers)))

            for _, split := range splits {
                if len(split) != floor && len(split) != ceil {
                    return false
                }
            }
            return true
        },
        gen.IntRange(1, 100),
        gen.IntRange(1, 20),
    ))

    properties.TestingRun(t)
}
Property 8: Result Aggregation Completeness
func TestResultAggregationCompleteness(t *testing.T) {
    properties := gopter.NewProperties(gopter.DefaultTestParameters())

    properties.Property("all documents preserved", prop.ForAll(
        func(numResults int, docsPerResult int) bool {
            // Create results with unique documents
            results := make([]*model.Result, numResults)
            expectedDocs := make(map[string]bool)

            for i := 0; i < numResults; i++ {
                result := model.NewResult()
                for j := 0; j < docsPerResult; j++ {
                    docPath := fmt.Sprintf("result%d_doc%d", i, j)
                    docData := model.NewDocumentData()
                    result.AddDocumentData(docPath, docData)
                    expectedDocs[docPath] = true
                }
                results[i] = result
            }

            aggregated := aggregateResults(results)

            // Verify all documents present
            return len(aggregated) == len(expectedDocs)
        },
        gen.IntRange(1, 10),
        gen.IntRange(1, 10),
    ))

    properties.TestingRun(t)
}
Property 10: Query Tokenization Determinism
func TestQueryTokenizationDeterminism(t *testing.T) {
    properties := gopter.NewProperties(gopter.DefaultTestParameters())

    properties.Property("tokenization is deterministic", prop.ForAll(
        func(query string) bool {
            result1 := GetWordsFromLine(query)
            result2 := GetWordsFromLine(query)

            if len(result1) != len(result2) {
                return false
            }
            for i := range result1 {
                if result1[i] != result2[i] {
                    return false
                }
            }
            return true
        },
        gen.AnyString(),
    ))

    properties.TestingRun(t)
}
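For context, GetWordsFromLine isn't listed in this post. A tokenizer along these lines would satisfy all three properties, assuming we lowercase the input and split on anything that isn't a letter (the repo's version may differ):

// GetWordsFromLine lowercases the input and splits it on any non-letter
// character; strings.FieldsFunc drops empty tokens, and the result depends
// only on the input, so tokenization is deterministic by construction.
func GetWordsFromLine(line string) []string {
    return strings.FieldsFunc(strings.ToLower(line), func(r rune) bool {
        return !unicode.IsLetter(r)
    })
}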
Test Results
=== RUN   TestDocumentDistributionBalance
+ total documents preserved after split: OK, passed 100 tests.
+ each worker gets floor or ceil documents: OK, passed 100 tests.
+ no documents are duplicated: OK, passed 100 tests.
+ handles more workers than documents: OK, passed 100 tests.
--- PASS: TestDocumentDistributionBalance (0.00s)

=== RUN   TestResultAggregationCompleteness
+ all documents from all results are preserved: OK, passed 100 tests.
+ nil results are handled gracefully: OK, passed 100 tests.
+ empty results are handled correctly: OK, passed 100 tests.
+ document data is preserved correctly: OK, passed 100 tests.
--- PASS: TestResultAggregationCompleteness (0.00s)

=== RUN   TestQueryTokenizationDeterminism
+ tokenization is deterministic: OK, passed 100 tests.
+ all tokens are non-empty strings: OK, passed 100 tests.
+ multiple calls produce identical results: OK, passed 100 tests.
--- PASS: TestQueryTokenizationDeterminism (0.01s)
The Complete Flow
Let's trace a search request through the system:
1. Client sends protobuf Request: { search_query: "whale ocean captain" }
2. Coordinator tokenizes query: ["whale", "ocean", "captain"]
3. Coordinator gets workers from registry: ["http://worker1:8081/task", "http://worker2:8082/task"]
4. Coordinator splits 20 documents:
   Worker 1: [doc1, doc3, doc5, ..., doc19] (10 docs)
   Worker 2: [doc2, doc4, doc6, ..., doc20] (10 docs)
5. Coordinator sends tasks concurrently:
   Task 1 → Worker 1: { terms: [...], documents: [...] }
   Task 2 → Worker 2: { terms: [...], documents: [...] }
6. Workers return Results with term frequencies
7. Coordinator aggregates all DocumentData
8. Coordinator calculates TF-IDF scores:
   Moby Dick: 0.003961
   Frankenstein: 0.000133
   ...
9. Coordinator returns protobuf Response:
   {
     relevant_documents: [
       { document_name: "Moby Dick.txt", score: 0.003961 },
       { document_name: "Frankenstein.txt", score: 0.000133 },
       ...
     ]
   }
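From a client's perspective, the whole exchange is one HTTP round trip carrying protobuf payloads. A minimal client sketch, assuming the coordinator listens on localhost:8080 and accepts POSTs at /search (host and port are placeholders):

// searchCluster sends a query to the coordinator and returns the ranked results.
func searchCluster(query string) (*searchpb.Response, error) {
    payload, err := proto.Marshal(&searchpb.Request{SearchQuery: query})
    if err != nil {
        return nil, err
    }

    // POST the serialized Request to the coordinator's /search endpoint.
    resp, err := http.Post("http://localhost:8080/search", "application/octet-stream", bytes.NewReader(payload))
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }

    // Decode the ranked results.
    response := &searchpb.Response{}
    if err := proto.Unmarshal(body, response); err != nil {
        return nil, err
    }
    return response, nil
}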
Key Takeaways
- Interface-based design: The ServiceRegistry interface decouples the coordinator from ZooKeeper
- Graceful degradation: Failed workers don't crash the system; we return partial results
- Balanced distribution: Round-robin ensures fair work distribution
- Protocol Buffers: Efficient binary serialization for external API
- Property-based testing: Verifies correctness across random inputs
What's Next?
In Part 5, we'll implement the ZooKeeper integration for:
- Leader election (who becomes coordinator)
- Service registry (discovering workers)
- Automatic failover
Get the Code
git clone git@github.com:UnplugCharger/distributed_doc_search.git
cd distributed-search-cluster-go
go test ./test/property/... -v
This post is part of the "Distributed Document Search" series. Follow along as we build a production-ready search cluster from scratch.