在Go的实现中,所有IO都是阻塞调用的,Go的设计思想是程序员使用阻塞式的接口来编写程序,然后通过goroutine+channel来处理并发。因此所有的IO逻辑都是直来直去的,先xx,再xx, 你不再需要回调,不再需要future,要的仅仅是step by step。这对于代码的可读性是很有帮助的。
// Read implements io.Reader. func (fd *FD) Read(p []byte) (int, error) { if err := fd.readLock(); err != nil { return0, err } defer fd.readUnlock() iflen(p) == 0 { // If the caller wanted a zero byte read, return immediately // without trying (but after acquiring the readLock). // Otherwise syscall.Read returns 0, nil which looks like // io.EOF. // TODO(bradfitz): make it wait for readability? (Issue 15735) return0, nil } if err := fd.pd.prepareRead(fd.isFile); err != nil { return0, err } if fd.IsStream && len(p) > maxRW { p = p[:maxRW] } for { n, err := syscall.Read(fd.Sysfd, p) if err != nil { n = 0 if err == syscall.EAGAIN && fd.pd.pollable() { if err = fd.pd.waitRead(fd.isFile); err == nil { continue } }
// On MacOS we can see EINTR here if the user // pressed ^Z. See issue #22838. if runtime.GOOS == "darwin" && err == syscall.EINTR { continue } } err = fd.eofError(n, err) return n, err } }
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait func poll_runtime_pollWait(pd *pollDesc, mode int)int{ err := netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // As for now only Solaris uses level-triggered IO. if GOOS == "solaris" { netpollarm(pd, mode) } for !netpollblock(pd, int32(mode), false) { err = netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // Can happen if timeout has fired and unblocked us, // but before we had a chance to run, timeout has been reset. // Pretend it has not happened and retry. } return0 }
// set the gpp semaphore to WAIT for { old := *gpp if old == pdReady { *gpp = 0 returntrue } if old != 0 { throw("runtime: double wait") } if atomic.Casuintptr(gpp, 0, pdWait) { break } }
// need to recheck error states after setting gpp to WAIT // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg if waitio || netpollcheckerr(pd, mode) == 0 { gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5) } // be careful to not lose concurrent READY notification old := atomic.Xchguintptr(gpp, 0) if old > pdWait { throw("runtime: corrupted polldesc") } return old == pdReady }
// Puts the current goroutine into a waiting state and calls unlockf. // If unlockf returns false, the goroutine is resumed. // unlockf must not access this G's stack, as it may be moved between // the call to gopark and the call to unlockf. func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int){ mp := acquirem() gp := mp.curg status := readgstatus(gp) if status != _Grunning && status != _Gscanrunning { throw("gopark: bad g status") } mp.waitlock = lock mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf)) gp.waitreason = reason mp.waittraceev = traceEv mp.waittraceskip = traceskip releasem(mp) // can't do anything that might move the G between Ms here. mcall(park_m) }
// polls for ready network connections // returns list of goroutines that become runnable func netpoll(block bool) *g { if epfd == -1 { return nil } waitms := int32(-1) if !block { waitms = 0 } var events [128]epollevent retry: n := epollwait(epfd, &events[0], int32(len(events)), waitms) if n < 0 { if n != -_EINTR { println("runtime: epollwait on fd", epfd, "failed with", -n) throw("runtime: netpoll failed") } goto retry } var gp guintptr for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } var mode int32 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' } if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' } if mode != 0 { pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
netpollready(&gp, pd, mode) } } if block && gp == 0 { goto retry } return gp.ptr() }
func (fd *FD) Init(net string, pollable bool) error { // We don't actually care about the various network types. if net == "file" { fd.isFile = true } if !pollable { fd.isBlocking = true return nil } return fd.pd.init(fd) }
for { old := *gpp if old == pdReady { return nil } if old == 0 && !ioready { // Only set READY for ioready. runtime_pollWait // will check for timeout/cancel before waiting. return nil } var new uintptr if ioready { new = pdReady } if atomic.Casuintptr(gpp, old, new) { if old == pdReady || old == pdWait { old = 0 } return (*g)(unsafe.Pointer(old)) } } }