Building a Distributed Search Engine: The Search Worker in Go
February 2, 2026
In Part 1, we built the TF-IDF algorithm. In Part 2, we created the HTTP networking layer. Now it's time to build the SearchWorker—the component that does the actual document processing.
By the end of this post, you'll have a worker that can:
- Receive search tasks over HTTP
- Read and parse documents from the filesystem
- Calculate term frequencies for each document
- Return results to the coordinator
- Handle errors gracefully without crashing
The Worker's Role
In our distributed architecture, workers are the muscle. They receive a task from the coordinator, process the assigned documents, and send back results:
```
┌─────────────────┐                    ┌─────────────────┐
│   Coordinator   │  ──── Task ────►   │     Worker      │
│                 │                    │                 │
│  "Find 'whale'  │                    │ 1. Read docs    │
│   in these 5    │                    │ 2. Parse text   │
│   documents"    │                    │ 3. Calculate    │
│                 │  ◄─── Result ───   │    TF for       │
│                 │                    │    each term    │
└─────────────────┘                    └─────────────────┘
```
Each worker processes a subset of documents. The coordinator aggregates all results to compute final TF-IDF scores.
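To make that division of labor concrete, here is a minimal sketch of how a coordinator might partition a document list into roughly equal chunks, one per worker. The actual splitting logic arrives in Part 4; the function name and signature here are illustrative only:

```go
// splitDocuments is a hypothetical helper that partitions documents
// into n roughly equal chunks, one per worker. Assumes n >= 1.
func splitDocuments(documents []string, n int) [][]string {
	chunks := make([][]string, 0, n)
	chunkSize := (len(documents) + n - 1) / n // ceiling division
	for start := 0; start < len(documents); start += chunkSize {
		end := start + chunkSize
		if end > len(documents) {
			end = len(documents)
		}
		chunks = append(chunks, documents[start:end])
	}
	return chunks
}
```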
Implementing the SearchWorker
The worker implements our RequestHandler interface from Part 2:
```go
package search

import (
	"bufio"
	"log"
	"os"

	"github.com/UnplugCharger/distributed_doc_search/internal/model"
)

// SearchWorker processes document search tasks.
type SearchWorker struct{}

func NewSearchWorker() *SearchWorker {
	return &SearchWorker{}
}

// GetEndpoint returns "/task" - the HTTP endpoint for workers.
func (sw *SearchWorker) GetEndpoint() string {
	return "/task"
}
```
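As a refresher, the RequestHandler interface these methods satisfy looks like this. This is a sketch inferred from the two method signatures above; the canonical definition lives in Part 2's networking package:

```go
// RequestHandler is implemented by anything the WebServer can route to.
// (Sketch inferred from the worker's methods; see Part 2 for the original.)
type RequestHandler interface {
	HandleRequest(payload []byte) []byte
	GetEndpoint() string
}
```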
Handling Incoming Tasks
When a task arrives, we:
- Deserialize the JSON payload into a `Task`
- Process each document
- Serialize and return the `Result`
```go
func (sw *SearchWorker) HandleRequest(payload []byte) []byte {
	// Deserialize the incoming task
	task, err := model.DeserializeTask(payload)
	if err != nil {
		log.Printf("Error deserializing task: %v", err)
		// Return empty result on error - don't crash!
		emptyResult := model.NewResult()
		data, _ := model.Serialize(emptyResult)
		return data
	}

	// Process each document and collect results
	result := model.NewResult()
	for _, docPath := range task.Documents {
		docData := sw.processDocument(docPath, task.SearchTerms)
		if docData != nil {
			result.AddDocumentData(docPath, docData)
		}
	}

	// Serialize and return the result
	data, err := model.Serialize(result)
	if err != nil {
		log.Printf("Error serializing result: %v", err)
		emptyResult := model.NewResult()
		data, _ = model.Serialize(emptyResult)
		return data
	}

	return data
}
```
Notice the error handling philosophy: never crash, always return something. If deserialization fails, we return an empty result. The coordinator can handle partial results.
Processing Documents
The core work happens in processDocument:
```go
func (sw *SearchWorker) processDocument(path string, terms []string) *model.DocumentData {
	// Open the document file
	file, err := os.Open(path)
	if err != nil {
		log.Printf("Warning: Could not open document %s: %v", path, err)
		return nil // Skip unreadable documents
	}
	defer file.Close()

	// Read all lines from the document
	var lines []string
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	if err := scanner.Err(); err != nil {
		log.Printf("Warning: Error reading document %s: %v", path, err)
		return nil
	}

	// Extract words from the document
	words := GetWordsFromDocument(lines)

	// Calculate term frequencies and create DocumentData
	return CreateDocumentData(words, terms)
}
```
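One caveat worth knowing: bufio.Scanner's default per-token limit is 64 KiB, so a document containing a single very long line would surface as bufio.ErrTooLong from scanner.Err() and the document would be skipped. If your corpus includes such files, you can enlarge the buffer before scanning; a minimal sketch:

```go
scanner := bufio.NewScanner(file)
// Allow lines up to 1 MiB instead of the 64 KiB default.
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
```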
Key design decisions:
- Line-by-line reading: Using `bufio.Scanner` is memory-efficient for large files
- Graceful failure: Missing or unreadable files are skipped, not fatal errors
- Reusing TF-IDF code: We leverage `GetWordsFromDocument` and `CreateDocumentData` from Part 1 (sketched below)
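If you're jumping in at this part, here is roughly what those two Part 1 helpers do. This is a simplified sketch, not the exact Part 1 code; in particular, the real tokenizer may handle punctuation and casing differently, and `NewDocumentData`/`SetFrequency` are assumed helpers on the model package, named here for illustration:

```go
// GetWordsFromDocument flattens the document's lines into
// whitespace-separated words.
func GetWordsFromDocument(lines []string) []string {
	var words []string
	for _, line := range lines {
		words = append(words, strings.Fields(line)...)
	}
	return words
}

// CreateDocumentData computes TF(term) = count(term) / totalWords for
// each search term. NewDocumentData and SetFrequency are hypothetical
// model helpers used for this sketch.
func CreateDocumentData(words []string, terms []string) *model.DocumentData {
	data := model.NewDocumentData()
	for _, term := range terms {
		count := 0
		for _, word := range words {
			// Strip common punctuation and compare case-insensitively.
			if strings.EqualFold(strings.Trim(word, ".,;:!?\""), term) {
				count++
			}
		}
		freq := 0.0
		if len(words) > 0 { // guard against empty documents
			freq = float64(count) / float64(len(words))
		}
		data.SetFrequency(term, freq)
	}
	return data
}
```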
The Data Flow
Let's trace a complete request:
1. Coordinator sends a `Task`:

```json
{
  "search_terms": ["whale", "ocean"],
  "documents": ["moby_dick.txt", "frankenstein.txt"]
}
```

2. Worker processes each document:
   - moby_dick.txt → words: ["Call", "me", "Ishmael", ...]
   - Calculate TF("whale") = 0.0023
   - Calculate TF("ocean") = 0.0008

3. Worker returns a `Result`:

```json
{
  "document_to_document_data": {
    "moby_dick.txt": {
      "term_to_frequency": { "whale": 0.0023, "ocean": 0.0008 }
    },
    "frankenstein.txt": {
      "term_to_frequency": { "whale": 0.0001, "ocean": 0.0002 }
    }
  }
}
```
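These JSON shapes suggest model types along the following lines. This is a sketch reconstructed from the field names and methods used throughout this post; the repo's actual definitions in internal/model may differ in detail:

```go
// Sketch of internal/model as implied by the JSON above.
package model

import "encoding/json"

type Task struct {
	SearchTerms []string `json:"search_terms"`
	Documents   []string `json:"documents"`
}

func NewTask(terms, documents []string) *Task {
	return &Task{SearchTerms: terms, Documents: documents}
}

type DocumentData struct {
	TermToFrequency map[string]float64 `json:"term_to_frequency"`
}

func (d *DocumentData) GetFrequency(term string) float64 {
	return d.TermToFrequency[term]
}

type Result struct {
	DocumentToDocumentData map[string]*DocumentData `json:"document_to_document_data"`
}

func NewResult() *Result {
	return &Result{DocumentToDocumentData: make(map[string]*DocumentData)}
}

func (r *Result) AddDocumentData(path string, data *DocumentData) {
	r.DocumentToDocumentData[path] = data
}

func Serialize(v any) ([]byte, error) { return json.Marshal(v) }

func DeserializeTask(data []byte) (*Task, error) {
	var t Task
	return &t, json.Unmarshal(data, &t)
}

func DeserializeResult(data []byte) (*Result, error) {
	var r Result
	return &r, json.Unmarshal(data, &r)
}
```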
Testing the Worker
We write comprehensive unit tests covering:
- Normal operation with valid documents
- Missing files (should skip, not crash)
- Invalid JSON payloads
- Multiple documents
- Empty documents
Test: Valid Task Processing
```go
func TestSearchWorker_HandleRequest_ValidTask(t *testing.T) {
	// Create a temporary test document
	tmpDir := t.TempDir()
	docPath := filepath.Join(tmpDir, "test_doc.txt")
	content := "The quick brown fox jumps over the lazy dog. The fox is quick."
	os.WriteFile(docPath, []byte(content), 0644)

	// Create and serialize a task
	task := model.NewTask([]string{"fox", "quick"}, []string{docPath})
	taskData, _ := model.Serialize(task)

	// Process the task
	worker := NewSearchWorker()
	responseData := worker.HandleRequest(taskData)

	// Deserialize and verify the result
	result, _ := model.DeserializeResult(responseData)
	docData := result.DocumentToDocumentData[docPath]

	// "fox" appears 2 times in 13 words = 0.1538...
	foxFreq := docData.GetFrequency("fox")
	if foxFreq <= 0 {
		t.Errorf("Expected positive frequency for 'fox', got %f", foxFreq)
	}
}
```
Test: Missing File Handling
```go
func TestSearchWorker_HandleRequest_MissingFile(t *testing.T) {
	task := model.NewTask([]string{"test"}, []string{"/nonexistent/file.txt"})
	taskData, _ := model.Serialize(task)

	worker := NewSearchWorker()
	responseData := worker.HandleRequest(taskData)

	result, _ := model.DeserializeResult(responseData)

	// Result should be empty - missing file was skipped
	if len(result.DocumentToDocumentData) != 0 {
		t.Errorf("Expected empty result for missing file")
	}
}
```
Test: Invalid Payload
```go
func TestSearchWorker_HandleRequest_InvalidPayload(t *testing.T) {
	worker := NewSearchWorker()

	// Send garbage data
	responseData := worker.HandleRequest([]byte("not valid json"))

	// Should return empty result, not crash
	result, err := model.DeserializeResult(responseData)
	if err != nil {
		t.Fatalf("Failed to deserialize result: %v", err)
	}

	if len(result.DocumentToDocumentData) != 0 {
		t.Errorf("Expected empty result for invalid payload")
	}
}
```
Test: Multiple Documents
```go
func TestSearchWorker_HandleRequest_MultipleDocuments(t *testing.T) {
	tmpDir := t.TempDir()

	// Create three test documents
	doc1 := filepath.Join(tmpDir, "doc1.txt")
	doc2 := filepath.Join(tmpDir, "doc2.txt")
	doc3 := filepath.Join(tmpDir, "doc3.txt")

	os.WriteFile(doc1, []byte("apple banana cherry"), 0644)
	os.WriteFile(doc2, []byte("banana cherry date"), 0644)
	os.WriteFile(doc3, []byte("cherry date elderberry"), 0644)

	task := model.NewTask(
		[]string{"cherry", "banana"},
		[]string{doc1, doc2, doc3},
	)
	taskData, _ := model.Serialize(task)

	worker := NewSearchWorker()
	responseData := worker.HandleRequest(taskData)

	result, _ := model.DeserializeResult(responseData)

	// All three documents should be processed
	if len(result.DocumentToDocumentData) != 3 {
		t.Errorf("Expected 3 documents, got %d", len(result.DocumentToDocumentData))
	}

	// "cherry" appears in all documents
	for _, docPath := range []string{doc1, doc2, doc3} {
		docData := result.DocumentToDocumentData[docPath]
		if docData.GetFrequency("cherry") <= 0 {
			t.Errorf("Expected positive frequency for 'cherry' in %s", docPath)
		}
	}
}
```
Running the Tests
```
$ go test ./internal/search/... -v
=== RUN   TestNewSearchWorker
--- PASS: TestNewSearchWorker (0.00s)
=== RUN   TestSearchWorker_GetEndpoint
--- PASS: TestSearchWorker_GetEndpoint (0.00s)
=== RUN   TestSearchWorker_HandleRequest_ValidTask
--- PASS: TestSearchWorker_HandleRequest_ValidTask (0.00s)
=== RUN   TestSearchWorker_HandleRequest_MissingFile
--- PASS: TestSearchWorker_HandleRequest_MissingFile (0.00s)
=== RUN   TestSearchWorker_HandleRequest_InvalidPayload
--- PASS: TestSearchWorker_HandleRequest_InvalidPayload (0.00s)
=== RUN   TestSearchWorker_HandleRequest_MultipleDocuments
--- PASS: TestSearchWorker_HandleRequest_MultipleDocuments (0.01s)
=== RUN   TestSearchWorker_HandleRequest_EmptyDocument
--- PASS: TestSearchWorker_HandleRequest_EmptyDocument (0.00s)
PASS
ok      github.com/UnplugCharger/distributed_doc_search/internal/search        0.420s
```
All tests pass!
Wiring It Up
To run a worker node, we combine the SearchWorker with our WebServer:
```go
func main() {
	port := 8081

	worker := search.NewSearchWorker()
	server := networking.NewWebServer(port, worker)

	log.Printf("Starting worker on port %d", port)
	if err := server.Start(); err != nil {
		log.Fatalf("Server failed: %v", err)
	}
}
```
Now you can send tasks to http://localhost:8081/task:
```bash
curl -X POST http://localhost:8081/task \
  -H "Content-Type: application/json" \
  -d '{"search_terms":["whale"],"documents":["./resources/books/Moby Dick.txt"]}'
```
Key Takeaways
- Graceful error handling: Workers should never crash. Log errors, return partial results.
- Interface compliance: Implementing `RequestHandler` lets us plug into the HTTP layer seamlessly.
- Separation of concerns: The worker doesn't know about HTTP; it just processes payloads.
- Testability: Using `t.TempDir()` for test files keeps tests isolated and cleanup automatic.
What's Next?
In Part 4, we'll build the SearchCoordinator that:
- Splits documents among available workers
- Sends tasks concurrently using our `WebClient`
- Aggregates results and calculates final TF-IDF scores
- Returns ranked search results
Get the Code
```bash
git clone git@github.com:UnplugCharger/distributed_doc_search.git
cd distributed_doc_search
go test ./internal/search/... -v
```
This post is part of the "Distributed Document Search" series. Follow along as we build a production-ready search cluster from scratch.