Debugging Goroutine Leaks

Before getting into debugging goroutine leaks, here is a short introduction to a few fundamentals that will put the problem in perspective.

Concurrent Programming.

  • Concurrent programming deals with programs in which multiple sequential streams of execution make progress at the same time, which can result in faster execution of your computations.
  • Writing concurrent/parallel programs is necessary to make better use of the multicore capabilities of modern processors.

Goroutines

Goroutines are lightweight threads managed by the Go runtime.

Goroutines make concurrent execution possible.

A simple program that adds up numbers concurrently:

package main

import "fmt"

// function to add an array of numbers.
func sum(s []int, c chan int) {
	sum := 0
	for _, v := range s {
		sum += v
	}
	// write the sum to the channel so the caller can receive it.
	c <- sum // send sum to c
}

func main() {
	s := []int{7, 2, 8, -9, 4, 0}

	c1 := make(chan int)
	c2 := make(chan int)
	// spin up a goroutine.
	go sum(s[:len(s)/2], c1)
	// spin up a goroutine.
	go sum(s[len(s)/2:], c2)
	x, y := <-c1, <-c2 // receive from c1 and c2

	fmt.Println(x, y, x+y)
}

Concurrent programming is no longer optional; it is necessary for developing modern software that runs on multicore processors.

As with any coordinated effort towards a common goal, there needs to be synchronization and communication.

In the program above, after each goroutine calculates its sum, it needs to coordinate with the main goroutine to hand back the result so that the final value can be computed.


Go’s approach towards synchronization.

Instead of explicitly using locks to mediate access to shared data, Go encourages the use of channels to pass references to data between goroutines. This approach ensures that only one goroutine has access to the data at a given time.

Now, let’s start concurrent programming in Go.

If you’ve reached this far, you understand that writing concurrent programs is no longer optional, and that Go makes it easy for you. You’re also aware of Go channels and how they are used for synchronization between goroutines. Let us now move to the harder part of synchronizing goroutines.


Synchronization can Go wrong!

That sounds scary! But what can possibly go wrong?

Well, there are many ways the coordination between goroutines can go wrong.

This could result in some of the goroutines waiting forever!

Be aware of how the goroutine will exit every time you use the go keyword.

Writing into a channel without a receiver.

Here is a simple example of how writing into a channel without a receiver can cause a goroutine to be blocked forever.

package main

import (
        "fmt"
        "log"
        "net/http"
        "strconv"
)

// function to add an array of numbers.
func sum(s []int, c chan int) {
        sum := 0
        for _, v := range s {
                sum += v
        }
        // write the sum to the channel.
        c <- sum // send sum to c
}

// HTTP handler for /sum
func sumConcurrent(w http.ResponseWriter, r *http.Request) {
        s := []int{7, 2, 8, -9, 4, 0}

        c1 := make(chan int)
        c2 := make(chan int)
        // spin up a goroutine.
        go sum(s[:len(s)/2], c1)
        // spin up a goroutine.
        go sum(s[len(s)/2:], c2)
        // not reading from c2.
        // go routine writing to c2 will be blocked.
        x := <-c1
        // write the response.
        fmt.Fprint(w, strconv.Itoa(x))
}

func main() {
        http.HandleFunc("/sum", sumConcurrent)   // set router
        err := http.ListenAndServe(":8001", nil) // set listen port
        if err != nil {
                log.Fatal("ListenAndServe: ", err)
        }
}

Note: This is not how you would write real programs. It’s a simple illustration of how leaks are introduced, and we’ll use this code further on to identify the leak and debug the application.
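
For reference, one possible way to avoid the blocked send in this example is to give each channel a buffer of one, so that a goroutine can deposit its result and exit even if nobody ever receives it. The sketch below is only an illustration under that assumption; `sumBuffered` and `firstHalfSum` are hypothetical names, not part of the server above.

```go
package main

import "fmt"

// sumBuffered adds the numbers and sends the result on c.
// It assumes c has room for one value, so the send never blocks.
func sumBuffered(s []int, c chan<- int) {
	total := 0
	for _, v := range s {
		total += v
	}
	c <- total
}

// firstHalfSum (hypothetical helper) mirrors the leaky handler:
// it starts two goroutines but only reads the first result.
func firstHalfSum(s []int) int {
	// Buffer of 1: each goroutine can store its result and return,
	// whether or not anyone ever receives it.
	c1 := make(chan int, 1)
	c2 := make(chan int, 1)
	go sumBuffered(s[:len(s)/2], c1)
	go sumBuffered(s[len(s)/2:], c2)
	return <-c1 // c2 is never read, yet its sender is not blocked
}

func main() {
	fmt.Println(firstHalfSum([]int{7, 2, 8, -9, 4, 0}))
}
```

The unread value in c2 is simply garbage collected along with the channel once nothing refers to it.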

Receiving from a channel without a writer.

Example 1: Blocked on a for-select.

for {
	select {
	case <-c:
		// process here
	}
}

Example 2: Looping on a channel.

go func() {
	for range ch {
	}
}()
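
A receiver that ranges over a channel only exits when the channel is closed. As a minimal sketch of that rule (the helper names `drain` and `produce` are made up for illustration), the writer below closes the channel when it is done so the receiving goroutine can return:

```go
package main

import "fmt"

// drain (hypothetical helper) consumes ch until it is closed and
// reports on done how many values it received.
func drain(ch <-chan int, done chan<- int) {
	n := 0
	for range ch { // the loop ends once ch is closed and empty
		n++
	}
	done <- n
}

// produce sends n values and then closes the channel so that
// the receiver's range loop can terminate.
func produce(n int) int {
	ch := make(chan int)
	done := make(chan int)
	go drain(ch, done)
	for i := 0; i < n; i++ {
		ch <- i
	}
	close(ch) // without this, drain would stay blocked forever
	return <-done
}

func main() {
	fmt.Println(produce(3))
}
```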


Good practices

Using a timeout channel

timeout := make(chan bool, 1)
go func() {
	time.Sleep(1 * time.Second)
	timeout <- true
}()

select {
case <-ch:
	// a read from ch has occurred
case <-timeout:
	// the read from ch has timed out
}

OR

select {
case res := <-c1:
	fmt.Println(res)
case <-time.After(time.Second * 1):
	fmt.Println("timeout 1")
}

Using the context package.

Go’s context package can be used to end goroutines gracefully, including after a timeout.


Leak Detection.

The formula for detecting leaks in webservers is to add instrumentation endpoints and use them alongside load tests.

// get the count of number of goroutines in the system.
func countGoRoutines() int {
	return runtime.NumGoroutine()
}

func getGoroutinesCountHandler(w http.ResponseWriter, r *http.Request) {
	// Get the count of number of goroutines running.
	count := countGoRoutines()
	w.Write([]byte(strconv.Itoa(count)))
}

func main() {
	http.HandleFunc("/_count", getGoroutinesCountHandler)
}

Use an instrumentation endpoint that responds with the number of goroutines alive in the system before and after your load test.

Here is the flow of your load test program:

Step 1: Call the instrumentation endpoint and get the count of goroutines alive in your webserver.
Step 2: Perform the load test. Let the load be concurrent.
     for i := 0; i < 100; i++ {
          go callEndpointUnderInvestigation()
     }
Step 3: Call the instrumentation endpoint again and get the count of goroutines alive in your webserver.

An unusual increase in the number of goroutines alive in the system after the load test is evidence of a leak.

Here is a small example of a webserver with a leaky endpoint. With a simple test we can identify the existence of a leak in the server.

package main

import (
	"fmt"
	"log"
	"net/http"
	"runtime"
	"strconv"
)

// get the count of number of go routines in the system.
func countGoRoutines() int {
	return runtime.NumGoroutine()
}

func getGoroutinesCountHandler(w http.ResponseWriter, r *http.Request) {
	// Get the count of number of go routines running.
	count := countGoRoutines()
	w.Write([]byte(strconv.Itoa(count)))
}

// function to add an array of numbers.
func sum(s []int, c chan int) {
	sum := 0
	for _, v := range s {
		sum += v
	}
	// write the sum to the channel.
	c <- sum // send sum to c
}

// HTTP handler for /sum
func sumConcurrent(w http.ResponseWriter, r *http.Request) {
	s := []int{7, 2, 8, -9, 4, 0}

	c1 := make(chan int)
	c2 := make(chan int)
	// spin up a goroutine.
	go sum(s[:len(s)/2], c1)
	// spin up a goroutine.
	go sum(s[len(s)/2:], c2)
	// We never read from c2, so the goroutine attempting
	// to write to c2 will be blocked forever, resulting in a leak.
	x := <-c1
	// write the response.
	fmt.Fprint(w, strconv.Itoa(x))
}

func main() {
	// get the sum of numbers.
	http.HandleFunc("/sum", sumConcurrent)
	// get the count of number of go routines in the system.
	http.HandleFunc("/_count", getGoroutinesCountHandler)
	err := http.ListenAndServe(":8001", nil)
	if err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}

The load test program (load.go):

package main

import (
	"io/ioutil"
	"log"
	"net/http"
	"strconv"
	"sync"
)

const (
	leakyServer = "http://localhost:8001"
)

// get the count of the number of go routines in the server.
func getRoutineCount() (int, error) {
	body, err := getReq("/_count")

	if err != nil {
		return -1, err
	}
	count, err := strconv.Atoi(string(body))
	if err != nil {
		return -1, err
	}
	return count, nil
}

// Send a GET request and return the response body.
func getReq(endPoint string) ([]byte, error) {
	response, err := http.Get(leakyServer + endPoint)
	if err != nil {
		return []byte{}, err
	}
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)

	if err != nil {
		return []byte{}, err
	}
	return body, nil
}


func main() {
	// get the number of go routines in the leaky server.
	count, err := getRoutineCount()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("\n %d Go routines before the load test in the system.", count)

	var wg sync.WaitGroup
	// send 50 concurrent requests to the leaky endpoint.
	for i := 0; i < 50; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()
			// use a goroutine-local err to avoid a data race on the outer variable.
			if _, err := getReq("/sum"); err != nil {
				log.Fatal(err)
			}
		}()
	}
	}
	wg.Wait()
	// get the count of goroutines in the system after the load test.
	count, err = getRoutineCount()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("\n %d Go routines after the load test in the system.", count)
}

// First run the leaky server
$ go run leaky-server.go
// Run the load test now.
$ go run load.go
 3 Go routines before the load test in the system.
 54 Go routines after the load test in the system.

You can clearly see that with 50 concurrent requests to the leaky endpoint there’s an increase of about 50 goroutines in the system.

Let’s run the load test again.

$ go run load.go
 53 Go routines before the load test in the system.
 104 Go routines after the load test in the system.

It’s clear that with every run of the load test the number of goroutines in the server increases and never drops back down. That’s clear evidence of a leak.

Identifying the origin of leaks.

Using stack trace instrumentation.

Once you’ve established that leaks exist in your web server, you need to identify their origin.

Adding an endpoint that returns the stack trace of your webserver can help you identify the origin of the leak.

import (
	"net/http"
	"runtime/debug"
	"runtime/pprof"
)

func getStackTraceHandler(w http.ResponseWriter, r *http.Request) {
	// stack of the goroutine serving this request.
	stack := debug.Stack()
	w.Write(stack)
	// stacks of all goroutines in the process.
	pprof.Lookup("goroutine").WriteTo(w, 2)
}

func main() {
	http.HandleFunc("/_stack", getStackTraceHandler)
}

After you have established that a leak exists, use the endpoint to obtain the stack trace before and after your load test to identify the origin of the leak.

Let’s add the stack trace instrumentation to the leaky server and perform the load test again. Here is the code:

package main

import (
	"fmt"
	"log"
	"net/http"
	"runtime"
	"runtime/debug"
	"runtime/pprof"
	"strconv"
)

// get the count of number of go routines in the system.
func countGoRoutines() int {
	return runtime.NumGoroutine()
}

// respond with number of go routines in the system.
func getGoroutinesCountHandler(w http.ResponseWriter, r *http.Request) {
	// Get the count of number of go routines running.
	count := countGoRoutines()
	w.Write([]byte(strconv.Itoa(count)))
}

// respond with the stack trace of the system.
func getStackTraceHandler(w http.ResponseWriter, r *http.Request) {
	stack := debug.Stack()
	w.Write(stack)
	pprof.Lookup("goroutine").WriteTo(w, 2)
}

// function to add an array of numbers.
func sum(s []int, c chan int) {
	sum := 0
	for _, v := range s {
		sum += v
	}
	// write the sum to the channel.
	c <- sum // send sum to c
}

// HTTP handler for /sum
func sumConcurrent(w http.ResponseWriter, r *http.Request) {
	s := []int{7, 2, 8, -9, 4, 0}

	c1 := make(chan int)
	c2 := make(chan int)
	// spin up a goroutine.
	go sum(s[:len(s)/2], c1)
	// spin up a goroutine.
	go sum(s[len(s)/2:], c2)
	// not reading from c2.
	// go routine writing to c2 will be blocked.
	x := <-c1
	// write the response.
	fmt.Fprint(w, strconv.Itoa(x))
}

func main() {
	// get the sum of numbers.
	http.HandleFunc("/sum", sumConcurrent)
	// get the count of number of go routines in the system.
	http.HandleFunc("/_count", getGoroutinesCountHandler)
	// respond with the stack trace of the system.
	http.HandleFunc("/_stack", getStackTraceHandler)
	err := http.ListenAndServe(":8001", nil)
	if err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}

The load test program (load.go), now also fetching the stack trace:

package main

import (
	"io/ioutil"
	"log"
	"net/http"
	"strconv"
	"sync"
)

const (
	leakyServer = "http://localhost:8001"
)

// get the count of the number of go routines in the server.
func getRoutineCount() (int, error) {
	body, err := getReq("/_count")

	if err != nil {
		return -1, err
	}
	count, err := strconv.Atoi(string(body))
	if err != nil {
		return -1, err
	}
	return count, nil
}

// Send a GET request and return the response body.
func getReq(endPoint string) ([]byte, error) {
	response, err := http.Get(leakyServer + endPoint)
	if err != nil {
		return []byte{}, err
	}
	defer response.Body.Close()
	body, err := ioutil.ReadAll(response.Body)

	if err != nil {
		return []byte{}, err
	}
	return body, nil
}

// obtain stack trace of the server.
func getStackTrace() (string, error) {
	body, err := getReq("/_stack")

	if err != nil {
		return "", err
	}
	return string(body), nil
}

func main() {
	// get the number of go routines in the leaky server.
	count, err := getRoutineCount()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("\n %d Go routines before the load test in the system.", count)

	var wg sync.WaitGroup
	// send 50 concurrent requests to the leaky endpoint.
	for i := 0; i < 50; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()
			// use a goroutine-local err to avoid a data race on the outer variable.
			if _, err := getReq("/sum"); err != nil {
				log.Fatal(err)
			}
		}()
	}
	wg.Wait()
	// get the count of goroutines in the system after the load test.
	count, err = getRoutineCount()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("\n %d Go routines after the load test in the system.", count)
	// obtain the stack trace of the system.
	trace, err := getStackTrace()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("\nStack trace after the load test : \n %s", trace)
}

// First run the leaky server
$ go run leaky-server.go
// Run the load test now.
$ go run load.go
 3 Go routines before the load test in the system.
 54 Go routines after the load test in the system.
 goroutine 149 [chan send]:
main.sum(0xc420122e58, 0x3, 0x3, 0xc420112240)
        /home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
        /home/karthic/gophercon/count-instrument.go:51 +0x12b

goroutine 243 [chan send]:
main.sum(0xc42021a0d8, 0x3, 0x3, 0xc4202760c0)
        /home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
        /home/karthic/gophercon/count-instrument.go:51 +0x12b

goroutine 259 [chan send]:
main.sum(0xc4202700d8, 0x3, 0x3, 0xc42029c0c0)
        /home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
        /home/karthic/gophercon/count-instrument.go:51 +0x12b

goroutine 135 [chan send]:
main.sum(0xc420226348, 0x3, 0x3, 0xc4202363c0)
        /home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
        /home/karthic/gophercon/count-instrument.go:51 +0x12b

goroutine 166 [chan send]:
main.sum(0xc4202482b8, 0x3, 0x3, 0xc42006b8c0)
        /home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
        /home/karthic/gophercon/count-instrument.go:51 +0x12b

goroutine 199 [chan send]:
main.sum(0xc420260378, 0x3, 0x3, 0xc420256480)
        /home/karthic/gophercon/count-instrument.go:39 +0x6c
created by main.sumConcurrent
        /home/karthic/gophercon/count-instrument.go:51 +0x12b
........

The stack trace clearly points to the epicenter of the leak.

Using profiling.

Since leaked goroutines are usually blocked trying to read from or write to a channel, or may even be sleeping, profiling will help you identify the origin of the leak.

Here’s my talk from Gophercon 2016 on benchmarks and profiling.

It’s important that the instrumentation tests and profiling are done while the endpoint under investigation is under load.
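
The goroutine profile from runtime/pprof can also be summarized by blocking state, which makes a pile of goroutines stuck in "chan send" or "chan receive" easy to spot. The following is a sketch only: `countByState` and `leak` are hypothetical helpers, and the counting relies on the "goroutine N [state]:" header lines that the debug=2 dump prints, as seen in the trace above.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime/pprof"
	"strings"
	"time"
)

// countByState (hypothetical helper) dumps the goroutine profile and
// counts goroutines whose header line starts with the given state,
// for example "chan send".
func countByState(state string) int {
	var buf bytes.Buffer
	// debug=2 prints one "goroutine N [state]:" header per goroutine.
	if err := pprof.Lookup("goroutine").WriteTo(&buf, 2); err != nil {
		return -1
	}
	return strings.Count(buf.String(), "["+state)
}

// leak deliberately blocks n goroutines on an unbuffered channel send.
func leak(n int) {
	for i := 0; i < n; i++ {
		c := make(chan int)
		go func() { c <- 1 }() // no receiver: blocks forever
	}
	time.Sleep(50 * time.Millisecond) // give the goroutines time to block
}

func main() {
	before := countByState("chan send")
	leak(5)
	fmt.Println("newly blocked in chan send:", countByState("chan send")-before)
}
```

For a running server, importing net/http/pprof for its side effects registers handlers under /debug/pprof/ on the default mux, and the same dump is then available from /debug/pprof/goroutine?debug=2.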

Avoiding Leaks, catching them early

Instrumentation in unit and functional tests can help identify leaks early.

Count the number of goroutines before and after the test.

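func TestMyFunc(t *testing.T) {
	// get count of goroutines.
	before := runtime.NumGoroutine()
	// perform the test (MyFunc is a placeholder for the code under test).
	MyFunc()
	// get the count diff.
	diff := runtime.NumGoroutine() - before
	// alert if there's an unexpected rise.
	if diff > 0 {
		t.Errorf("%d goroutines still running after the test", diff)
	}
}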
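
A count taken immediately after the test can be noisy, because goroutines that are just about to exit are still counted. One way to reduce false alarms, sketched here with a hypothetical `leaked` helper and an arbitrary 500ms grace period, is to poll the count for a short while before deciding:

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// leaked (hypothetical helper) runs fn and reports how many extra
// goroutines are still alive afterwards. It polls briefly so that
// goroutines which are merely finishing up are not counted as leaks.
func leaked(fn func()) int {
	before := runtime.NumGoroutine()
	fn()
	deadline := time.Now().Add(500 * time.Millisecond)
	for {
		diff := runtime.NumGoroutine() - before
		if diff <= 0 || time.Now().After(deadline) {
			return diff
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	clean := func() {
		done := make(chan struct{})
		go func() { close(done) }()
		<-done
	}
	leaky := func() {
		c := make(chan int)
		go func() { c <- 1 }() // no receiver: blocks forever
	}
	fmt.Println("clean:", leaked(clean))
	fmt.Println("leaky:", leaked(leaky))
}
```

In a test, a result greater than zero from such a helper would be the signal to fail and dump the stack trace.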

Stack Diff in tests.

Stack Diff is a simple program which does a diff on the stack trace before and after the test and alerts in case of any undesired goroutines remaining in the system. Integrating it with your unit and functional tests can help identify leaks during development.

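import (
	"testing"
	"time"

	"github.com/fortytw2/leaktest"
)

func TestMyFunc(t *testing.T) {
	// fails the test if goroutines started during it are still running at the end.
	defer leaktest.Check(t)()

	// this goroutine never exits, so leaktest reports it.
	go func() {
		for {
			time.Sleep(time.Second)
		}
	}()
}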

Safety by Design

A microservices architecture, with end services/endpoints running as separate containers/processes, can keep the entire system from being affected by leaks or a resource outage in one of the endpoints/services. It’s great if orchestration is managed by tools like Kubernetes, Mesosphere and Docker Swarm.

Imagine getting the stack trace of the entire system and trying to identify which service is causing the leaks amidst tens or hundreds of services! That’s really scary!

Goroutine leaks are like a slow killer. They accumulate slowly over time, wasting your computational resources, and you wouldn’t even notice. It’s really important to be aware of leaks and to debug them early!