雑なメモ書き

気楽にいきます

Goの並行処理を実際に試す(2)

1-4-lockstep.go

  • 前回のgeneratorの続き
  • generatorが増やせる話
  • 隔離空間を簡単に作れる訳か
//    Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

// Golang channels as handles on a service (working in lockstep)

package main

import (
    "fmt"
    "time"
)

// main creates two generators and prints five rounds of output, reading one
// message from the "Hello" generator and then one from the "Bye" generator in
// each round. Each receive blocks until that generator sends, so the two
// streams advance in lockstep.
func main() {
    hello := generator("Hello")
    bye := generator("Bye")
    for round := 0; round < 5; round++ {
        first := <-hello
        fmt.Println(first)
        second := <-bye
        fmt.Println(second)
    }
}

// generator starts a goroutine that sends "<msg> <n>" for n = 0, 1, 2, ... on
// an unbuffered channel, sleeping one second after every send, and returns
// the receive-only end of that channel. The goroutine loops forever.
func generator(msg string) <-chan string {
    out := make(chan string)
    produce := func() {
        n := 0
        for {
            out <- fmt.Sprintf("%s %d", msg, n)
            time.Sleep(time.Second)
            n++
        }
    }
    go produce()
    return out
}

1-5-fanin.go

launch two goroutine while loops to continuously pipe to new channel

このコメントが重要そう

//    Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

// Golang multiplexing (fan-in) function that lets multiple channels flow through one channel

package main

import (
    "fmt"
    "time"
)

// main merges the "Hello" and "Bye" generators through fanIn and prints the
// first ten messages that arrive on the merged channel, in arrival order.
func main() {
    merged := fanIn(generator("Hello"), generator("Bye"))
    for remaining := 10; remaining > 0; remaining-- {
        fmt.Println(<-merged)
    }
}

// fanIn is itself a generator: it multiplexes ch1 and ch2 onto a single
// returned receive-only channel. One goroutine per input forwards values as
// they arrive, so neither input has to wait for the other.
//
// When an input channel is closed its forwarder stops (instead of forwarding
// an endless stream of empty strings), and once both inputs are closed the
// returned channel is closed as well. Inputs that are never closed are
// forwarded forever.
func fanIn(ch1, ch2 <-chan string) <-chan string { // receives two read-only channels
    out := make(chan string)
    done := make(chan struct{})

    // forward pipes every value from in to out until in is closed.
    forward := func(in <-chan string) {
        for s := range in {
            out <- s
        }
        done <- struct{}{}
    }
    go forward(ch1)
    go forward(ch2)

    // Close out only after both forwarders have finished, so no forwarder
    // can ever send on a closed channel.
    go func() {
        <-done
        <-done
        close(out)
    }()
    return out
}

// generator returns a receive-only channel fed by a background goroutine.
// The goroutine sends msg followed by a counter starting at 0 ("<msg> <n>"),
// pausing one second after each send, and never stops.
func generator(msg string) <-chan string {
    stream := make(chan string)
    go func(prefix string) {
        for seq := 0; ; seq++ {
            line := fmt.Sprintf("%s %d", prefix, seq)
            stream <- line
            time.Sleep(time.Second)
        }
    }(msg)
    return stream
}

1-6-restoring-sequencing.go

// Golang restoring sequencing after multiplexing

//    Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

// Golang restoring sequencing after multiplexing

package main

import (
    "fmt"
    "time"
)

// Message is what a generator sends: the text to print plus a channel the
// generator and the consumer use to synchronise before the next message.
type Message struct {
    str   string   // text payload
    block chan int // generator sends on this after each message; consumer receives to release it
}

// main merges two generators and, ten times over, prints a pair of messages
// and then releases the generators that sent them. Each generator blocks on
// its Message.block channel after sending, so no generator can get ahead of
// the consumer by more than one message.
func main() {
    merged := fanIn(generator("Hello"), generator("Bye"))
    for round := 0; round < 10; round++ {
        var pair [2]Message

        // Take two messages and print each one as soon as it arrives.
        for k := range pair {
            pair[k] = <-merged
            fmt.Println(pair[k].str)
        }

        // Receive from each block channel, in arrival order, to let the
        // waiting generators continue.
        for _, m := range pair {
            <-m.block
        }
    }
}

// fanIn is itself a generator: it multiplexes ch1 and ch2 onto a single
// returned receive-only channel, using one forwarding goroutine per input.
//
// When an input channel is closed its forwarder stops (instead of forwarding
// an endless stream of zero-value Messages, whose nil block channel would
// hang a consumer that receives from it). Once both inputs are closed the
// returned channel is closed as well. Inputs that are never closed are
// forwarded forever.
func fanIn(ch1, ch2 <-chan Message) <-chan Message { // receives two read-only channels
    out := make(chan Message)
    done := make(chan struct{})

    // forward pipes every Message from in to out until in is closed.
    forward := func(in <-chan Message) {
        for m := range in {
            out <- m
        }
        done <- struct{}{}
    }
    go forward(ch1)
    go forward(ch2)

    // Close out only after both forwarders have finished, so no forwarder
    // can ever send on a closed channel.
    go func() {
        <-done
        <-done
        close(out)
    }()
    return out
}

// generator returns a receive-only channel of Messages produced by a
// background goroutine. Every Message carries the text "<msg> <n>" (n counts
// up from 0) and the same shared gate channel. After each send the goroutine
// sleeps one second and then blocks sending on the gate until a consumer
// receives from Message.block.
func generator(msg string) <-chan Message {
    out := make(chan Message)
    gate := make(chan int) // unbuffered: a send blocks until the consumer receives
    go func() {
        n := 0
        for {
            out <- Message{str: fmt.Sprintf("%s %d", msg, n), block: gate}
            time.Sleep(time.Second)
            gate <- 1 // wait here until the consumer releases us
            n++
        }
    }()
    return out
}

1-7-select-fanin.go

//    Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

// Select is a control structure for concurrency (the reason channels and goroutines are built
//  into the language rather than provided by a library).
//  It is based on Dijkstra's guarded commands, providing an idiomatic way for concurrent processes
//  to pass in data without the programmer having to worry about 'steps'

package main

import (
    "fmt"
    "time"
)

// main merges the "Hello" and "Bye" generators through the select-based
// fanIn and prints the first ten messages received from the merged channel.
func main() {
    merged := fanIn(generator("Hello"), generator("Bye"))
    for count := 0; count < 10; count++ {
        line := <-merged
        fmt.Println(line)
    }
}

// fanIn is itself a generator: a single goroutine uses select to forward
// whichever of ch1 or ch2 has a value ready onto the returned receive-only
// channel.
//
// A closed channel is always ready to receive, so a closed input would
// otherwise be selected over and over and flood the output with empty
// strings. To prevent that, a closed input is set to nil (a nil channel is
// never selected), and once both inputs are closed the returned channel is
// closed too. Inputs that are never closed are forwarded forever.
func fanIn(ch1, ch2 <-chan string) <-chan string { // receives two read-only channels
    out := make(chan string)
    go func() {
        defer close(out)
        open := 2 // inputs not yet observed as closed
        for open > 0 {
            select {
            case s, ok := <-ch1:
                if ok {
                    out <- s
                } else {
                    ch1 = nil
                    open--
                }
            case s, ok := <-ch2:
                if ok {
                    out <- s
                } else {
                    ch2 = nil
                    open--
                }
            }
        }
    }()
    return out
}

// generator launches a goroutine that repeatedly formats msg with an
// increasing counter (starting at 0), sends the result on an unbuffered
// channel, and sleeps for one second. It returns the receive-only end of
// that channel; the goroutine runs forever.
func generator(msg string) <-chan string {
    results := make(chan string)
    go func() {
        var count int
        for {
            text := fmt.Sprintf("%s %d", msg, count)
            results <- text
            time.Sleep(time.Second)
            count++
        }
    }()
    return results
}

1-8-timeout-select.go

  • selectでtimeoutを実装
//    Kevin Chen (2017)
// Patterns from Pike's Google I/O talk, "Go Concurrency Patterns"

//  In a non-deterministic select control block, a 1-second timer (created each iteration) may
//  time out if the channel does not deliver a string within a second

package main

import (
    "fmt"
    "time"
)

// main prints up to ten messages from a generator. For each message it waits
// at most one second; if the timer fires before a message arrives, it prints
// "Waited too long!" and exits.
func main() {
    greetings := generator("Hi!")
    for received := 0; received < 10; received++ {
        // A fresh timer per iteration: the limit applies to each message
        // individually, not to the loop as a whole.
        timeout := time.After(1 * time.Second)
        select {
        case text := <-greetings:
            fmt.Println(text)
        case <-timeout:
            fmt.Println("Waited too long!")
            return
        }
    }
}

// generator returns a receive-only channel on which a background goroutine
// sends "<msg> <n>" with n counting up from 0. The goroutine sleeps one
// second after each send and never terminates.
func generator(msg string) <-chan string {
    feed := make(chan string)
    emit := func(n int) {
        feed <- fmt.Sprintf("%s %d", msg, n)
        time.Sleep(time.Second)
    }
    go func() {
        for n := 0; ; n++ {
            emit(n)
        }
    }()
    return feed
}