Building a Distributed Search Engine: The Search Worker in Go

February 2, 2026

In Part 1, we implemented the TF-IDF algorithm. In Part 2, we built the HTTP networking layer. Now it's time to build the SearchWorker: the component that does the actual document processing.

By the end of this post, you'll have a worker that can:

  • Receive search tasks over HTTP
  • Read and parse documents from the filesystem
  • Calculate term frequencies for each document
  • Return results to the coordinator
  • Handle errors gracefully without crashing

The Worker's Role

In our distributed architecture, workers are the muscle. They:

┌─────────────────┐                   ┌─────────────────┐
│   Coordinator   │ ──── Task ────►   │     Worker      │
│                 │                   │                 │
│  "Find 'whale'  │                   │  1. Read docs   │
│  in these 5     │                   │  2. Parse text  │
│  documents"     │                   │  3. Calculate   │
│                 │ ◄─── Result ───   │     TF for      │
│                 │                   │     each term   │
└─────────────────┘                   └─────────────────┘

Each worker processes a subset of documents. The coordinator aggregates all results to compute final TF-IDF scores.
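
As a refresher from Part 1, the term frequency each worker computes is simply a term's count divided by the document's total word count (the valid-task test later in this post relies on exactly that: 2 occurrences in 13 words ≈ 0.1538). A minimal sketch, ignoring details like case normalization that live in the real Part 1 code:

// termFrequency returns count(term) / len(words), the TF from Part 1.
// Illustrative sketch only; the real logic lives in CreateDocumentData.
func termFrequency(term string, words []string) float64 {
	if len(words) == 0 {
		return 0
	}
	count := 0
	for _, w := range words {
		if w == term {
			count++
		}
	}
	return float64(count) / float64(len(words))
}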

Implementing the SearchWorker
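
To recap Part 2, a RequestHandler is anything that names an HTTP endpoint and maps a raw request payload to a raw response payload. A sketch of the interface (the exact definition lives in Part 2's networking package):

type RequestHandler interface {
	GetEndpoint() string
	HandleRequest(payload []byte) []byte
}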

The worker implements our RequestHandler interface from Part 2:

package search

import (
	"bufio"
	"log"
	"os"

	"github.com/UnplugCharger/distributed_doc_search/internal/model"
)

// SearchWorker processes document search tasks.
type SearchWorker struct{}

func NewSearchWorker() *SearchWorker {
	return &SearchWorker{}
}

// GetEndpoint returns "/task" - the HTTP endpoint for workers
func (sw *SearchWorker) GetEndpoint() string {
	return "/task"
}

Handling Incoming Tasks

When a task arrives, we:

  1. Deserialize the JSON payload into a Task
  2. Process each document
  3. Serialize and return the Result

func (sw *SearchWorker) HandleRequest(payload []byte) []byte {
	// Deserialize the incoming task
	task, err := model.DeserializeTask(payload)
	if err != nil {
		log.Printf("Error deserializing task: %v", err)
		// Return an empty result on error - don't crash!
		emptyResult := model.NewResult()
		data, _ := model.Serialize(emptyResult)
		return data
	}

	// Process each document and collect results
	result := model.NewResult()
	for _, docPath := range task.Documents {
		docData := sw.processDocument(docPath, task.SearchTerms)
		if docData != nil {
			result.AddDocumentData(docPath, docData)
		}
	}

	// Serialize and return the result
	data, err := model.Serialize(result)
	if err != nil {
		log.Printf("Error serializing result: %v", err)
		emptyResult := model.NewResult()
		data, _ = model.Serialize(emptyResult)
		return data
	}
	return data
}

Notice the error-handling philosophy: never crash, always return something. If deserialization fails, we return an empty result; the coordinator is built to tolerate partial results.
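
On the wire, that fallback is just a well-formed but empty Result. Given the field names shown in the trace later in this post, it presumably serializes to something like:

{"document_to_document_data": {}}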

Processing Documents

The core work happens in processDocument:

func (sw *SearchWorker) processDocument(path string, terms []string) *model.DocumentData {
	// Open the document file
	file, err := os.Open(path)
	if err != nil {
		log.Printf("Warning: Could not open document %s: %v", path, err)
		return nil // Skip unreadable documents
	}
	defer file.Close()

	// Read all lines from the document
	var lines []string
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	if err := scanner.Err(); err != nil {
		log.Printf("Warning: Error reading document %s: %v", path, err)
		return nil
	}

	// Extract words from the document
	words := GetWordsFromDocument(lines)

	// Calculate term frequencies and create DocumentData
	return CreateDocumentData(words, terms)
}

Key design decisions:

  1. Line-by-line reading: Using bufio.Scanner is memory-efficient for large files (with one caveat, noted after this list)
  2. Graceful failure: Missing or unreadable files are skipped, not fatal errors
  3. Reusing TF-IDF code: We leverage GetWordsFromDocument and CreateDocumentData from Part 1
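
The caveat on the first point: bufio.Scanner refuses lines longer than 64KB by default (scanner.Err() returns bufio.ErrTooLong, so with the code above the whole document gets skipped). If the corpus might contain such lines, the limit can be raised; a sketch, with the 1MB cap chosen arbitrarily for illustration:

// readLines mirrors the loop in processDocument but raises the
// Scanner's per-line limit from the 64KB default to 1MB.
func readLines(file *os.File) ([]string, error) {
	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	var lines []string
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	return lines, scanner.Err()
}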

The Data Flow

Let's trace a complete request:

1. Coordinator sends Task:

{
  "search_terms": ["whale", "ocean"],
  "documents": ["moby_dick.txt", "frankenstein.txt"]
}

2. Worker processes each document:

   - moby_dick.txt → words: ["Call", "me", "Ishmael", ...]
   - Calculate TF("whale") = 0.0023
   - Calculate TF("ocean") = 0.0008

3. Worker returns Result:

{
  "document_to_document_data": {
    "moby_dick.txt": {
      "term_to_frequency": {
        "whale": 0.0023,
        "ocean": 0.0008
      }
    },
    "frankenstein.txt": {
      "term_to_frequency": {
        "whale": 0.0001,
        "ocean": 0.0002
      }
    }
  }
}

Testing the Worker

We write comprehensive unit tests covering:

  • Normal operation with valid documents
  • Missing files (should skip, not crash)
  • Invalid JSON payloads
  • Multiple documents
  • Empty documents

Test: Valid Task Processing

func TestSearchWorker_HandleRequest_ValidTask(t *testing.T) {
	// Create a temporary test document
	tmpDir := t.TempDir()
	docPath := filepath.Join(tmpDir, "test_doc.txt")
	content := "The quick brown fox jumps over the lazy dog. The fox is quick."
	os.WriteFile(docPath, []byte(content), 0644)

	// Create and serialize a task
	task := model.NewTask([]string{"fox", "quick"}, []string{docPath})
	taskData, _ := model.Serialize(task)

	// Process the task
	worker := NewSearchWorker()
	responseData := worker.HandleRequest(taskData)

	// Deserialize and verify the result
	result, _ := model.DeserializeResult(responseData)
	docData := result.DocumentToDocumentData[docPath]

	// "fox" appears 2 times in 13 words = 0.1538...
	foxFreq := docData.GetFrequency("fox")
	if foxFreq <= 0 {
		t.Errorf("Expected positive frequency for 'fox', got %f", foxFreq)
	}
}

Test: Missing File Handling

func TestSearchWorker_HandleRequest_MissingFile(t *testing.T) {
	task := model.NewTask([]string{"test"}, []string{"/nonexistent/file.txt"})
	taskData, _ := model.Serialize(task)

	worker := NewSearchWorker()
	responseData := worker.HandleRequest(taskData)

	result, _ := model.DeserializeResult(responseData)
	// Result should be empty - missing file was skipped
	if len(result.DocumentToDocumentData) != 0 {
		t.Errorf("Expected empty result for missing file")
	}
}

Test: Invalid Payload

func TestSearchWorker_HandleRequest_InvalidPayload(t *testing.T) {
	worker := NewSearchWorker()

	// Send garbage data
	responseData := worker.HandleRequest([]byte("not valid json"))

	// Should return empty result, not crash
	result, err := model.DeserializeResult(responseData)
	if err != nil {
		t.Fatalf("Failed to deserialize result: %v", err)
	}
	if len(result.DocumentToDocumentData) != 0 {
		t.Errorf("Expected empty result for invalid payload")
	}
}

Test: Multiple Documents

func TestSearchWorker_HandleRequest_MultipleDocuments(t *testing.T) {
	tmpDir := t.TempDir()

	// Create three test documents
	doc1 := filepath.Join(tmpDir, "doc1.txt")
	doc2 := filepath.Join(tmpDir, "doc2.txt")
	doc3 := filepath.Join(tmpDir, "doc3.txt")
	os.WriteFile(doc1, []byte("apple banana cherry"), 0644)
	os.WriteFile(doc2, []byte("banana cherry date"), 0644)
	os.WriteFile(doc3, []byte("cherry date elderberry"), 0644)

	task := model.NewTask(
		[]string{"cherry", "banana"},
		[]string{doc1, doc2, doc3},
	)
	taskData, _ := model.Serialize(task)

	worker := NewSearchWorker()
	responseData := worker.HandleRequest(taskData)
	result, _ := model.DeserializeResult(responseData)

	// All three documents should be processed
	if len(result.DocumentToDocumentData) != 3 {
		t.Errorf("Expected 3 documents, got %d", len(result.DocumentToDocumentData))
	}

	// "cherry" appears in all documents
	for _, docPath := range []string{doc1, doc2, doc3} {
		docData := result.DocumentToDocumentData[docPath]
		if docData.GetFrequency("cherry") <= 0 {
			t.Errorf("Expected positive frequency for 'cherry' in %s", docPath)
		}
	}
}
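
Test: Empty Document

The empty-document case (its PASS line appears in the run below) follows the same pattern. A sketch of how it might look; whether an empty file produces a zero-frequency entry or is skipped entirely depends on CreateDocumentData, so this version accepts either:

func TestSearchWorker_HandleRequest_EmptyDocument(t *testing.T) {
	tmpDir := t.TempDir()
	docPath := filepath.Join(tmpDir, "empty.txt")
	os.WriteFile(docPath, []byte(""), 0644)

	task := model.NewTask([]string{"whale"}, []string{docPath})
	taskData, _ := model.Serialize(task)

	worker := NewSearchWorker()
	responseData := worker.HandleRequest(taskData)
	result, _ := model.DeserializeResult(responseData)

	// Either the document was skipped, or its frequency must be zero.
	if docData, ok := result.DocumentToDocumentData[docPath]; ok {
		if docData.GetFrequency("whale") != 0 {
			t.Errorf("Expected zero frequency for 'whale' in an empty document")
		}
	}
}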

Running the Tests

$ go test ./internal/search/... -v
=== RUN TestNewSearchWorker
--- PASS: TestNewSearchWorker (0.00s)
=== RUN TestSearchWorker_GetEndpoint
--- PASS: TestSearchWorker_GetEndpoint (0.00s)
=== RUN TestSearchWorker_HandleRequest_ValidTask
--- PASS: TestSearchWorker_HandleRequest_ValidTask (0.00s)
=== RUN TestSearchWorker_HandleRequest_MissingFile
--- PASS: TestSearchWorker_HandleRequest_MissingFile (0.00s)
=== RUN TestSearchWorker_HandleRequest_InvalidPayload
--- PASS: TestSearchWorker_HandleRequest_InvalidPayload (0.00s)
=== RUN TestSearchWorker_HandleRequest_MultipleDocuments
--- PASS: TestSearchWorker_HandleRequest_MultipleDocuments (0.01s)
=== RUN TestSearchWorker_HandleRequest_EmptyDocument
--- PASS: TestSearchWorker_HandleRequest_EmptyDocument (0.00s)
PASS
ok github.com/UnplugCharger/distributed_doc_search/internal/search 0.420s

All tests pass!

Wiring It Up

To run a worker node, we combine the SearchWorker with our WebServer:

package main

import (
	"log"

	"github.com/UnplugCharger/distributed_doc_search/internal/networking" // path assumed from the repo layout
	"github.com/UnplugCharger/distributed_doc_search/internal/search"
)

func main() {
	port := 8081
	worker := search.NewSearchWorker()
	server := networking.NewWebServer(port, worker)
	log.Printf("Starting worker on port %d", port)
	if err := server.Start(); err != nil {
		log.Fatalf("Server failed: %v", err)
	}
}
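
Part 4 will want several workers running at once, so a port flag beats the hard-coded 8081. A hypothetical variant (only main changes; add "flag" to the imports):

func main() {
	// -port is my addition, not in the repo: lets workers share a host,
	// e.g. `go run . -port 8082`.
	port := flag.Int("port", 8081, "HTTP port for this worker")
	flag.Parse()

	worker := search.NewSearchWorker()
	server := networking.NewWebServer(*port, worker)

	log.Printf("Starting worker on port %d", *port)
	if err := server.Start(); err != nil {
		log.Fatalf("Server failed: %v", err)
	}
}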

Now you can send tasks to http://localhost:8081/task:

curl -X POST http://localhost:8081/task \
  -H "Content-Type: application/json" \
  -d '{"search_terms":["whale"],"documents":["./resources/books/Moby Dick.txt"]}'
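
If the file exists at that path, the response is a Result shaped like the trace above, along the lines of the following (the exact frequency depends on your copy of the text):

{"document_to_document_data":{"./resources/books/Moby Dick.txt":{"term_to_frequency":{"whale":0.0023}}}}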

Key Takeaways

  1. Graceful error handling: Workers should never crash. Log errors, return partial results.
  2. Interface compliance: Implementing RequestHandler lets us plug into the HTTP layer seamlessly.
  3. Separation of concerns: The worker doesn't know about HTTP—it just processes payloads.
  4. Testability: Using t.TempDir() for test files keeps tests isolated and cleanup automatic.

What's Next?

In Part 4, we'll build the SearchCoordinator that:

  • Splits documents among available workers
  • Sends tasks concurrently using our WebClient
  • Aggregates results and calculates final TF-IDF scores
  • Returns ranked search results

Get the Code

git clone git@github.com:UnplugCharger/distributed_doc_search.git
cd distributed_doc_search
go test ./internal/search/... -v

This post is part of the "Distributed Document Search" series. Follow along as we build a production-ready search cluster from scratch.