goroutine and channel

Guest byz6gwrxm2jos, 2019-04-08 21:21:48

1 goroutine

package main

import (
    "fmt"
    "time"
)

func spinner() {
    for {
        for _, c := range `\|/` {
            fmt.Printf("\r%c", c)
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func fabi(n int) int {
    if n < 2 {
        return n
    }

    return fabi(n-1) + fabi(n-2)
}

func main() {
    go spinner()

    res := fabi(45)
    fmt.Println(res)

}

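A goroutine is Go's unit of concurrent execution. The main goroutine is the one that runs the main function; when it returns, all other goroutines are terminated abruptly and the program exits. Other than returning from main or exiting the program, there is no way for one goroutine to stop another from the outside. A goroutine can, however, be asked through a channel to stop itself, as discussed later.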

2 Concurrent clock

The concurrent clock consists of a server and a client. The server listens on a port; once a client connects, the server sends it the current time once per second.

// client.go
package main

import (
    "fmt"
    "io"
    "log"
    "net"
    "os"
)

func main() {
    conn, err := net.Dial("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err) // report the actual dial error
    }

    defer conn.Close()
    mustCopy(os.Stdout, conn)
}

func mustCopy(w io.Writer, r io.Reader) {
    // io.Copy copies from r to w until EOF is reached on r or an error occurs.
    n, err := io.Copy(w, r)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("%d bytes were received from the server\n", n)
}
// server.go
package main

import (
    "flag"
    "io"
    "log"
    "net"
    "os"
    "strconv"
    "time"
)

func main() {
    port := flag.Uint("port", 8000, "The port of the clock server")
    flag.Parse()
    listener, err := net.Listen("tcp", "localhost:"+strconv.Itoa(int(*port)))
    if err != nil {
        log.Fatal(err)
    }

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Print(err)
            continue
        }

        // handleConn closes the connection itself; a defer here would never
        // run because this loop does not return.
        go handleConn(conn)
    }
}

func handleConn(c net.Conn) {
    defer c.Close()
    for {
        timeStr := os.Getenv("TZ") + "--" + time.Now().Format("15:04:05\n")
        _, err := io.WriteString(c, timeStr)
        if err != nil {
            return
        }

        time.Sleep(1 * time.Second)
    }
}
// clockWall.go (see exercise 8.1)
package main

import (
    "fmt"
    "io"
    "log"
    "net"
    "os"
    "strings"
    "time"
)

func main() {
    for _, cityAndServer := range os.Args[1:] {
        tmp := strings.Split(cityAndServer, "=")
        if len(tmp) != 2 {
            fmt.Println("The input is incorrect!")
            continue
        }

        city := tmp[0]
        server := tmp[1]
        fmt.Printf("city:%s -- server:%s\n", city, server)

        go clientDial(city, server)
    }

    // Sleep here because otherwise the main goroutine would return immediately,
    // which would terminate all the other goroutines with it.
    time.Sleep(100 * time.Minute)

}

func clientDial(city string, server string) {
    conn, err := net.Dial("tcp", server)
    if err != nil {
        fmt.Printf("Failed to dial %s: %v\n", server, err)
        return
    }

    fmt.Println("Dial success")

    defer conn.Close()
    copyData(os.Stdout, conn)
}

func copyData(w io.Writer, r io.Reader) {
    if _, err := io.Copy(w, r); err != nil {
        log.Fatal(err)
    }
}
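The long Sleep at the end of main in clockWall.go only keeps the program alive. A hedged alternative is to wait for the goroutines explicitly with sync.WaitGroup. The sketch below assumes a hypothetical helper waitForAll and uses a print statement as a stand-in for clientDial, so that it runs without any clock server.

```go
package main

import (
	"fmt"
	"sync"
)

// waitForAll runs work(name) in its own goroutine for every name and blocks
// until all of them have returned. It returns how many goroutines finished.
// waitForAll is a hypothetical helper, not part of clockWall.go.
func waitForAll(names []string, work func(name string)) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	finished := 0
	for _, name := range names {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			work(n)
			mu.Lock()
			finished++
			mu.Unlock()
		}(name)
	}
	wg.Wait() // replaces the fixed time.Sleep
	return finished
}

func main() {
	cities := []string{"NewYork", "Tokyo", "London"}
	n := waitForAll(cities, func(city string) {
		fmt.Println("connected to", city) // stand-in for clientDial(city, server)
	})
	fmt.Println("goroutines finished:", n)
}
```

With the real clientDial, which keeps copying for as long as the server keeps sending, wg.Wait would simply block until every connection has ended instead of giving up after a fixed time.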

3 Channel

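Channels are the communication mechanism between goroutines.

ch := make(chan int) // the type of ch is chan int; an unbuffered channel
bufCh := make(chan int, 3) // a buffered channel with capacity 3

Note that the type of ch above is chan int. Like a map, a channel is a reference to an underlying data structure: assigning it or passing it as an argument copies the reference, so every copy refers to the same underlying channel.
Sending and receiving on a channel are both simple.

ch := make(chan int)
ch <- 1 // send (blocks until a receiver is ready, because ch is unbuffered)
i := <-ch // receive

Channels also support close. Sending on a closed channel panics. A receiver can still receive the values that were sent before the close; once those have been drained, every further receive immediately returns the zero value of the element type.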
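The close behavior can be checked with a small program. This is only a sketch; the helper drain is a made-up name. It fills a buffered channel, closes it, reads the buffered values back, and then performs one more receive.

```go
package main

import "fmt"

// drain sends vals into a buffered channel, closes it, receives every value
// back, and then does one extra receive on the closed, empty channel.
func drain(vals []int) (received []int, zero int, ok bool) {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	for range vals {
		received = append(received, <-ch) // values sent before close are still delivered
	}
	zero, ok = <-ch // closed and empty: zero value, ok == false, no blocking
	return received, zero, ok
}

func main() {
	got, zero, ok := drain([]int{1, 2, 3})
	fmt.Println(got, zero, ok) // [1 2 3] 0 false
}
```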

Unbuffered channels

package main

import (
    "fmt"
    "io"
    "log"
    "net"
    "os"
)

func main() {
    conn, err := net.Dial("tcp", "localhost:8080")
    if err != nil {
        log.Fatalf("Failed to Dial!err:%s", err.Error())
    }

    defer conn.Close()
    done := make(chan struct{}) // chan struct{} is a single type, not two arguments
    go func() {
        io.Copy(os.Stdout, conn)
        fmt.Println("done")
        done <- struct{}{}
    }()

    mustCopy(conn, os.Stdin)
    conn.Close()

    <-done
}

func mustCopy(dst io.Writer, src io.Reader) {
    if _, err := io.Copy(dst, src); err != nil {
        log.Fatal(err)
    }
}

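The code above implements communication between the main goroutine and a background goroutine: when the background goroutine finishes copying, it signals the main goroutine, which can then exit as well.
Because done is an unbuffered channel, <-done blocks until the sender sends a value.

Pipelines (chained channels)

The pipeline has three stages:

  • counter - generates the values
  • squarer - squares each value
  • printer - prints the result

Two channels connect the stages: naturals and squares.

See the following implementation (ver1):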

//ver1
package main

import (
    "fmt"
    "time"
)

func main() {
    naturals := make(chan uint)
    squares := make(chan uint)

    // squares
    go func() {
        x := <-naturals
        fmt.Printf("Squares rcv: %d\n", x)
        squares <- x * x
        fmt.Printf("Squares send %d\n", x*x)
    }()

    // printer
    go func() {
        val := <-squares
        fmt.Printf("Printer rcv: %d\n", val)
    }()

    i := uint(0)
    for {
        i++
        naturals <- i
        time.Sleep(1 * time.Second) // produce one value per second
    }
}

Running it, however, produces the following error:

fatal error: all goroutines are asleep - deadlock!

From the Go specification (https://golang.org/ref/spec#Program_execution): A complete program is created by linking a single, unimported package called the main package with all the packages it imports, transitively. The main package must have package name main and declare a function main that takes no arguments and returns no value.
func main() { … }
Program execution begins by initializing the main package and then invoking the function main. When that function invocation returns, the program exits. It does not wait for other (non-main) goroutines to complete.

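The error occurs because the squarer and printer goroutines contain no for loop: each handles a single value and then returns. After the first value has gone through the pipeline, only the main goroutine is left. The next naturals <- i has no receiver, so main blocks there forever, which is the deadlock.
As for the ordering of sends and receives on an unbuffered channel, both sides block: the receiver blocks until a value arrives, and the sender blocks until its value has been taken by a receiver. A send and the matching receive therefore complete at the same time. For example: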

package main

import (
    "log"
    "time"
)

func main() {
    ch := make(chan int)
    go func() {
        for {
            val := <-ch
            log.Printf("Rcv:%d\n", val)
            time.Sleep(2 * time.Second)
        }
    }()

    i := 0
    for {
        i++
        ch <- i
        log.Printf("Send %d\n", i)
        time.Sleep(1 * time.Second)
    }
}

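The output shows that although the sender and the receiver sleep for different lengths of time, each send and its matching receive are logged at the same moment; the slower side (the receiver, 2 s) sets the pace.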

2019/03/09 22:35:56 Rcv:1
2019/03/09 22:35:56 Send 1
2019/03/09 22:35:58 Rcv:2
2019/03/09 22:35:58 Send 2
2019/03/09 22:36:00 Rcv:3
2019/03/09 22:36:00 Send 3

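The code above sends an endless stream of values. What if we only want to send a finite sequence? The real question is how the downstream squarer and printer learn that the sequence is complete. This can be done with close(naturals):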

//ver2
package main

import (
    "fmt"
    "time"
)

func main() {
    naturals := make(chan uint, 10)
    squares := make(chan uint, 10)

    //naturals
    go func() {
        for i := uint(0); i < 10; i++ {
            naturals <- i
            time.Sleep(1 * time.Second)
        }
        close(naturals)
    }()
    // squares
    go func() {
        for {
            x := <-naturals
            fmt.Printf("Squares rcv: %d\n", x)
            squares <- x * x
            fmt.Printf("Squares send %d\n", x*x)
        }
    }()

    // printer
    for {
        val := <-squares
        fmt.Printf("Printer rcv: %d\n", val)
    }
}

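After a channel has been closed, any further send on it panics. Receives no longer block either: once the buffered values have been drained, each receive immediately returns the zero value. So in the program above, the squarer and the printer keep printing 0 forever after the real values have been processed.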

Printer rcv: 0
Printer rcv: 0
Printer rcv: 0
Printer rcv: 0
Squares send 0
Squares rcv: 0
Squares send 0
Squares rcv: 0

So is there a way for the receiver to know whether the channel has been closed? Yes!

//ver3
package main

import (
    "fmt"
    "time"
)

func main() {
    naturals := make(chan uint, 10)
    squares := make(chan uint, 10)

    //naturals
    go func() {
        for i := uint(0); i < 5; i++ {
            naturals <- i
            time.Sleep(1 * time.Second)
        }
        close(naturals)
    }()
    // squares
    go func() {
        for {
            x, ok := <-naturals
            if !ok {
                fmt.Println("naturals has been closed")
                break
            }

            fmt.Printf("Squares rcv: %d\n", x)
            squares <- x * x
            fmt.Printf("Squares send %d\n", x*x)
        }
    }()

    // printer
    for {
        val, ok := <-squares
        if !ok {
            fmt.Println("the squares has been closed!")
            break
        }

        fmt.Printf("Printer rcv: %d\n", val)
    }
}

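On the receiving side we can add a second result, ok, to tell whether the channel has been closed: true means a value was received successfully, false means the channel is closed and there are no more values to read.
However, the code above still ends with: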

fatal error: all goroutines are asleep - deadlock!

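The cause is at line 27 of the listing: the break ends the squarer goroutine, but the squares channel is never closed. The printer at line 38 keeps waiting for a value, yet nothing will ever send on squares again, so the program deadlocks.
The fix is: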

//ver4
package main

import (
    "fmt"
    "time"
)

func main() {
    naturals := make(chan uint, 10)
    squares := make(chan uint, 10)

    //naturals
    go func() {
        for i := uint(0); i < 5; i++ {
            naturals <- i
            time.Sleep(1 * time.Second)
        }
        close(naturals)
    }()
    // squares
    go func() {
        // added here: close squares when this goroutine returns
        defer close(squares)
        for {
            x, ok := <-naturals
            if !ok {
                fmt.Println("naturals has been closed")
                break
            }

            fmt.Printf("Squares rcv: %d\n", x)
            squares <- x * x
            fmt.Printf("Squares send %d\n", x*x)
        }
    }()

    // printer
    for {
        val, ok := <-squares
        if !ok {
            fmt.Println("the squares has been closed!")
            break
        }

        fmt.Printf("Printer rcv: %d\n", val)
    }
}

A more elegant form is range: it receives values from the channel one by one and leaves the loop once the channel is closed and drained.

//ver5
package main

import (
    "fmt"
    "time"
)

func main() {
    naturals := make(chan uint, 10)
    squares := make(chan uint, 10)

    //naturals
    go func() {
        defer close(naturals)
        for i := uint(0); i < 10; i++ {
            naturals <- i
            time.Sleep(1 * time.Second)
        }
    }()
    // squares
    go func() {
        // close squares when this goroutine returns
        defer close(squares)
        for x := range naturals {
            fmt.Printf("squares rcv: %d\n", x)
            squares <- x * x
        }

        fmt.Println("naturals has been closed")
    }()

    // printer
    for x := range squares {
        fmt.Printf("printer rcv: %d\n", x)
    }

    fmt.Println("squares has been closed")
}

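You do not actually need to close every channel. Closing is only necessary when the receiving goroutines must be told that all data has been sent. A channel that is no longer referenced is reclaimed by Go's garbage collector whether or not it was closed. (Do not confuse this with closing an open file: every open file must be closed with its Close method once it is no longer needed.)

Unidirectional channels

A unidirectional channel can only be sent on or only be received from. For example, the counter above only needs to send on its channel and the printer only needs to receive, so Go provides unidirectional channel types to make such code safer: misuse is caught at compile time.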

package main

import (
   "fmt"
   "time"
)

func counter(out chan<- int) {
   defer close(out)
   for i := 0; i < 10; i++ {
      out <- i
      time.Sleep(1 * time.Second)
   }

   fmt.Println("counter is closed")
}

func square(in <-chan int, out chan<- int) {
   defer close(out)

   for x := range in {
      out <- x * x
   }

   fmt.Println("square is closed")
}

func printer(in <-chan int) {
   for x := range in {
      fmt.Printf("Rcv: %d\n", x)
   }
}

func main() {
   naturals := make(chan int)
   squares := make(chan int)

   go counter(naturals)         // chan int is implicitly converted to chan<- int (send-only)
   go square(naturals, squares) // naturals becomes <-chan int (receive-only), squares becomes chan<- int (send-only)

   printer(squares)
}
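Unidirectional types are also useful as return types. The following sketch, with the made-up function names count and sum, returns a receive-only channel so that callers can read from it but can neither send on it nor close it.

```go
package main

import "fmt"

// count returns a receive-only channel that yields 0..n-1 and is then closed.
// Only the goroutine started here holds the bidirectional channel.
func count(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			out <- i
		}
	}()
	return out // chan int is implicitly converted to <-chan int
}

// sum consumes a receive-only channel until it is closed.
func sum(in <-chan int) int {
	total := 0
	for x := range in {
		total += x
	}
	return total
}

func main() {
	fmt.Println(sum(count(5))) // 0+1+2+3+4 = 10
}
```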

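Buffered channels

A buffered channel decouples sending from receiving: a send can complete even when no receiver is ready, as long as the buffer is not full, which can improve throughput.
An unbuffered channel gives stronger synchronization, because every send must meet a receive.

package main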

import (
   "fmt"
   "time"
)

func main() {
   done := make(chan int, 3)

   go func() {
      time.Sleep(910 * time.Microsecond)
      done <- 3
   }()
   go func() {
      time.Sleep(890 * time.Microsecond)
      done <- 1
   }()

   go func() {
      time.Sleep(900 * time.Microsecond)
      done <- 2
   }()


   fmt.Println(<-done)
}

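The output is typically 1: the goroutine with the shortest sleep (890 µs) usually sends first, and because the channel is buffered, the other two goroutines would not block on their sends even though nobody receives their values. With sleeps this close together, the order is not strictly guaranteed.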
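To make the buffer visible, the built-in functions len and cap can be applied to a channel. The sketch below uses a hypothetical helper fill, which sends into a buffered channel from a single goroutine, with no receiver at all, until the buffer is full.

```go
package main

import "fmt"

// fill sends 0, 1, 2, ... into a new buffered channel until the buffer is
// full. No receiver exists, yet none of these sends blocks.
func fill(capacity int) (int, chan int) {
	ch := make(chan int, capacity)
	sent := 0
	for len(ch) < cap(ch) { // room left, so the send below cannot block
		ch <- sent
		sent++
	}
	return sent, ch
}

func main() {
	sent, ch := fill(3)
	fmt.Println(sent, len(ch), cap(ch)) // 3 3 3
	first := <-ch                       // values come out in FIFO order
	fmt.Println(first, len(ch))         // 0 2
}
```

One more send after fill returns would block, because the buffer is full and nothing receives.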

4 Looping in parallel

Now we want to call the thumbnail package from gopl.io to process images. The relevant definition in gopl.io is:

package thumbnail
// ImageFile reads an image from infile and writes
// a thumbnail-size version of it in the same directory.
// It returns the generated file name, e.g. "foo.thumb.jpeg".
func ImageFile(infile string) (string, error) {
   ext := filepath.Ext(infile) // e.g., ".jpg", ".JPEG"
   outfile := strings.TrimSuffix(infile, ext) + ".thumb" + ext
   return outfile, ImageFile2(outfile, infile)
}
  • The first approach that comes to mind is to process the files sequentially
func makeThumbnails(filenames []string) {
   for _, f := range filenames {
      thumbnail.ImageFile(f)
   }
}
  • For efficiency, we now process them in parallel (incorrect code)
func makeThumbnails1(filenames []string) {
   for _, filename := range filenames {
      go thumbnail.ImageFile(filename)
   }
}

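The implementation above is incorrect: makeThumbnails1 returns as soon as it has started the goroutines, without waiting for the worker goroutines to finish. If main then returns, the program exits before the work is done.

  • So how can the main goroutine learn that the worker goroutines have finished? With a channel (this version still has a bug, discussed below)
func makeThumbnails2(filenames []string) {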
   done := make(chan struct{})
   for _, f :=range filenames {
      go func() {
         log.Printf("Start process image %s\n", f)
         _, err := thumbnail.ImageFile(f)
         if err != nil {
            log.Println(err)
         }

         done <- struct{}{}
      }()
   }

   for range filenames {
      <-done
   }
}

Running it shows that the output is wrong:

PS D:\07-go\src\gopl> .\main.exe .\1.jpg .\2.jpg .\3.jpg
2019/03/11 21:58:39 Start process image .\2.jpg
2019/03/11 21:58:39 Start process image .\3.jpg
2019/03/11 21:58:39 Start process image .\3.jpg

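3.jpg was processed twice and 1.jpg not at all. Why? As mentioned before, a function literal captures the variable f itself, not the value f had when the goroutine was started. In Go versions before 1.22, all iterations of the loop share a single f, so the logic is as follows:
1) goroutine 1 is started while f = 1.jpg, but it has not yet read f.
2) The loop moves on and f becomes 2.jpg; when goroutine 1 reaches log.Printf and ImageFile, it reads the current value of f and processes 2.jpg instead.
3) This is the general problem of variables shared between goroutines. (Since Go 1.22 each iteration gets its own f, which avoids this particular bug.)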

  • The fix is to pass the filename to the goroutine explicitly as an argument
func makeThumbnails3(filenames []string) {
   done := make(chan struct{})
   for _, f :=range filenames {
      go func(infile string) {
         log.Printf("Start process image %s\n", infile)
         _, err := thumbnail.ImageFile(infile)
         if err != nil {
            log.Println(err)
         }

         done <- struct{}{}
      }(f)
   }

   for range filenames {
      <-done
   }
}
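An equivalent fix, shown here only as a sketch, is to declare a fresh variable inside the loop body so that each goroutine captures its own copy. The function process and its string results are invented for this example; they stand in for the thumbnail work so the program runs without gopl.io.

```go
package main

import (
	"fmt"
	"sort"
)

// process starts one goroutine per name and collects what each goroutine saw.
func process(names []string) []string {
	results := make(chan string)
	for _, name := range names {
		name := name // fresh variable per iteration; needed before Go 1.22
		go func() {
			results <- "processed " + name
		}()
	}
	out := make([]string, 0, len(names))
	for range names {
		out = append(out, <-results)
	}
	sort.Strings(out) // goroutines finish in no particular order
	return out
}

func main() {
	fmt.Println(process([]string{"1.jpg", "2.jpg", "3.jpg"}))
}
```

Every file name appears exactly once in the result, whichever Go version compiles the program.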
  • What if we want to return and report the error as soon as the first one occurs?
func makeThumbnails4(filenames []string) error{
   errCh := make(chan error)
   for _, f := range filenames {
      go func(infile string) {
         log.Printf("Start processing %s\n", infile)
         _, err := thumbnail.ImageFile(infile)
         errCh <- err
      }(f)
   }

   for range filenames {
      if err := <- errCh; err != nil {
         log.Println(err)
         return err
      }
   }

   return nil
}

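But there is a bug here. When makeThumbnails4 returns on the first error, nothing receives from errCh any more. Every worker goroutine that is still running then blocks forever on its send, which is a goroutine leak.
A leak like this may cause the whole program to get stuck or to run out of memory.

  • The solution is a buffered channel. As mentioned earlier, a buffered channel decouples the sender from the receiver; with capacity len(filenames), no worker can block on its send.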
func makeThumbnails4(filenames []string) error{
   errCh := make(chan error, len(filenames))
   for _, f := range filenames {
      go func(infile string) {
         log.Printf("Start processing %s\n", infile)
         _, err := thumbnail.ImageFile(infile)
         errCh <- err
      }(f)
   }

   for range filenames {
      if err := <- errCh; err != nil {
         log.Println(err)
         return err
      }
   }

   return nil
}
  • What if we want to start many goroutines but do not know their number in advance? How do we keep track of how many have been started and how many have finished?
    The solution is sync.WaitGroup
package main

import (
   "fmt"
   "gopl.io/ch8/thumbnail"
   "log"
   "os"
   "sync"
)

func makeThumbnails(filenames []string) {
   for _, f := range filenames {
      thumbnail.ImageFile(f)
   }
}


func makeThumbnails1(filenames []string) {
   for _, filename := range filenames {
      go thumbnail.ImageFile(filename)
   }
}

func makeThumbnails3(filenames []string) {
   done := make(chan struct{})
   for _, f :=range filenames {
      go func(infile string) {
         log.Printf("Start process image %s\n", infile)
         _, err := thumbnail.ImageFile(infile)
         if err != nil {
            log.Println(err)
         }

         done <- struct{}{}
      }(f)
   }

   for range filenames {
      <-done
   }
}

func makeThumbnails4(filenames []string) error{
   errCh := make(chan error, len(filenames))
   for _, f := range filenames {
      go func(infile string) {
         log.Printf("Start processing %s\n", infile)
         _, err := thumbnail.ImageFile(infile)
         errCh <- err
      }(f)
   }

   for range filenames {
      if err := <- errCh; err != nil {
         log.Println(err)
         return err
      }
   }

   return nil
}

func makeThumbnails6(filenames []string) int64 {
   ch := make(chan int64)
   var wg sync.WaitGroup
   for _, f := range filenames {
      wg.Add(1)
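      go func(infile string) {
         defer wg.Done()
         thumb, err := thumbnail.ImageFile(infile)
         if err != nil {
            log.Println(err)
            return
         }
         // Check the Stat error too: info would be nil on failure.
         info, err := os.Stat(thumb)
         if err != nil {
            log.Println(err)
            return
         }
         ch <- info.Size()
      }(f)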
   }

   go func() {
      wg.Wait()
      close(ch)
   }()

   var total int64
   for size := range ch {
      total += size
   }

   return total
}

func main() {
   filenames := os.Args[1:]
   sizes := makeThumbnails6(filenames)
   fmt.Printf("The sizes of files is %d\n", sizes)
}

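The program above computes the total size of the generated thumbnails.
This raises a question: why is wg.Wait placed in its own goroutine?
If it were not, we would have to decide whether to put it before or after the loop that accumulates total.
If it came before the loop, the program would never finish: the receive on ch would never be reached, so the workers would block forever on their sends and wg.Wait would never return, which is a deadlock.
If it came after the loop, it would also deadlock: once every worker has sent its value and exited, nothing sends on ch and nothing closes it, so the range loop blocks forever on the receive and wg.Wait is never reached.
Therefore the wait and the close have to run in a separate goroutine.
This structure is the common, idiomatic pattern for a concurrent loop when the number of iterations is not known in advance.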
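The same structure can be tried without any image files. In this sketch, totalLength is a hypothetical stand-in for makeThumbnails6: each worker sends the length of a string instead of the size of a thumbnail.

```go
package main

import (
	"fmt"
	"sync"
)

// totalLength sums len(s) over all items, using one goroutine per item.
func totalLength(items []string) int {
	sizes := make(chan int)
	var wg sync.WaitGroup
	for _, item := range items {
		wg.Add(1)
		go func(s string) {
			defer wg.Done()
			sizes <- len(s)
		}(item)
	}

	// closer: waits for all workers, then closes sizes so the range loop ends
	go func() {
		wg.Wait()
		close(sizes)
	}()

	total := 0
	for n := range sizes {
		total += n
	}
	return total
}

func main() {
	fmt.Println(totalLength([]string{"a", "bb", "ccc"})) // 1+2+3 = 6
}
```

The main goroutine receives while the closer waits, so neither blocks the other.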

Multiplexing with select

package main

import (
   "fmt"
   "os"
   "time"
)

func launch() {
   fmt.Println("LAUNCH")
}

func main() {
   abort := make(chan struct{})
   go func() {
      os.Stdin.Read(make([]byte, 1))
      abort <- struct{}{}
   }()

   tick := time.Tick(1*time.Second)
   for countdown := 10; countdown > 0; countdown-- {
      fmt.Println(countdown)
      select {
      case <-abort:
         fmt.Println("Rcv abort signal")
         return
      case <-tick:
         fmt.Println("Time out")
      }
   }

   launch()
}
package main

import (
   "fmt"
   "time"
)

func main() {
   ch := make(chan int, 1) // note: the buffer size is 1
   for i := 0; i < 10; i++ {
      select {
      case ch <- i:
         fmt.Printf("Send %d\n", i)
      case x := <-ch:
         fmt.Printf("Rcv %d\n", x)
      }
      time.Sleep(1*time.Second)
   }
}

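With a buffer size of 1 the output consists only of even numbers. The channel is alternately empty, so only the send case can proceed, and full, so only the receive case can proceed: the values 0, 2, 4, ... are sent on even iterations and received on the following odd ones.
With a buffer size greater than 1 the result is no longer deterministic: when several cases of a select are ready, one of them is chosen at random.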
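The alternation for buffer size 1 can be verified by recording the events instead of printing them. This sketch repeats the loop above without the sleep; the function name alternate is made up.

```go
package main

import "fmt"

// alternate runs the select loop with a channel of capacity 1 for n
// iterations and records which values were sent and which were received.
func alternate(n int) (sent, received []int) {
	ch := make(chan int, 1)
	for i := 0; i < n; i++ {
		select {
		case ch <- i: // possible only while the buffer is empty
			sent = append(sent, i)
		case x := <-ch: // possible only while the buffer is full
			received = append(received, x)
		}
	}
	return sent, received
}

func main() {
	sent, received := alternate(6)
	fmt.Println(sent, received) // [0 2 4] [0 2 4]
}
```

Exactly one case is ready in every iteration, so select has no choice to make and the result is deterministic.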
